1. Descripción general de la arquitectura

WIS2 Downloader es un sistema distribuido con cuatro componentes principales:

                         Docker Compose
  ┌──────────────────────────────────────────────────────────┐
  │                                                           │
  │  Web UI (NiceGUI :8080)                                   │
  │    - reads GDC catalogue data from Redis cache            │
  │    - posts subscriptions to Subscription Manager API      │
  │                          │                                │
  │                          ▼                                │
  │  Subscription Manager (Flask API :5001)                   │
  │    - persists subscriptions in Redis                      │
  │    - publishes subscribe/unsubscribe commands via PubSub  │
  │                          │                                │
  │              Redis PubSub (commands)                      │
  │                          │                                │
  │                          ▼                                │
  │  Subscriber (MQTT Client)                                 │
  │    - connects to WIS2 Global Broker                       │
  │    - on notification: queues Celery download task         │
  │                          │                                │
  │              Redis (Celery queue + dedup state)           │
  │                          │                                │
  │                          ▼                                │
  │  Celery Workers                                           │
  │    - download file, verify hash, apply filters, save      │
  │                                                           │
  └──────────────────────────────────────────────────────────┘

1.1. Flujo de datos

  1. Al inicio, la interfaz web obtiene registros WCMP2 desde los tres puntos de acceso GDC (o caché Redis) y construye una jerarquía de temas en memoria

  2. El usuario explora la vista Catálogo o Árbol y selecciona un tema en la interfaz web

  3. El usuario hace clic en Suscribirse → la interfaz web envía un POST de suscripción a la API REST del gestor de suscripciones

  4. El gestor de suscripciones persiste la suscripción en Redis y publica un comando de suscripción en el canal PubSub de Redis

  5. El suscriptor recibe el comando y se suscribe al tema MQTT en el bróker global WIS2

  6. Cuando llega una notificación WIS2, el suscriptor pone en cola una tarea de descarga Celery

  7. El worker Celery descarga el archivo, verifica el hash, aplica filtros y guarda en disco

2. Estructura de módulos

modules/
├── shared/                 # Shared utilities
│   └── shared/
│       ├── __init__.py
│       ├── logging.py      # Centralized logging
│       └── redis_client.py # Redis client singleton
│
├── subscriber/             # MQTT subscriber service
│   └── subscriber/
│       ├── __init__.py
│       ├── manager.py      # Entry point, thread management
│       ├── subscriber.py   # MQTT client wrapper
│       └── command_listener.py  # Redis PubSub listener
│
├── subscription_manager/   # REST API service
│   └── subscription_manager/
│       ├── __init__.py
│       ├── app.py          # Flask application
│       └── static/
│           └── openapi.yml # API specification
│
├── task_manager/           # Celery tasks
│   └── task_manager/
│       ├── __init__.py
│       ├── worker.py       # Celery app configuration
│       ├── tasks/
│       │   └── wis2.py     # Download tasks
│       └── workflows/
│           └── __init__.py # Task chains
│
└── ui/                     # NiceGUI web interface (port 8080)
    ├── main.py             # Entry point, page route, AppState
    ├── layout.py           # PageLayout builder
    ├── config.py           # Environment variable config
    ├── data.py             # GDC data layer — fetch, merge, cache, hierarchy
    ├── assets/
    │   └── base.css        # All custom CSS
    ├── components/         # Reusable layout components
    ├── i18n/               # Internationalisation
    │   ├── __init__.py     # t(), current_lang(), is_rtl(), LANGUAGES
    │   ├── en.py           # English strings (source of truth)
    │   ├── fr.py           # French
    │   ├── es.py           # Spanish
    │   ├── ar.py           # Arabic (RTL)
    │   ├── zh.py           # Chinese
    │   └── ru.py           # Russian
    ├── models/
    │   └── wcmp2.py        # WCMP2Record dataclass
    └── views/              # Page views (catalogue, tree, subscriptions, etc.)

3. Detalles de módulos

3.1. Módulo compartido

Proporciona utilidades comunes usadas en todos los servicios.

3.1.1. Cliente Redis

modules/shared/shared/redis_client.py
from shared import get_redis_client

# Obtener el cliente Redis (singleton)
redis = get_redis_client()

# Usar como cliente Redis normal
redis.set('key', 'value')
redis.get('key')

