1. Descripción general de la arquitectura
WIS2 Downloader es un sistema distribuido con cuatro componentes principales:
Docker Compose
┌──────────────────────────────────────────────────────────┐
│ │
│ Web UI (NiceGUI :8080) │
│ - reads GDC catalogue data from Redis cache │
│ - posts subscriptions to Subscription Manager API │
│ │ │
│ ▼ │
│ Subscription Manager (Flask API :5001) │
│ - persists subscriptions in Redis │
│ - publishes subscribe/unsubscribe commands via PubSub │
│ │ │
│ Redis PubSub (commands) │
│ │ │
│ ▼ │
│ Subscriber (MQTT Client) │
│ - connects to WIS2 Global Broker │
│ - on notification: queues Celery download task │
│ │ │
│ Redis (Celery queue + dedup state) │
│ │ │
│ ▼ │
│ Celery Workers │
│ - download file, verify hash, apply filters, save │
│ │
└──────────────────────────────────────────────────────────┘
1.1. Flujo de datos
-
Al inicio, la interfaz web obtiene registros WCMP2 desde los tres puntos de acceso GDC (o caché Redis) y construye una jerarquía de temas en memoria
-
El usuario explora la vista Catálogo o Árbol y selecciona un tema en la interfaz web
-
El usuario hace clic en Suscribirse → la interfaz web envía un POST de suscripción a la API REST del gestor de suscripciones
-
El gestor de suscripciones persiste la suscripción en Redis y publica un comando de suscripción en el canal PubSub de Redis
-
El suscriptor recibe el comando y se suscribe al tema MQTT en el bróker global WIS2
-
Cuando llega una notificación WIS2, el suscriptor pone en cola una tarea de descarga Celery
-
El worker Celery descarga el archivo, verifica el hash, aplica filtros y guarda en disco
2. Estructura de módulos
modules/
├── shared/ # Shared utilities
│ └── shared/
│ ├── __init__.py
│ ├── logging.py # Centralized logging
│ └── redis_client.py # Redis client singleton
│
├── subscriber/ # MQTT subscriber service
│ └── subscriber/
│ ├── __init__.py
│ ├── manager.py # Entry point, thread management
│ ├── subscriber.py # MQTT client wrapper
│ └── command_listener.py # Redis PubSub listener
│
├── subscription_manager/ # REST API service
│ └── subscription_manager/
│ ├── __init__.py
│ ├── app.py # Flask application
│ └── static/
│ └── openapi.yml # API specification
│
├── task_manager/ # Celery tasks
│ └── task_manager/
│ ├── __init__.py
│ ├── worker.py # Celery app configuration
│ ├── tasks/
│ │ └── wis2.py # Download tasks
│ └── workflows/
│ └── __init__.py # Task chains
│
└── ui/ # NiceGUI web interface (port 8080)
├── main.py # Entry point, page route, AppState
├── layout.py # PageLayout builder
├── config.py # Environment variable config
├── data.py # GDC data layer — fetch, merge, cache, hierarchy
├── assets/
│ └── base.css # All custom CSS
├── components/ # Reusable layout components
├── i18n/ # Internationalisation
│ ├── __init__.py # t(), current_lang(), is_rtl(), LANGUAGES
│ ├── en.py # English strings (source of truth)
│ ├── fr.py # French
│ ├── es.py # Spanish
│ ├── ar.py # Arabic (RTL)
│ ├── zh.py # Chinese
│ └── ru.py # Russian
├── models/
│ └── wcmp2.py # WCMP2Record dataclass
└── views/ # Page views (catalogue, tree, subscriptions, etc.)
3. Detalles de módulos
3.1. Módulo compartido
Proporciona utilidades comunes usadas en todos los servicios.
3.1.1. Cliente Redis
from shared import get_redis_client
# Obtener el cliente Redis (singleton)
redis = get_redis_client()
# Usar como cliente Redis normal
redis.set('key', 'value')
redis.get('key')
El cliente:
-
Se conecta directamente al servidor Redis
-
Almacena en caché la conexión (patrón singleton)
-
Incluye lógica de reintento para fallos transitorios
3.1.2. Registro
from shared import setup_logging
# Configurar el registrador raíz (llamar una vez al inicio)
setup_logging()
# Obtener un registrador específico del módulo
LOGGER = setup_logging(__name__)
LOGGER.info("Message with UTC timestamp")
Características:
-
Marcas de tiempo UTC en formato ISO 8601
-
Configurable mediante la variable de entorno
LOG_LEVEL -
Salida a stdout para recolección de registros Docker
3.2. Módulo suscriptor
Se conecta al bróker global WIS2 mediante MQTT y procesa las notificaciones entrantes.
3.2.1. Punto de entrada
def run_manager():
# 1. Create MQTT subscriber
mqtt_subscriber = Subscriber(**broker_config)
# 2. Create Redis command listener
redis_listener = CommandListener(
subscriber=mqtt_subscriber,
channel=COMMAND_CHANNEL
)
# 3. Start MQTT in separate thread
mqtt_thread = threading.Thread(target=mqtt_subscriber.start, daemon=True)
mqtt_thread.start()
# 4. Start Redis listener (blocks)
redis_listener.start()
3.2.2. Suscriptor MQTT
class Subscriber:
def __init__(self, host, port, uid, pwd, protocol, session):
# Configure MQTT client with callbacks
self.client = mqtt.Client(...)
self.client.on_message = self._on_message
def _on_message(self, client, userdata, msg):
# Parse notification, create Celery task
job = {
"topic": msg.topic,
"target": target,
"filters": filters,
"payload": json.loads(msg.payload)
}
workflow = wis2_download(job)
workflow.apply_async()
def subscribe(self, topic, target, filters):
self.client.subscribe(topic, qos=0)
self.active_subscriptions[topic] = {...}
def unsubscribe(self, topic):
self.client.unsubscribe(topic)
del self.active_subscriptions[topic]
3.2.3. Oyente de comandos
class CommandListener(threading.Thread):
def __init__(self, subscriber, channel):
self.subscriber = subscriber
self.pubsub = get_redis_client().pubsub()
def run(self):
self.pubsub.subscribe(self.channel)
while not self.stop_event.is_set():
message = self.pubsub.get_message()
if message:
self._process_command(message)
def _process_command(self, message):
command = json.loads(message['data'])
if command['action'] == 'subscribe':
self.subscriber.subscribe(
command['topic'],
command['save_path'],
command['filters']
)
elif command['action'] == 'unsubscribe':
self.subscriber.unsubscribe(command['topic'])
3.3. Módulo gestor de suscripciones
API REST Flask para gestionar suscripciones.
3.3.1. Aplicación Flask
@app.post('/subscriptions')
def add_subscription():
data = get_json()
topic = normalise_topic(data.get('topic'))
target = normalise_path(data.get('target', ''))
filters = data.get('filters', {})
# Publish command to subscriber via Redis PubSub
command = {
"action": "subscribe",
"topic": topic,
"save_path": target,
"filters": filters
}
publish_command(command, COMMAND_CHANNEL)
# Persist to Redis for durability
persist_subscription(topic, target, filters)
return jsonify({"status": "accepted", ...}), 202
3.3.2. Punto de acceso de métricas
Las métricas se almacenan en Redis mediante operaciones atómicas HINCRBYFLOAT, lo que las hace seguras en múltiples contenedores de workers Celery. El punto de acceso lee de Redis y genera el formato de texto Prometheus:
@app.route('/metrics')
def expose_metrics():
from shared.redis_metrics import generate_prometheus_text, set_gauge
set_gauge('celery_queue_length', {}, get_queue_length())
return Response(generate_prometheus_text(), mimetype='text/plain')
Incremente un contador desde cualquier worker:
from shared import incr_counter
incr_counter('downloads_total', {'cache': hostname, 'media_type': mime_type})
3.4. Módulo gestor de tareas
Workers Celery para descargar y procesar archivos.
3.4.1. Configuración de Celery
app = Celery('tasks',
broker=CELERY_BROKER_URL,
result_backend=CELERY_RESULT_BACKEND)
# Autodescubrimiento de tareas
app.autodiscover_tasks(['task_manager.tasks', 'task_manager.tasks.wis2'])
3.4.2. Tarea de descarga
@app.task(bind=True)
@metrics_collector
def download_from_wis2(self, job):
result = {...} # Initialize result dict
# 1. Extract identifiers
message_id = job['payload']['id']
data_id = job['payload']['properties']['data_id']
filehash = job['payload']['properties']['integrity']['value']
# 2. Deduplication check
for key, type in [(message_id, 'by-msg-id'), ...]:
if get_status(key, type) == STATUS_SUCCESS:
result['status'] = STATUS_SKIPPED
return result
# 3. Acquire lock
lock_acquired = redis_client.set(lock_key, 1, nx=True, ex=LOCK_EXPIRE)
if not lock_acquired:
raise self.retry(countdown=10, max_retries=10)
# 4. Download file
response = _pool.request('GET', download_url, ...)
# 5. Verify hash
if hash_expected and hash_base64 != hash_expected:
result['status'] = STATUS_FAILED
return result
# 6. Check media type filter
if not _is_allowed_media_type(file_type, filters):
result['status'] = STATUS_SKIPPED
return result
# 7. Save file
output_path.write_bytes(data)
result['status'] = STATUS_SUCCESS
return result
3.4.3. Cadenas de flujo de trabajo
def wis2_download(args):
# Chain: download -> decode/ingest
workflow = download_from_wis2.s(args) | decode_and_ingest.s()
return workflow
3.4.4. Planificador (tareas periódicas)
Una aplicación Celery separada (scheduler.py) gestiona las tareas de mantenimiento mediante Celery Beat. Utiliza las bases de datos Redis 2/3 para no interferir con las colas del worker de descarga (bases de datos 0/1). Las tareas periódicas se definen en tasks/scheduled_tasks.py:
| Tarea | Intervalo | Propósito |
|---|---|---|
|
Cada 5 min |
Establece los indicadores |
|
Cada 10 min |
Elimina archivos más antiguos que |
|
Diariamente |
|
El planificador se inicia como dos servicios Docker Compose: celery-scheduler-workers (ejecuta las tareas) y celery-beats (las activa según el calendario).
4. Módulo de interfaz
La interfaz es una aplicación web NiceGUI servida en el puerto 8080. Es la interfaz principal para que los usuarios descubran conjuntos de datos, exploren la jerarquía de temas WIS2 y creen suscripciones.
4.1. Estructura del módulo
modules/ui/
├── main.py # Application entry point, page route, AppState
├── layout.py # PageLayout builder
├── config.py # Environment variable config (SUBSCRIPTION_MANAGER_URL etc.)
├── data.py # GDC data layer — fetch, merge, cache, hierarchy
├── assets/
│ └── base.css # All custom CSS (inside @layer components)
├── components/
│ ├── header.py # WMO banner + toolbar + language selector
│ ├── footer.py # Footer bar
│ ├── navigation_drawer.py # Left nav drawer with view links
│ └── page_body.py # Main content area + right sidebar
├── i18n/ # Internationalisation
│ ├── __init__.py # t(), current_lang(), is_rtl(), LANGUAGES
│ ├── en.py # English strings (source of truth)
│ ├── fr.py # French
│ ├── es.py # Spanish
│ ├── ar.py # Arabic (RTL)
│ ├── zh.py # Chinese
│ └── ru.py # Russian
├── models/
│ └── wcmp2.py # WCMP2Record dataclass (GeoJSON Feature)
└── views/
├── dashboard.py # Grafana iframe embed
├── catalogue.py # Catalogue search + result cards
├── tree.py # Topic hierarchy tree
├── subscriptions.py # Active subscriptions list
├── settings.py # GDC status + refresh trigger
└── shared.py # Shared sidebar logic (on_topics_picked, confirm_subscribe, show_metadata)
4.2. Capa de datos (data.py)
La capa de datos es responsable de obtener registros WCMP2 de los tres catálogos de descubrimiento globales WIS2 (GDC), fusionarlos y construir dos cachés a nivel de módulo que leen todas las vistas.
4.2.1. Fuentes GDC
Los registros se obtienen de tres puntos de acceso GDC públicos al inicio:
| Nombre corto | URL |
|---|---|
|
|
|
|
|
Las respuestas JSON brutas se almacenan en caché en Redis bajo las claves gdc:cache:CMA, gdc:cache:DWD y gdc:cache:ECCC con un TTL controlado por GDC_CACHE_TTL_SECONDS (6 horas por defecto). Redis es opcional — si no está disponible, los datos se obtienen por HTTP en cada inicio.
4.2.2. Registros fusionados
Después de cargar todas las fuentes GDC, _build_merged_records() deduplica los registros por ID entre catálogos:
-
Los registros presentes en un solo catálogo aparecen una vez con una única entrada
source_gdcs. -
Los registros presentes en múltiples catálogos se fusionan:
source_gdcslista todos los catálogos contribuyentes. -
has_discrepancyse establece enTruesi las propiedades, geometría o enlaces difieren entre catálogos. -
Los enlaces se unen entre catálogos para que los datos de canal presentes en cualquier GDC se conserven en el registro fusionado.
El resultado se almacena en la lista _merged_records a nivel de módulo y se devuelve por merged_records().
4.2.3. Jerarquía de temas
_build_topic_hierarchy() itera sobre _merged_records e inserta el primer canal MQTT cache/ de cada registro en un dict anidado:
{
"cache": {
"children": {
"a": {
"children": {
"wis2": {
"children": {
"de-dwd": {
"children": {
...
"synop": {
"datasets": [<WCMP2Record>, ...]
}
}
}
}
}
}
}
}
}
}
Un nodo puede contener tanto "children" como "datasets" si un canal es prefijo de otro.
La jerarquía se almacena en _topic_hierarchy y se devuelve por topic_hierarchy(). La consume tree.py para construir el widget ui.tree, y get_datasets_for_channel() para resolver conjuntos de datos para un tema dado.
4.2.4. Funciones clave
| Función | Descripción |
|---|---|
|
Devuelve la lista de registros fusionados en caché |
|
Devuelve la jerarquía de temas en caché |
|
Elimina el |
|
Obtiene datos GDC (caché Redis primero, luego HTTP), reconstruye |
4.3. Modelo WCMP2 (models/wcmp2.py)
WCMP2Record es una representación dataclass de una Feature GeoJSON de metadatos de descubrimiento WIS2.
from models.wcmp2 import WCMP2Record
rec = WCMP2Record.from_dict(feature_dict)
rec.id # str - unique dataset identifier
rec.title # str | None
rec.description # str | None
rec.keywords # list[str]
rec.wmo_data_policy # 'core' | 'recommended' | None
rec.geometry # Geometry | None (.type, .coordinates)
rec.links # list[Link] (.channel, .href, .rel, .extra)
rec.mqtt_channels # list[str] - channels from all links
Link.extra es un dict que captura claves fuera del esquema de la respuesta GDC (ej. filters específicos del GDC usados para campos de filtro personalizados en el panel lateral de suscripción).
4.4. AppState
Cada sesión del navegador tiene su propia instancia AppState (definida en main.py):
class AppState:
def __init__(self):
self.selected_topics: list[str] = []
self.current_view: str = 'dashboard'
selected_topics contiene los temas MQTT actualmente seleccionados en la interfaz y controla el panel lateral derecho. current_view rastrea el nombre de la vista activa y se persiste en app.storage.user['current_view'] antes de cualquier recarga de página (ej. al cambiar de idioma) para que el usuario regrese a la misma vista.
4.5. Vistas
Todas las vistas siguen la misma convención: una función render(container, …) que construye los elementos NiceGUI dentro del elemento contenedor proporcionado. render siempre es una función simple (no asíncrona) para que pueda ser llamada sincrónicamente desde show_view() en main.py.
4.5.1. Vista de catálogo
views/catalogue.py proporciona búsqueda de texto completo en merged_records() con funciones de filtro puras:
| Función | Descripción |
|---|---|
|
Hace coincidir la consulta con ID, título, descripción, palabras clave y conceptos temáticos |
|
Hace coincidir |
|
Palabras clave separadas por comas; todas deben aparecer en el registro |
|
Intersección geométrica Shapely con el bounding box dado |
Las tarjetas de resultados son renderizadas por update_search_results(), que también gestiona la paginación.
4.5.2. Vista de árbol
views/tree.py convierte topic_hierarchy() en nodos ui.tree mediante _to_tree_nodes() (recursivo, ordenado alfabéticamente). La selección usa on_select para imponer la selección de un único nodo — on_tick no se usa porque entra en conflicto con la sincronización interna de estado de NiceGUI.
4.5.3. Panel lateral compartido (views/shared.py)
on_topics_picked(e, state, layout, is_page_selection, dataset_id) es el manejador central para la selección desde catálogo y árbol. Él:
-
Actualiza
state.selected_topics -
Construye el panel lateral derecho con entrada de directorio de guardado, filtros de conjuntos de datos/tipos de medios/bbox/fecha y filtros personalizados opcionales
-
Conecta el botón Suscribirse a
confirm_subscribe()
confirm_subscribe() muestra un diálogo con la carga útil JSON completa antes de llamar a subscribe_to_topics().
show_metadata(dataset_id) busca el registro en merged_records() y renderiza un diálogo de detalles con un mapa Leaflet interactivo.
4.6. Agregar una nueva vista
-
Cree
modules/ui/views/myview.pycon una funciónrender(container), usandot()para todas las cadenas visibles al usuario -
Agregue una entrada de navegación en
components/navigation_drawer.py— la listaNAV_ITEMSacepta una tupla(view_id, label_key, icon)dondelabel_keyes una clave i18nnav.* -
Agregue una rama en el despachador
show_view()enmain.py -
Agregue cualquier nueva clase CSS en
assets/base.cssdentro del bloque@layer components { … } -
Agregue todas las nuevas claves de cadenas en
i18n/en.pyy en cada otro archivo de idioma — ver Internacionalización (i18n)
4.7. Internacionalización (i18n)
La interfaz soporta múltiples idiomas mediante un sistema i18n ligero basado en diccionarios en modules/ui/i18n/.
4.7.1. Idiomas compatibles
| Código | Idioma | Dirección |
|---|---|---|
|
English |
LTR |
|
Français |
LTR |
|
Español |
LTR |
|
العربية |
RTL |
|
中文 |
LTR |
|
Русский |
LTR |
El idioma activo se almacena por sesión de navegador en app.storage.user['lang'] y por defecto es inglés. Los usuarios cambian de idioma mediante el selector en la barra de herramientas del encabezado; la página se recarga para aplicar el cambio.
4.7.2. Cómo funciona t()
from i18n import t
# Búsqueda simple
label = t('nav.dashboard') # → 'Dashboard'
# Con interpolación (usa str.format)
label = t('subscriptions.folder', path='./') # → 'Folder: ./'
La cadena de búsqueda es:
-
Archivo del idioma elegido
-
Inglés (
en.py) — alternativa si la clave falta en el idioma elegido -
La cadena de la clave misma — alternativa si falta del inglés también (visible en desarrollo)
t() lee app.storage.user['lang'] de la sesión de solicitud NiceGUI actual. Siempre llámelo en tiempo de renderizado (dentro de un manejador @ui.page o un callback de evento NiceGUI), nunca en tiempo de importación del módulo.
|
Las cadenas que contienen caracteres { o } literales (por ejemplo, ejemplos JSON en texto de ayuda) deben usar llaves simples tal cual. No use el escape {{ / }} — t() solo llama a .format(**kwargs) cuando se pasan argumentos de palabra clave, por lo que {{ se mostraría literalmente.
|
4.7.3. Convenciones de nomenclatura de claves
Las claves usan espacios de nombres separados por puntos:
| Prefijo | Alcance | Ejemplo |
|---|---|---|
|
Etiquetas del cajón de navegación |
|
|
Etiquetas de botones |
|
|
Panel lateral de suscripción |
|
|
Vista de catálogo |
|
|
Vista de árbol |
|
|
Vista Gestionar suscripciones |
|
|
Vista Configuración |
|
|
Vista Suscripción manual |
|
|
Mensajes de validación Suscripción manual |
|
|
Títulos de diálogo |
|
|
Diálogo de metadatos |
|
|
Mensajes de validación compartidos |
|
|
Pie de página |
|
|
Etiquetas de accesibilidad ARIA |
|
4.7.4. Compatibilidad con RTL
El árabe (ar) es de derecha a izquierda. Cuando se selecciona árabe, is_rtl() devuelve True y el manejador on_connect() de main.py inyecta dir="rtl" en el elemento <html> mediante JavaScript, activando el modo de diseño RTL de Quasar. Las anulaciones CSS personalizadas en assets/base.css (bajo el bloque de comentarios RTL overrides) corrigen el relleno físico de Quasar en .q-page-container y reposicionan el panel lateral de suscripción.
Para agregar otro idioma RTL, añada su código a RTL_LANGUAGES en init.py — no se necesitan otros cambios.
4.7.5. Agregar un nuevo idioma
Copie modules/ui/i18n/en.py a modules/ui/i18n/{code}.py y traduzca todos los valores. Conserve cada clave; no elimine ninguna. Preserve los nombres {placeholder} exactamente como aparecen en la fuente en inglés:
# modules/ui/i18n/de.py
"""German strings."""
STRINGS: dict[str, str] = {
'nav.dashboard': 'Dashboard',
'nav.catalogue': 'Katalogsuche',
'nav.tree': 'Baumsuche',
'nav.manual': 'Manuell abonnieren',
'nav.manage': 'Abonnements verwalten',
'nav.settings': 'Einstellungen',
# ... all remaining keys ...
'subscriptions.folder': 'Ordner: {path}', # {path} must be preserved
'settings.records': '{name}: {count} Einträge',
}
init.pyfrom . import ar, de, en, es, fr, ru, zh (1)
LANGUAGES: dict[str, str] = {
'en': 'English',
'fr': 'Français',
'es': 'Español',
'ar': 'العربية',
'zh': '中文',
'ru': 'Русский',
'de': 'Deutsch', (2)
}
_STRINGS: dict[str, dict[str, str]] = {
'en': en.STRINGS,
'fr': fr.STRINGS,
'es': es.STRINGS,
'ar': ar.STRINGS,
'zh': zh.STRINGS,
'ru': ru.STRINGS,
'de': de.STRINGS, (3)
}
| 1 | Agregar el import |
| 2 | Agregar el nombre de visualización (en el script nativo) |
| 3 | Agregar el mapeo de cadenas |
Si el idioma es de derecha a izquierda (ej. hebreo he, persa fa):
RTL_LANGUAGES: frozenset[str] = frozenset({'ar', 'he'})
docker-compose restart wis2downloader-ui
El nuevo idioma aparecerá inmediatamente en el selector de idioma del encabezado.
| Las traducciones no inglesas existentes son generadas automáticamente y se proporcionan solo como punto de partida. Todas las cadenas deben ser revisadas por un hablante nativo antes del despliegue en producción, con especial atención a los términos del dominio WMO/meteorológico (WIS2, BUFR, GRIB, caché global, etc.) que tienen traducciones establecidas en los documentos oficiales de WMO. |
4.7.6. Agregar nuevas cadenas traducibles
Al agregar texto de interfaz a cualquier vista o componente:
en.py (fuente de verdad)# modules/ui/i18n/en.py
'myview.title': 'My New View',
'myview.description': 'Showing results for {topic}.',
Copie el valor en inglés como marcador de posición si una traducción no está disponible aún. t() recurre al inglés automáticamente, pero tener la clave presente evita vacíos en las herramientas:
# fr.py / es.py / ar.py / zh.py / ru.py
'myview.title': 'My New View', # TODO: translate
'myview.description': 'Showing results for {topic}.',
t() en la vistafrom i18n import t
def render(container):
with container:
ui.label(t('myview.title')).classes('page-title')
ui.label(t('myview.description', topic=selected_topic))
5. Comunicación entre servicios
5.1. Redis PubSub (Comandos)
La comunicación gestor de suscripciones → suscriptor usa Redis PubSub en el canal subscription_commands:
// Channel: subscription_commands
// Subscribe command
{
"action": "subscribe",
"topic": "cache/a/wis2/+/data/#",
"save_path": "all-data",
"filters": {"accepted_media_types": ["application/bufr"]}
}
// Unsubscribe command
{
"action": "unsubscribe",
"topic": "cache/a/wis2/+/data/#"
}
5.2. Tareas Celery (Descargas)
La comunicación suscriptor → worker Celery usa la cola de tareas Celery:
job = {
"topic": "cache/a/wis2/de-dwd/data/...",
"target": "dwd-data",
"filters": {"accepted_media_types": [...]},
"_broker": "globalbroker.meteo.fr",
"_received": "2026-01-28 10:15:30",
"_queued": "2026-01-28 10:15:30",
"payload": {
"id": "...",
"properties": {
"data_id": "...",
"metadata_id": "...",
"integrity": {"method": "sha512", "value": "..."}
},
"links": [
{"rel": "canonical", "href": "https://..."}
]
}
}
5.3. Claves Redis
| Patrón de clave | Propósito |
|---|---|
|
Hash de todas las suscripciones (sub_id → JSON |
|
Seguimiento de deduplicación (tipos: |
|
Bloqueo distribuido que previene descargas concurrentes duplicadas |
|
JSON de catálogo GDC en caché (CMA, DWD, ECCC); TTL definido por |
|
Hash de métricas Prometheus (campo = dict JSON de etiquetas, valor = contador/indicador flotante) |
|
Latido de estado del suscriptor |
|
Cola de tareas Celery (predeterminada) |
6. Extender el sistema
6.1. Agregar una nueva condición de coincidencia al motor de filtros
El motor de filtros se encuentra en modules/shared/shared/filters.py. Cada condición de coincidencia es una clave en el objeto de coincidencia despachada por _evaluate_match().
Para agregar una nueva condición integrada (por ejemplo, coincidencia en un nuevo campo de metadatos station_id):
-
Agregue el campo a
MatchContextenfilters.py:@dataclass class MatchContext: ... station_id: str | None = None -
Rellénelo en
_build_context()enwis2.py(antes y/o después de la descarga). -
Agregue una rama en
_evaluate_match():if 'station_id' in match: return _match_string_field(ctx.station_id, match['station_id']) -
Documente el nuevo campo en
openapi.ymly en la referencia de filtros de la guía del usuario.
No es necesario realizar cambios en app.py, command_listener.py ni subscriber.py — el objeto de filtro se transmite sin modificaciones y se evalúa en el momento de la descarga.
6.2. Agregar una nueva tarea
-
Cree la tarea en
modules/task_manager/task_manager/tasks/ -
Registre en autodiscover de
worker.py -
Agregue al flujo de trabajo si es necesario
# modules/task_manager/task_manager/tasks/custom.py
from task_manager.worker import app
@app.task
def my_custom_task(data):
# Process data
return result
6.3. Agregar una nueva métrica
Las métricas se almacenan en Redis a través de shared.redis_metrics. Para agregar una nueva métrica:
-
Regístrela en el dict
METRICSenmodules/shared/shared/redis_metrics.py:# short name → (prometheus type, help text) METRICS: dict[str, tuple[str, str]] = { ... 'my_counter_total': ('counter', 'Description of the counter.'), 'my_gauge': ('gauge', 'Description of the gauge.'), } -
Increméntela desde cualquier servicio (contador) o establezca su valor (indicador):
from shared import incr_counter, set_gauge # Contador (por ej. en una tarea Celery) incr_counter('my_counter_total', {'label1': 'value', 'label2': 'value'}) # Medidor (por ej. en una tarea programada) set_gauge('my_gauge', {'label1': 'value'}, 42.0)
Solo las métricas registradas en METRICS aparecen en la salida de /metrics.
7. Pruebas
7.1. Ejecutar pruebas
# Instalar dependencias de prueba
pip install pytest pytest-cov
# Ejecutar pruebas
pytest modules/
# Con cobertura
pytest --cov=modules modules/
7.2. Pruebas manuales
# Iniciar servicios
docker-compose up -d
# Crear suscripción de prueba
curl -X POST http://localhost:5002/subscriptions \
-H "Content-Type: application/json" \
-d '{"topic": "cache/a/wis2/de-dwd/data/#", "target": "test"}'
# Monitorizar registros
docker-compose logs -f subscriber celery
# Verificar descargas
ls -la downloads/test/
8. Configuración del entorno de desarrollo
8.1. Desarrollo local
# Crear entorno virtual
python -m venv venv
source venv/bin/activate
# Instalar módulos en modo editable
pip install -e modules/shared
pip install -e modules/task_manager
pip install -e modules/subscriber
pip install -e modules/subscription_manager
# Iniciar Redis (instancia única para desarrollo)
docker run -d -p 6379:6379 redis:7.2-alpine redis-server --requirepass devpassword
# Configurar entorno
export REDIS_HOST=localhost
export REDIS_PORT=6379
export REDIS_PASSWORD=devpassword
export FLASK_SECRET_KEY=dev-secret-key
export LOG_LEVEL=DEBUG
# Ejecutar el gestor de suscripciones
python -m subscription_manager.app
# Ejecutar el suscriptor (en otra terminal)
export GLOBAL_BROKER_HOST=globalbroker.meteo.fr
python -m subscriber.manager
# Ejecutar el worker de Celery (en otra terminal)
celery -A task_manager.worker worker --loglevel=DEBUG
8.2. Build de Docker
# Construir todas las imágenes
docker-compose build
# Construir servicio específico
docker-compose build celery
# Reconstruir sin caché
docker-compose build --no-cache
9. Depuración
9.1. Ver tareas Celery
# Inspeccionar tareas activas
docker exec -it wis2downloader-celery-1 \
celery -A task_manager.worker inspect active
# Inspeccionar tareas reservadas
docker exec -it wis2downloader-celery-1 \
celery -A task_manager.worker inspect reserved
9.2. Inspección de Redis
# Conectar a Redis (usar contraseña del entorno)
docker exec -it redis redis-cli -a $REDIS_PASSWORD
# Listar suscripciones
HGETALL global:subscriptions
# Verificar longitud de la cola
LLEN celery
# Ver claves de deduplicación
KEYS wis2:notifications:*
# Ver caché del catálogo GDC (poblado por la interfaz al inicio)
KEYS gdc:cache:*
TTL gdc:cache:CMA
Todos los comandos Redis requieren autenticación. El indicador -a $REDIS_PASSWORD pasa la contraseña desde su entorno.
|
9.3. Depuración de MQTT
# Suscribirse a un tema manualmente (mosquitto-clients)
mosquitto_sub -h globalbroker.meteo.fr -p 443 \
-t 'cache/a/wis2/de-dwd/data/#' \
-u everyone -P everyone \
--capath /etc/ssl/certs