El cliente:

  • Se conecta directamente al servidor Redis

  • Almacena en caché la conexión (patrón singleton)

  • Incluye lógica de reintento para fallos transitorios

3.1.2. Registro

modules/shared/shared/logging.py
from shared import setup_logging

# Configurar el registrador raíz (llamar una vez al inicio)
setup_logging()

# Obtener un registrador específico del módulo
LOGGER = setup_logging(__name__)

LOGGER.info("Message with UTC timestamp")

Características:

  • Marcas de tiempo UTC en formato ISO 8601

  • Configurable mediante la variable de entorno LOG_LEVEL

  • Salida a stdout para recolección de registros Docker

3.2. Módulo suscriptor

Se conecta al bróker global WIS2 mediante MQTT y procesa las notificaciones entrantes.

3.2.1. Punto de entrada

modules/subscriber/subscriber/manager.py
def run_manager():
    # 1. Create MQTT subscriber
    mqtt_subscriber = Subscriber(**broker_config)

    # 2. Create Redis command listener
    redis_listener = CommandListener(
        subscriber=mqtt_subscriber,
        channel=COMMAND_CHANNEL
    )

    # 3. Start MQTT in separate thread
    mqtt_thread = threading.Thread(target=mqtt_subscriber.start, daemon=True)
    mqtt_thread.start()

    # 4. Start Redis listener (blocks)
    redis_listener.start()

3.2.2. Suscriptor MQTT

modules/subscriber/subscriber/subscriber.py
class Subscriber:
    def __init__(self, host, port, uid, pwd, protocol, session):
        # Configure MQTT client with callbacks
        self.client = mqtt.Client(...)
        self.client.on_message = self._on_message

    def _on_message(self, client, userdata, msg):
        # Parse notification, create Celery task
        job = {
            "topic": msg.topic,
            "target": target,
            "filters": filters,
            "payload": json.loads(msg.payload)
        }
        workflow = wis2_download(job)
        workflow.apply_async()

    def subscribe(self, topic, target, filters):
        self.client.subscribe(topic, qos=0)
        self.active_subscriptions[topic] = {...}

    def unsubscribe(self, topic):
        self.client.unsubscribe(topic)
        del self.active_subscriptions[topic]

3.2.3. Oyente de comandos

modules/subscriber/subscriber/command_listener.py
class CommandListener(threading.Thread):
    def __init__(self, subscriber, channel):
        self.subscriber = subscriber
        self.pubsub = get_redis_client().pubsub()

    def run(self):
        self.pubsub.subscribe(self.channel)
        while not self.stop_event.is_set():
            message = self.pubsub.get_message()
            if message:
                self._process_command(message)

    def _process_command(self, message):
        command = json.loads(message['data'])
        if command['action'] == 'subscribe':
            self.subscriber.subscribe(
                command['topic'],
                command['save_path'],
                command['filters']
            )
        elif command['action'] == 'unsubscribe':
            self.subscriber.unsubscribe(command['topic'])

3.3. Módulo gestor de suscripciones

API REST Flask para gestionar suscripciones.

3.3.1. Aplicación Flask

modules/subscription_manager/subscription_manager/app.py
@app.post('/subscriptions')
def add_subscription():
    data = get_json()
    topic = normalise_topic(data.get('topic'))
    target = normalise_path(data.get('target', ''))
    filters = data.get('filters', {})

    # Publish command to subscriber via Redis PubSub
    command = {
        "action": "subscribe",
        "topic": topic,
        "save_path": target,
        "filters": filters
    }
    publish_command(command, COMMAND_CHANNEL)

    # Persist to Redis for durability
    persist_subscription(topic, target, filters)

    return jsonify({"status": "accepted", ...}), 202

3.3.2. Punto de acceso de métricas

Las métricas se almacenan en Redis mediante operaciones atómicas HINCRBYFLOAT, lo que las hace seguras en múltiples contenedores de workers Celery. El punto de acceso lee de Redis y genera el formato de texto Prometheus:

@app.route('/metrics')
def expose_metrics():
    from shared.redis_metrics import generate_prometheus_text, set_gauge
    set_gauge('celery_queue_length', {}, get_queue_length())
    return Response(generate_prometheus_text(), mimetype='text/plain')

Incremente un contador desde cualquier worker:

from shared import incr_counter
incr_counter('downloads_total', {'cache': hostname, 'media_type': mime_type})

3.4. Módulo gestor de tareas

Workers Celery para descargar y procesar archivos.

3.4.1. Configuración de Celery

modules/task_manager/task_manager/worker.py
app = Celery('tasks',
             broker=CELERY_BROKER_URL,
             result_backend=CELERY_RESULT_BACKEND)

# Autodescubrimiento de tareas
app.autodiscover_tasks(['task_manager.tasks', 'task_manager.tasks.wis2'])

3.4.2. Tarea de descarga

modules/task_manager/task_manager/tasks/wis2.py
@app.task(bind=True)
@metrics_collector
def download_from_wis2(self, job):
    result = {...}  # Initialize result dict

    # 1. Extract identifiers
    message_id = job['payload']['id']
    data_id = job['payload']['properties']['data_id']
    filehash = job['payload']['properties']['integrity']['value']

    # 2. Deduplication check
    for key, type in [(message_id, 'by-msg-id'), ...]:
        if get_status(key, type) == STATUS_SUCCESS:
            result['status'] = STATUS_SKIPPED
            return result

    # 3. Acquire lock
    lock_acquired = redis_client.set(lock_key, 1, nx=True, ex=LOCK_EXPIRE)
    if not lock_acquired:
        raise self.retry(countdown=10, max_retries=10)

    # 4. Download file
    response = _pool.request('GET', download_url, ...)

    # 5. Verify hash
    if hash_expected and hash_base64 != hash_expected:
        result['status'] = STATUS_FAILED
        return result

    # 6. Check media type filter
    if not _is_allowed_media_type(file_type, filters):
        result['status'] = STATUS_SKIPPED
        return result

    # 7. Save file
    output_path.write_bytes(data)
    result['status'] = STATUS_SUCCESS
    return result

3.4.3. Cadenas de flujo de trabajo

modules/task_manager/task_manager/workflows/init.py
def wis2_download(args):
    # Chain: download -> decode/ingest
    workflow = download_from_wis2.s(args) | decode_and_ingest.s()
    return workflow

3.4.4. Planificador (tareas periódicas)

Una aplicación Celery separada (scheduler.py) gestiona las tareas de mantenimiento mediante Celery Beat. Utiliza las bases de datos Redis 2/3 para no interferir con las colas del worker de descarga (bases de datos 0/1). Las tareas periódicas se definen en tasks/scheduled_tasks.py:

Tarea Intervalo Propósito

check_disk_space

Cada 5 min

Establece los indicadores disk_total_bytes, disk_used_bytes, disk_free_bytes

clean_directory

Cada 10 min

Elimina archivos más antiguos que DOWNLOAD_RETENTION_PERIOD días; decrementa disk_downloads_bytes por cada archivo eliminado

recalibrate_downloads_size

Diariamente

os.walk completo para corregir cualquier desviación en el indicador disk_downloads_bytes

El planificador se inicia como dos servicios Docker Compose: celery-scheduler-workers (ejecuta las tareas) y celery-beats (las activa según el calendario).

4. Módulo de interfaz

La interfaz es una aplicación web NiceGUI servida en el puerto 8080. Es la interfaz principal para que los usuarios descubran conjuntos de datos, exploren la jerarquía de temas WIS2 y creen suscripciones.

4.1. Estructura del módulo

modules/ui/
├── main.py                  # Application entry point, page route, AppState
├── layout.py                # PageLayout builder
├── config.py                # Environment variable config (SUBSCRIPTION_MANAGER_URL etc.)
├── data.py                  # GDC data layer — fetch, merge, cache, hierarchy
├── assets/
│   └── base.css             # All custom CSS (inside @layer components)
├── components/
│   ├── header.py            # WMO banner + toolbar + language selector
│   ├── footer.py            # Footer bar
│   ├── navigation_drawer.py # Left nav drawer with view links
│   └── page_body.py         # Main content area + right sidebar
├── i18n/                    # Internationalisation
│   ├── __init__.py          # t(), current_lang(), is_rtl(), LANGUAGES
│   ├── en.py                # English strings (source of truth)
│   ├── fr.py                # French
│   ├── es.py                # Spanish
│   ├── ar.py                # Arabic (RTL)
│   ├── zh.py                # Chinese
│   └── ru.py                # Russian
├── models/
│   └── wcmp2.py             # WCMP2Record dataclass (GeoJSON Feature)
└── views/
    ├── dashboard.py         # Grafana iframe embed
    ├── catalogue.py         # Catalogue search + result cards
    ├── tree.py              # Topic hierarchy tree
    ├── subscriptions.py     # Active subscriptions list
    ├── settings.py          # GDC status + refresh trigger
    └── shared.py            # Shared sidebar logic (on_topics_picked, confirm_subscribe, show_metadata)

4.2. Capa de datos (data.py)

La capa de datos es responsable de obtener registros WCMP2 de los tres catálogos de descubrimiento globales WIS2 (GDC), fusionarlos y construir dos cachés a nivel de módulo que leen todas las vistas.

4.2.1. Fuentes GDC

Los registros se obtienen de tres puntos de acceso GDC públicos al inicio:

Nombre corto URL

CMA

https://gdc.wis.cma.cn

DWD

https://wis2.dwd.de/gdc

ECCC

https://wis2-gdc.weather.gc.ca

Las respuestas JSON brutas se almacenan en caché en Redis bajo las claves gdc:cache:CMA, gdc:cache:DWD y gdc:cache:ECCC con un TTL controlado por GDC_CACHE_TTL_SECONDS (6 horas por defecto). Redis es opcional — si no está disponible, los datos se obtienen por HTTP en cada inicio.

4.2.2. Registros fusionados

Después de cargar todas las fuentes GDC, _build_merged_records() deduplica los registros por ID entre catálogos:

  • Los registros presentes en un solo catálogo aparecen una vez con una única entrada source_gdcs.

  • Los registros presentes en múltiples catálogos se fusionan: source_gdcs lista todos los catálogos contribuyentes.

  • has_discrepancy se establece en True si las propiedades, geometría o enlaces difieren entre catálogos.

  • Los enlaces se unen entre catálogos para que los datos de canal presentes en cualquier GDC se conserven en el registro fusionado.

El resultado se almacena en la lista _merged_records a nivel de módulo y se devuelve por merged_records().

4.2.3. Jerarquía de temas

_build_topic_hierarchy() itera sobre _merged_records e inserta el primer canal MQTT cache/ de cada registro en un dict anidado:

{
  "cache": {
    "children": {
      "a": {
        "children": {
          "wis2": {
            "children": {
              "de-dwd": {
                "children": {
                  ...
                  "synop": {
                    "datasets": [<WCMP2Record>, ...]
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}

Un nodo puede contener tanto "children" como "datasets" si un canal es prefijo de otro.

La jerarquía se almacena en _topic_hierarchy y se devuelve por topic_hierarchy(). La consume tree.py para construir el widget ui.tree, y get_datasets_for_channel() para resolver conjuntos de datos para un tema dado.

4.2.4. Funciones clave

Función Descripción

merged_records() → list[MergedRecord]

Devuelve la lista de registros fusionados en caché

topic_hierarchy() → dict

Devuelve la jerarquía de temas en caché

get_datasets_for_channel(channel) → list[WCMP2Record]

Elimina el /# final, navega la jerarquía y recopila recursivamente todos los conjuntos de datos de ese nodo y sus descendientes

scrape_all(force=False)

Obtiene datos GDC (caché Redis primero, luego HTTP), reconstruye _merged_records y _topic_hierarchy

4.3. Modelo WCMP2 (models/wcmp2.py)

WCMP2Record es una representación dataclass de una Feature GeoJSON de metadatos de descubrimiento WIS2.

from models.wcmp2 import WCMP2Record

rec = WCMP2Record.from_dict(feature_dict)

rec.id                   # str - unique dataset identifier
rec.title                # str | None
rec.description          # str | None
rec.keywords             # list[str]
rec.wmo_data_policy      # 'core' | 'recommended' | None
rec.geometry             # Geometry | None  (.type, .coordinates)
rec.links                # list[Link]  (.channel, .href, .rel, .extra)
rec.mqtt_channels        # list[str]  - channels from all links

Link.extra es un dict que captura claves fuera del esquema de la respuesta GDC (ej. filters específicos del GDC usados para campos de filtro personalizados en el panel lateral de suscripción).

4.4. AppState

Cada sesión del navegador tiene su propia instancia AppState (definida en main.py):

class AppState:
    def __init__(self):
        self.selected_topics: list[str] = []
        self.current_view: str = 'dashboard'

selected_topics contiene los temas MQTT actualmente seleccionados en la interfaz y controla el panel lateral derecho. current_view rastrea el nombre de la vista activa y se persiste en app.storage.user['current_view'] antes de cualquier recarga de página (ej. al cambiar de idioma) para que el usuario regrese a la misma vista.

4.5. Vistas

Todas las vistas siguen la misma convención: una función render(container, …​) que construye los elementos NiceGUI dentro del elemento contenedor proporcionado. render siempre es una función simple (no asíncrona) para que pueda ser llamada sincrónicamente desde show_view() en main.py.

views/catalogue.py proporciona búsqueda de texto completo en merged_records() con funciones de filtro puras:

Función Descripción

filter_feature(record, query)

Hace coincidir la consulta con ID, título, descripción, palabras clave y conceptos temáticos

filter_by_data_policy(record, policy)

Hace coincidir 'core', 'recommended' o 'all'

filter_by_keywords(record, keywords)

Palabras clave separadas por comas; todas deben aparecer en el registro

filter_by_bbox(record, bbox)

Intersección geométrica Shapely con el bounding box dado

Las tarjetas de resultados son renderizadas por update_search_results(), que también gestiona la paginación.

4.5.2. Vista de árbol

views/tree.py convierte topic_hierarchy() en nodos ui.tree mediante _to_tree_nodes() (recursivo, ordenado alfabéticamente). La selección usa on_select para imponer la selección de un único nodo — on_tick no se usa porque entra en conflicto con la sincronización interna de estado de NiceGUI.

4.5.3. Panel lateral compartido (views/shared.py)

on_topics_picked(e, state, layout, is_page_selection, dataset_id) es el manejador central para la selección desde catálogo y árbol. Él:

  1. Actualiza state.selected_topics

  2. Construye el panel lateral derecho con entrada de directorio de guardado, filtros de conjuntos de datos/tipos de medios/bbox/fecha y filtros personalizados opcionales

  3. Conecta el botón Suscribirse a confirm_subscribe()

confirm_subscribe() muestra un diálogo con la carga útil JSON completa antes de llamar a subscribe_to_topics().

show_metadata(dataset_id) busca el registro en merged_records() y renderiza un diálogo de detalles con un mapa Leaflet interactivo.

4.6. Agregar una nueva vista

  1. Cree modules/ui/views/myview.py con una función render(container), usando t() para todas las cadenas visibles al usuario

  2. Agregue una entrada de navegación en components/navigation_drawer.py — la lista NAV_ITEMS acepta una tupla (view_id, label_key, icon) donde label_key es una clave i18n nav.*

  3. Agregue una rama en el despachador show_view() en main.py

  4. Agregue cualquier nueva clase CSS en assets/base.css dentro del bloque @layer components { …​ }

  5. Agregue todas las nuevas claves de cadenas en i18n/en.py y en cada otro archivo de idioma — ver Internacionalización (i18n)

4.7. Internacionalización (i18n)

La interfaz soporta múltiples idiomas mediante un sistema i18n ligero basado en diccionarios en modules/ui/i18n/.

4.7.1. Idiomas compatibles

Código Idioma Dirección

en

English

LTR

fr

Français

LTR

es

Español

LTR

ar

العربية

RTL

zh

中文

LTR

ru

Русский

LTR

El idioma activo se almacena por sesión de navegador en app.storage.user['lang'] y por defecto es inglés. Los usuarios cambian de idioma mediante el selector en la barra de herramientas del encabezado; la página se recarga para aplicar el cambio.

4.7.2. Cómo funciona t()

from i18n import t

# Búsqueda simple
label = t('nav.dashboard')                      # → 'Dashboard'

# Con interpolación (usa str.format)
label = t('subscriptions.folder', path='./')    # → 'Folder: ./'

La cadena de búsqueda es:

  1. Archivo del idioma elegido

  2. Inglés (en.py) — alternativa si la clave falta en el idioma elegido

  3. La cadena de la clave misma — alternativa si falta del inglés también (visible en desarrollo)

t() lee app.storage.user['lang'] de la sesión de solicitud NiceGUI actual. Siempre llámelo en tiempo de renderizado (dentro de un manejador @ui.page o un callback de evento NiceGUI), nunca en tiempo de importación del módulo.
Las cadenas que contienen caracteres { o } literales (por ejemplo, ejemplos JSON en texto de ayuda) deben usar llaves simples tal cual. No use el escape {{ / }}t() solo llama a .format(**kwargs) cuando se pasan argumentos de palabra clave, por lo que {{ se mostraría literalmente.

4.7.3. Convenciones de nomenclatura de claves

Las claves usan espacios de nombres separados por puntos:

Prefijo Alcance Ejemplo

nav.*

Etiquetas del cajón de navegación

nav.dashboard

btn.*

Etiquetas de botones

btn.subscribe

sidebar.*

Panel lateral de suscripción

sidebar.save_directory

catalogue.*

Vista de catálogo

catalogue.search_label

tree.*

Vista de árbol

tree.filter_label

subscriptions.*

Vista Gestionar suscripciones

subscriptions.folder

settings.*

Vista Configuración

settings.title

manual.*

Vista Suscripción manual

manual.topic_label

manual.val.*

Mensajes de validación Suscripción manual

manual.val.topic_required

dialog.*

Títulos de diálogo

dialog.confirm_title

metadata.*

Diálogo de metadatos

metadata.title

validation.*

Mensajes de validación compartidos

validation.date_format

footer.*

Pie de página

footer.copyright

aria.*

Etiquetas de accesibilidad ARIA

aria.toggle_nav

4.7.4. Compatibilidad con RTL

El árabe (ar) es de derecha a izquierda. Cuando se selecciona árabe, is_rtl() devuelve True y el manejador on_connect() de main.py inyecta dir="rtl" en el elemento <html> mediante JavaScript, activando el modo de diseño RTL de Quasar. Las anulaciones CSS personalizadas en assets/base.css (bajo el bloque de comentarios RTL overrides) corrigen el relleno físico de Quasar en .q-page-container y reposicionan el panel lateral de suscripción.

Para agregar otro idioma RTL, añada su código a RTL_LANGUAGES en init.py — no se necesitan otros cambios.

4.7.5. Agregar un nuevo idioma

Paso 1 — Crear el archivo de idioma

Copie modules/ui/i18n/en.py a modules/ui/i18n/{code}.py y traduzca todos los valores. Conserve cada clave; no elimine ninguna. Preserve los nombres {placeholder} exactamente como aparecen en la fuente en inglés:

# modules/ui/i18n/de.py
"""German strings."""

STRINGS: dict[str, str] = {
    'nav.dashboard':        'Dashboard',
    'nav.catalogue':        'Katalogsuche',
    'nav.tree':             'Baumsuche',
    'nav.manual':           'Manuell abonnieren',
    'nav.manage':           'Abonnements verwalten',
    'nav.settings':         'Einstellungen',
    # ... all remaining keys ...
    'subscriptions.folder': 'Ordner: {path}',   # {path} must be preserved
    'settings.records':     '{name}: {count} Einträge',
}
Paso 2 — Registrar el idioma en init.py
from . import ar, de, en, es, fr, ru, zh        (1)

LANGUAGES: dict[str, str] = {
    'en': 'English',
    'fr': 'Français',
    'es': 'Español',
    'ar': 'العربية',
    'zh': '中文',
    'ru': 'Русский',
    'de': 'Deutsch',                             (2)
}

_STRINGS: dict[str, dict[str, str]] = {
    'en': en.STRINGS,
    'fr': fr.STRINGS,
    'es': es.STRINGS,
    'ar': ar.STRINGS,
    'zh': zh.STRINGS,
    'ru': ru.STRINGS,
    'de': de.STRINGS,                            (3)
}
1 Agregar el import
2 Agregar el nombre de visualización (en el script nativo)
3 Agregar el mapeo de cadenas
Paso 3 — Marcar como RTL si es necesario

Si el idioma es de derecha a izquierda (ej. hebreo he, persa fa):

RTL_LANGUAGES: frozenset[str] = frozenset({'ar', 'he'})
Paso 4 — Reiniciar la interfaz
docker-compose restart wis2downloader-ui

El nuevo idioma aparecerá inmediatamente en el selector de idioma del encabezado.

Las traducciones no inglesas existentes son generadas automáticamente y se proporcionan solo como punto de partida. Todas las cadenas deben ser revisadas por un hablante nativo antes del despliegue en producción, con especial atención a los términos del dominio WMO/meteorológico (WIS2, BUFR, GRIB, caché global, etc.) que tienen traducciones establecidas en los documentos oficiales de WMO.

4.7.6. Agregar nuevas cadenas traducibles

Al agregar texto de interfaz a cualquier vista o componente:

Paso 1 — Agregar en en.py (fuente de verdad)
# modules/ui/i18n/en.py
'myview.title':       'My New View',
'myview.description': 'Showing results for {topic}.',
Paso 2 — Agregar en todos los demás archivos de idioma

Copie el valor en inglés como marcador de posición si una traducción no está disponible aún. t() recurre al inglés automáticamente, pero tener la clave presente evita vacíos en las herramientas:

# fr.py / es.py / ar.py / zh.py / ru.py
'myview.title':       'My New View',         # TODO: translate
'myview.description': 'Showing results for {topic}.',
Paso 3 — Usar t() en la vista
from i18n import t

def render(container):
    with container:
        ui.label(t('myview.title')).classes('page-title')
        ui.label(t('myview.description', topic=selected_topic))

5. Comunicación entre servicios

5.1. Redis PubSub (Comandos)

La comunicación gestor de suscripciones → suscriptor usa Redis PubSub en el canal subscription_commands:

// Channel: subscription_commands

// Subscribe command
{
  "action": "subscribe",
  "topic": "cache/a/wis2/+/data/#",
  "save_path": "all-data",
  "filters": {"accepted_media_types": ["application/bufr"]}
}

// Unsubscribe command
{
  "action": "unsubscribe",
  "topic": "cache/a/wis2/+/data/#"
}

5.2. Tareas Celery (Descargas)

La comunicación suscriptor → worker Celery usa la cola de tareas Celery:

job = {
    "topic": "cache/a/wis2/de-dwd/data/...",
    "target": "dwd-data",
    "filters": {"accepted_media_types": [...]},
    "_broker": "globalbroker.meteo.fr",
    "_received": "2026-01-28 10:15:30",
    "_queued": "2026-01-28 10:15:30",
    "payload": {
        "id": "...",
        "properties": {
            "data_id": "...",
            "metadata_id": "...",
            "integrity": {"method": "sha512", "value": "..."}
        },
        "links": [
            {"rel": "canonical", "href": "https://..."}
        ]
    }
}

5.3. Claves Redis

Patrón de clave Propósito

global:subscriptions

Hash de todas las suscripciones (sub_id → JSON {id, topic, save_path, filter})

wis2:notification:status:{type}:{id}

Seguimiento de deduplicación (tipos: by-msg-id, by-data-id, by-hash)

wis2:notification:data:lock:{id}

Bloqueo distribuido que previene descargas concurrentes duplicadas

gdc:cache:{name}

JSON de catálogo GDC en caché (CMA, DWD, ECCC); TTL definido por GDC_CACHE_TTL_SECONDS

wis2:metrics:{metric_name}

Hash de métricas Prometheus (campo = dict JSON de etiquetas, valor = contador/indicador flotante)

subscriber:health:{id}

Latido de estado del suscriptor

celery

Cola de tareas Celery (predeterminada)

6. Extender el sistema

6.1. Agregar una nueva condición de coincidencia al motor de filtros

El motor de filtros se encuentra en modules/shared/shared/filters.py. Cada condición de coincidencia es una clave en el objeto de coincidencia despachada por _evaluate_match().

Para agregar una nueva condición integrada (por ejemplo, coincidencia en un nuevo campo de metadatos station_id):

  1. Agregue el campo a MatchContext en filters.py:

    @dataclass
    class MatchContext:
        ...
        station_id: str | None = None
  2. Rellénelo en _build_context() en wis2.py (antes y/o después de la descarga).

  3. Agregue una rama en _evaluate_match():

    if 'station_id' in match:
        return _match_string_field(ctx.station_id, match['station_id'])
  4. Documente el nuevo campo en openapi.yml y en la referencia de filtros de la guía del usuario.

No es necesario realizar cambios en app.py, command_listener.py ni subscriber.py — el objeto de filtro se transmite sin modificaciones y se evalúa en el momento de la descarga.

6.2. Agregar una nueva tarea

  1. Cree la tarea en modules/task_manager/task_manager/tasks/

  2. Registre en autodiscover de worker.py

  3. Agregue al flujo de trabajo si es necesario

# modules/task_manager/task_manager/tasks/custom.py
from task_manager.worker import app

@app.task
def my_custom_task(data):
    # Process data
    return result

6.3. Agregar una nueva métrica

Las métricas se almacenan en Redis a través de shared.redis_metrics. Para agregar una nueva métrica:

  1. Regístrela en el dict METRICS en modules/shared/shared/redis_metrics.py:

    # short name → (prometheus type, help text)
    METRICS: dict[str, tuple[str, str]] = {
        ...
        'my_counter_total': ('counter', 'Description of the counter.'),
        'my_gauge':         ('gauge',   'Description of the gauge.'),
    }
  2. Increméntela desde cualquier servicio (contador) o establezca su valor (indicador):

    from shared import incr_counter, set_gauge
    
    # Contador (por ej. en una tarea Celery)
    incr_counter('my_counter_total', {'label1': 'value', 'label2': 'value'})
    
    # Medidor (por ej. en una tarea programada)
    set_gauge('my_gauge', {'label1': 'value'}, 42.0)

Solo las métricas registradas en METRICS aparecen en la salida de /metrics.

7. Pruebas

7.1. Ejecutar pruebas

# Instalar dependencias de prueba
pip install pytest pytest-cov

# Ejecutar pruebas
pytest modules/

# Con cobertura
pytest --cov=modules modules/

7.2. Pruebas manuales

# Iniciar servicios
docker-compose up -d

# Crear suscripción de prueba
curl -X POST http://localhost:5002/subscriptions \
  -H "Content-Type: application/json" \
  -d '{"topic": "cache/a/wis2/de-dwd/data/#", "target": "test"}'

# Monitorizar registros
docker-compose logs -f subscriber celery

# Verificar descargas
ls -la downloads/test/

8. Configuración del entorno de desarrollo

8.1. Desarrollo local

# Crear entorno virtual
python -m venv venv
source venv/bin/activate

# Instalar módulos en modo editable
pip install -e modules/shared
pip install -e modules/task_manager
pip install -e modules/subscriber
pip install -e modules/subscription_manager

# Iniciar Redis (instancia única para desarrollo)
docker run -d -p 6379:6379 redis:7.2-alpine redis-server --requirepass devpassword

# Configurar entorno
export REDIS_HOST=localhost
export REDIS_PORT=6379
export REDIS_PASSWORD=devpassword
export FLASK_SECRET_KEY=dev-secret-key
export LOG_LEVEL=DEBUG

# Ejecutar el gestor de suscripciones
python -m subscription_manager.app

# Ejecutar el suscriptor (en otra terminal)
export GLOBAL_BROKER_HOST=globalbroker.meteo.fr
python -m subscriber.manager

# Ejecutar el worker de Celery (en otra terminal)
celery -A task_manager.worker worker --loglevel=DEBUG

8.2. Build de Docker

# Construir todas las imágenes
docker-compose build

# Construir servicio específico
docker-compose build celery

# Reconstruir sin caché
docker-compose build --no-cache

9. Depuración

9.1. Ver tareas Celery

# Inspeccionar tareas activas
docker exec -it wis2downloader-celery-1 \
  celery -A task_manager.worker inspect active

# Inspeccionar tareas reservadas
docker exec -it wis2downloader-celery-1 \
  celery -A task_manager.worker inspect reserved

9.2. Inspección de Redis

# Conectar a Redis (usar contraseña del entorno)
docker exec -it redis redis-cli -a $REDIS_PASSWORD

# Listar suscripciones
HGETALL global:subscriptions

# Verificar longitud de la cola
LLEN celery

# Ver claves de deduplicación
KEYS wis2:notifications:*

# Ver caché del catálogo GDC (poblado por la interfaz al inicio)
KEYS gdc:cache:*
TTL gdc:cache:CMA
Todos los comandos Redis requieren autenticación. El indicador -a $REDIS_PASSWORD pasa la contraseña desde su entorno.

9.3. Depuración de MQTT

# Suscribirse a un tema manualmente (mosquitto-clients)
mosquitto_sub -h globalbroker.meteo.fr -p 443 \
  -t 'cache/a/wis2/de-dwd/data/#' \
  -u everyone -P everyone \
  --capath /etc/ssl/certs