|
Ce document a été traduit automatiquement depuis l’anglais par Claude AI. Les termes du domaine WMO/météorologique doivent être vérifiés par un locuteur natif avant toute utilisation en production. Consultez le lien:../en/[document original en anglais] pour la version faisant autorité. |
1. Aperçu de l’architecture
WIS2 Downloader est un système distribué composé de quatre composants principaux :
Docker Compose
┌──────────────────────────────────────────────────────────┐
│ │
│ Web UI (NiceGUI :8080) │
│ - reads GDC catalogue data from Redis cache │
│ - posts subscriptions to Subscription Manager API │
│ │ │
│ ▼ │
│ Subscription Manager (Flask API :5001) │
│ - persists subscriptions in Redis │
│ - publishes subscribe/unsubscribe commands via PubSub │
│ │ │
│ Redis PubSub (commands) │
│ │ │
│ ▼ │
│ Subscriber (MQTT Client) │
│ - connects to WIS2 Global Broker │
│ - on notification: queues Celery download task │
│ │ │
│ Redis (Celery queue + dedup state) │
│ │ │
│ ▼ │
│ Celery Workers │
│ - download file, verify hash, apply filters, save │
│ │
└──────────────────────────────────────────────────────────┘
1.1. Flux de données
-
Au démarrage, l’interface web récupère les enregistrements WCMP2 depuis les trois points de terminaison GDC (ou le cache Redis) et construit une hiérarchie de rubriques en mémoire
-
L’utilisateur parcourt la vue Catalogue ou Arborescence et sélectionne une rubrique dans l’interface web
-
L’utilisateur clique sur S’abonner → l’interface web envoie un POST d’abonnement à l’API REST du gestionnaire d’abonnements
-
Le gestionnaire d’abonnements persiste l’abonnement dans Redis et publie une commande d’abonnement sur le canal PubSub Redis
-
L’abonné reçoit la commande et s’abonne à la rubrique MQTT sur le courtier global WIS2
-
Lorsqu’une notification WIS2 arrive, l’abonné met en file d’attente une tâche de téléchargement Celery
-
Le worker Celery télécharge le fichier, vérifie le hash, applique les filtres et sauvegarde sur disque
2. Structure des modules
modules/
├── shared/ # Shared utilities
│ └── shared/
│ ├── __init__.py
│ ├── logging.py # Centralized logging
│ └── redis_client.py # Redis client singleton
│
├── subscriber/ # MQTT subscriber service
│ └── subscriber/
│ ├── __init__.py
│ ├── manager.py # Entry point, thread management
│ ├── subscriber.py # MQTT client wrapper
│ └── command_listener.py # Redis PubSub listener
│
├── subscription_manager/ # REST API service
│ └── subscription_manager/
│ ├── __init__.py
│ ├── app.py # Flask application
│ └── static/
│ └── openapi.yml # API specification
│
├── task_manager/ # Celery tasks
│ └── task_manager/
│ ├── __init__.py
│ ├── worker.py # Celery app configuration
│ ├── tasks/
│ │ └── wis2.py # Download tasks
│ └── workflows/
│ └── __init__.py # Task chains
│
└── ui/ # NiceGUI web interface (port 8080)
├── main.py # Entry point, page route, AppState
├── layout.py # PageLayout builder
├── config.py # Environment variable config
├── data.py # GDC data layer — fetch, merge, cache, hierarchy
├── assets/
│ └── base.css # All custom CSS
├── components/ # Reusable layout components
├── i18n/ # Internationalisation
│ ├── __init__.py # t(), current_lang(), is_rtl(), LANGUAGES
│ ├── en.py # English strings (source of truth)
│ ├── fr.py # French
│ ├── es.py # Spanish
│ ├── ar.py # Arabic (RTL)
│ ├── zh.py # Chinese
│ └── ru.py # Russian
├── models/
│ └── wcmp2.py # WCMP2Record dataclass
└── views/ # Page views (catalogue, tree, subscriptions, etc.)
3. Détails des modules
3.1. Module partagé
Fournit des utilitaires communs utilisés dans tous les services.
3.1.1. Client Redis
from shared import get_redis_client
# Obtenir le client Redis (singleton)
redis = get_redis_client()
# Utiliser comme un client Redis normal
redis.set('key', 'value')
redis.get('key')
Le client :
-
Se connecte directement au serveur Redis
-
Met en cache la connexion (modèle singleton)
-
Inclut une logique de réessai pour les échecs transitoires
3.1.2. Journalisation
from shared import setup_logging
# Configurer le logger racine (appeler une fois au démarrage)
setup_logging()
# Obtenir un logger spécifique au module
LOGGER = setup_logging(__name__)
LOGGER.info("Message with UTC timestamp")
Fonctionnalités :
-
Horodatages UTC au format ISO 8601
-
Configurable via la variable d’environnement
LOG_LEVEL -
Sortie vers stdout pour la collecte des journaux Docker
3.2. Module abonné
Se connecte au courtier global WIS2 via MQTT et traite les notifications entrantes.
3.2.1. Point d’entrée
def run_manager():
# 1. Create MQTT subscriber
mqtt_subscriber = Subscriber(**broker_config)
# 2. Create Redis command listener
redis_listener = CommandListener(
subscriber=mqtt_subscriber,
channel=COMMAND_CHANNEL
)
# 3. Start MQTT in separate thread
mqtt_thread = threading.Thread(target=mqtt_subscriber.start, daemon=True)
mqtt_thread.start()
# 4. Start Redis listener (blocks)
redis_listener.start()
3.2.2. Abonné MQTT
class Subscriber:
def __init__(self, host, port, uid, pwd, protocol, session):
# Configure MQTT client with callbacks
self.client = mqtt.Client(...)
self.client.on_message = self._on_message
def _on_message(self, client, userdata, msg):
# Parse notification, create Celery task
job = {
"topic": msg.topic,
"target": target,
"filters": filters,
"payload": json.loads(msg.payload)
}
workflow = wis2_download(job)
workflow.apply_async()
def subscribe(self, topic, target, filters):
self.client.subscribe(topic, qos=0)
self.active_subscriptions[topic] = {...}
def unsubscribe(self, topic):
self.client.unsubscribe(topic)
del self.active_subscriptions[topic]
3.2.3. Écouteur de commandes
class CommandListener(threading.Thread):
def __init__(self, subscriber, channel):
self.subscriber = subscriber
self.pubsub = get_redis_client().pubsub()
def run(self):
self.pubsub.subscribe(self.channel)
while not self.stop_event.is_set():
message = self.pubsub.get_message()
if message:
self._process_command(message)
def _process_command(self, message):
command = json.loads(message['data'])
if command['action'] == 'subscribe':
self.subscriber.subscribe(
command['topic'],
command['save_path'],
command['filters']
)
elif command['action'] == 'unsubscribe':
self.subscriber.unsubscribe(command['topic'])
3.3. Module gestionnaire d’abonnements
API REST Flask pour gérer les abonnements.
3.3.1. Application Flask
@app.post('/subscriptions')
def add_subscription():
data = get_json()
topic = normalise_topic(data.get('topic'))
target = normalise_path(data.get('target', ''))
filters = data.get('filters', {})
# Publish command to subscriber via Redis PubSub
command = {
"action": "subscribe",
"topic": topic,
"save_path": target,
"filters": filters
}
publish_command(command, COMMAND_CHANNEL)
# Persist to Redis for durability
persist_subscription(topic, target, filters)
return jsonify({"status": "accepted", ...}), 202
3.3.2. Point de terminaison des métriques
Les métriques sont stockées dans Redis via des opérations HINCRBYFLOAT atomiques, ce qui les rend sûres pour plusieurs conteneurs de workers Celery. Le point de terminaison lit depuis Redis et génère le format texte Prometheus :
@app.route('/metrics')
def expose_metrics():
from shared.redis_metrics import generate_prometheus_text, set_gauge
set_gauge('celery_queue_length', {}, get_queue_length())
return Response(generate_prometheus_text(), mimetype='text/plain')
Incrémenter un compteur depuis n’importe quel worker :
from shared import incr_counter
incr_counter('downloads_total', {'cache': hostname, 'media_type': mime_type})
3.4. Module gestionnaire de tâches
Workers Celery pour télécharger et traiter les fichiers.
3.4.1. Configuration Celery
app = Celery('tasks',
broker=CELERY_BROKER_URL,
result_backend=CELERY_RESULT_BACKEND)
# Découverte automatique des tâches
app.autodiscover_tasks(['task_manager.tasks', 'task_manager.tasks.wis2'])
3.4.2. Tâche de téléchargement
@app.task(bind=True)
@metrics_collector
def download_from_wis2(self, job):
result = {...} # Initialize result dict
# 1. Extract identifiers
message_id = job['payload']['id']
data_id = job['payload']['properties']['data_id']
filehash = job['payload']['properties']['integrity']['value']
# 2. Deduplication check
for key, type in [(message_id, 'by-msg-id'), ...]:
if get_status(key, type) == STATUS_SUCCESS:
result['status'] = STATUS_SKIPPED
return result
# 3. Acquire lock
lock_acquired = redis_client.set(lock_key, 1, nx=True, ex=LOCK_EXPIRE)
if not lock_acquired:
raise self.retry(countdown=10, max_retries=10)
# 4. Download file
response = _pool.request('GET', download_url, ...)
# 5. Verify hash
if hash_expected and hash_base64 != hash_expected:
result['status'] = STATUS_FAILED
return result
# 6. Check media type filter
if not _is_allowed_media_type(file_type, filters):
result['status'] = STATUS_SKIPPED
return result
# 7. Save file
output_path.write_bytes(data)
result['status'] = STATUS_SUCCESS
return result
3.4.3. Chaînes de flux de travail
def wis2_download(args):
# Chain: download -> decode/ingest
workflow = download_from_wis2.s(args) | decode_and_ingest.s()
return workflow
3.4.4. Planificateur (tâches périodiques)
Une application Celery distincte (scheduler.py) gère les tâches de maintenance via Celery Beat. Elle utilise les bases de données Redis 2/3 pour éviter d’interférer avec les files d’attente du worker de téléchargement (bases de données 0/1). Les tâches périodiques sont définies dans tasks/scheduled_tasks.py :
| Tâche | Intervalle | Objectif |
|---|---|---|
|
Toutes les 5 min |
Définit les jauges |
|
Toutes les 10 min |
Supprime les fichiers plus anciens que |
|
Quotidiennement |
Parcours complet via |
Le planificateur est démarré en tant que deux services Docker Compose : celery-scheduler-workers (exécute les tâches) et celery-beats (les déclenche selon le calendrier).
4. Module d’interface
L’interface est une application web NiceGUI servie sur le port 8080. C’est l’interface principale destinée aux utilisateurs pour découvrir des jeux de données, parcourir la hiérarchie des rubriques WIS2 et créer des abonnements.
4.1. Structure du module
modules/ui/
├── main.py # Application entry point, page route, AppState
├── layout.py # PageLayout builder
├── config.py # Environment variable config (SUBSCRIPTION_MANAGER_URL etc.)
├── data.py # GDC data layer — fetch, merge, cache, hierarchy
├── assets/
│ └── base.css # All custom CSS (inside @layer components)
├── components/
│ ├── header.py # WMO banner + toolbar + language selector
│ ├── footer.py # Footer bar
│ ├── navigation_drawer.py # Left nav drawer with view links
│ └── page_body.py # Main content area + right sidebar
├── i18n/ # Internationalisation
│ ├── __init__.py # t(), current_lang(), is_rtl(), LANGUAGES
│ ├── en.py # English strings (source of truth)
│ ├── fr.py # French
│ ├── es.py # Spanish
│ ├── ar.py # Arabic (RTL)
│ ├── zh.py # Chinese
│ └── ru.py # Russian
├── models/
│ └── wcmp2.py # WCMP2Record dataclass (GeoJSON Feature)
└── views/
├── dashboard.py # Grafana iframe embed
├── catalogue.py # Catalogue search + result cards
├── tree.py # Topic hierarchy tree
├── subscriptions.py # Active subscriptions list
├── settings.py # GDC status + refresh trigger
└── shared.py # Shared sidebar logic (on_topics_picked, confirm_subscribe, show_metadata)
4.2. Couche de données (data.py)
La couche de données est responsable de la récupération des enregistrements WCMP2 depuis les trois catalogues de découverte mondiaux WIS2 (GDC), de leur fusion et de la construction de deux caches au niveau module que toutes les vues lisent.
4.2.1. Sources GDC
Les enregistrements sont récupérés depuis trois points de terminaison GDC publics au démarrage :
| Nom court | URL |
|---|---|
|
|
|
|
|
Les réponses JSON brutes sont mises en cache dans Redis sous les clés gdc:cache:CMA, gdc:cache:DWD et gdc:cache:ECCC avec un TTL contrôlé par GDC_CACHE_TTL_SECONDS (6 heures par défaut). Redis est optionnel — s’il n’est pas disponible, les données sont récupérées via HTTP à chaque démarrage.
4.2.2. Enregistrements fusionnés
Après le chargement de toutes les sources GDC, _build_merged_records() déduplique les enregistrements par ID entre les catalogues :
-
Les enregistrements présents dans un seul catalogue apparaissent une fois avec une seule entrée
source_gdcs. -
Les enregistrements présents dans plusieurs catalogues sont fusionnés :
source_gdcsliste tous les catalogues contributeurs. -
has_discrepancyest défini àTruesi les propriétés, la géométrie ou les liens diffèrent entre les catalogues. -
Les liens sont unionisés entre les catalogues afin que les données de canal présentes dans l’un quelconque des GDC soient conservées dans l’enregistrement fusionné.
Le résultat est stocké dans la liste _merged_records au niveau module et retourné par merged_records().
4.2.3. Hiérarchie des rubriques
_build_topic_hierarchy() itère sur _merged_records et insère le premier canal MQTT cache/ de chaque enregistrement dans un dict imbriqué :
{
"cache": {
"children": {
"a": {
"children": {
"wis2": {
"children": {
"de-dwd": {
"children": {
...
"synop": {
"datasets": [<WCMP2Record>, ...]
}
}
}
}
}
}
}
}
}
}
Un nœud peut contenir à la fois "children" et "datasets" si un canal est un préfixe d’un autre.
La hiérarchie est stockée dans _topic_hierarchy et retournée par topic_hierarchy(). Elle est consommée par tree.py pour construire le widget ui.tree, et par get_datasets_for_channel() pour résoudre les jeux de données pour une rubrique donnée.
4.2.4. Fonctions clés
| Fonction | Description |
|---|---|
|
Retourne la liste d’enregistrements fusionnés mise en cache |
|
Retourne la hiérarchie de rubriques mise en cache |
|
Supprime le |
|
Récupère les données GDC (cache Redis en premier, puis HTTP), reconstruit |
4.3. Modèle WCMP2 (models/wcmp2.py)
WCMP2Record est une représentation dataclass d’une Feature GeoJSON de métadonnées de découverte WIS2.
from models.wcmp2 import WCMP2Record
rec = WCMP2Record.from_dict(feature_dict)
rec.id # str - unique dataset identifier
rec.title # str | None
rec.description # str | None
rec.keywords # list[str]
rec.wmo_data_policy # 'core' | 'recommended' | None
rec.geometry # Geometry | None (.type, .coordinates)
rec.links # list[Link] (.channel, .href, .rel, .extra)
rec.mqtt_channels # list[str] - channels from all links
Link.extra est un dict capturant les clés hors-schéma de la réponse GDC (ex. filters spécifiques au GDC utilisés pour les champs de filtre personnalisés dans le panneau d’abonnement).
4.4. AppState
Chaque session de navigateur possède sa propre instance AppState (définie dans main.py) :
class AppState:
def __init__(self):
self.selected_topics: list[str] = []
self.current_view: str = 'dashboard'
selected_topics contient les rubriques MQTT actuellement sélectionnées dans l’interface et pilote le panneau latéral droit. current_view suit le nom de la vue active et est persisté dans app.storage.user['current_view'] avant tout rechargement de page (ex. lors d’un changement de langue) pour que l’utilisateur soit ramené à la même vue.
4.5. Vues
Toutes les vues suivent la même convention : une fonction render(container, …) qui construit les éléments NiceGUI à l’intérieur de l’élément conteneur fourni. render est toujours une fonction simple (non asynchrone) afin de pouvoir être appelée de manière synchrone depuis show_view() dans main.py.
4.5.1. Vue catalogue
views/catalogue.py fournit une recherche en texte intégral dans merged_records() avec des fonctions de filtre pures :
| Fonction | Description |
|---|---|
|
Fait correspondre la requête avec l’ID, le titre, la description, les mots-clés et les concepts thématiques |
|
Fait correspondre |
|
Mots-clés séparés par des virgules ; tous doivent apparaître dans l’enregistrement |
|
Intersection géométrique Shapely avec le bounding box donné |
Les cartes de résultats sont rendues par update_search_results(), qui gère également la pagination.
4.5.2. Vue arborescence
views/tree.py convertit topic_hierarchy() en nœuds ui.tree via _to_tree_nodes() (récursif, trié alphabétiquement). La sélection utilise on_select pour imposer la sélection d’un nœud unique — on_tick n’est pas utilisé car il entre en conflit avec la synchronisation d’état interne de NiceGUI.
4.5.3. Panneau latéral partagé (views/shared.py)
on_topics_picked(e, state, layout, is_page_selection, dataset_id) est le gestionnaire central pour la sélection depuis le catalogue et l’arborescence. Il :
-
Met à jour
state.selected_topics -
Construit le panneau latéral droit avec la saisie du répertoire de sauvegarde, les filtres jeux de données/types de médias/bbox/date et les filtres personnalisés optionnels
-
Câble le bouton S’abonner à
confirm_subscribe()
confirm_subscribe() affiche un dialogue avec la charge utile JSON complète avant d’appeler subscribe_to_topics().
show_metadata(dataset_id) recherche l’enregistrement dans merged_records() et affiche un dialogue de détails avec une carte Leaflet interactive.
4.6. Ajout d’une nouvelle vue
-
Créez
modules/ui/views/myview.pyavec une fonctionrender(container), en utilisantt()pour toutes les chaînes visibles par l’utilisateur -
Ajoutez une entrée de navigation dans
components/navigation_drawer.py— la listeNAV_ITEMSprend un tuple(view_id, label_key, icon)oùlabel_keyest une clé i18nnav.* -
Ajoutez une branche dans le répartiteur
show_view()dansmain.py -
Ajoutez toutes les nouvelles classes CSS dans
assets/base.cssà l’intérieur du bloc@layer components { … } -
Ajoutez toutes les nouvelles clés de chaînes dans
i18n/en.pyet dans chaque autre fichier de langue — voir Internationalisation (i18n)
4.7. Internationalisation (i18n)
L’interface prend en charge plusieurs langues via un système i18n léger basé sur des dictionnaires dans modules/ui/i18n/.
4.7.1. Langues prises en charge
| Code | Langue | Direction |
|---|---|---|
|
English |
LTR |
|
Français |
LTR |
|
Español |
LTR |
|
العربية |
RTL |
|
中文 |
LTR |
|
Русский |
LTR |
La langue active est stockée par session de navigateur dans app.storage.user['lang'] et par défaut en anglais. Les utilisateurs changent de langue via le sélecteur dans la barre d’outils de l’en-tête ; la page se recharge pour appliquer le changement.
4.7.2. Fonctionnement de t()
from i18n import t
# Recherche simple
label = t('nav.dashboard') # → 'Dashboard'
# Avec interpolation (utilise str.format)
label = t('subscriptions.folder', path='./') # → 'Folder: ./'
La chaîne de recherche est :
-
Fichier de langue choisi
-
Anglais (
en.py) — repli si la clé est absente dans la langue choisie -
La chaîne de clé elle-même — repli si absente en anglais aussi (visible en développement)
t() lit app.storage.user['lang'] depuis la session de requête NiceGUI courante. Appelez-le toujours au moment du rendu (à l’intérieur d’un gestionnaire @ui.page ou d’un callback d’événement NiceGUI), jamais au moment de l’import du module.
|
Les chaînes qui contiennent des accolades { ou } littérales (par exemple, des exemples JSON dans le texte d’aide) doivent utiliser des accolades simples telles quelles. N’utilisez pas l’échappement {{ / }} — t() n’appelle .format(**kwargs) que lorsque des arguments nommés sont passés, donc {{ serait affiché littéralement.
|
4.7.3. Conventions de nommage des clés
Les clés utilisent des espaces de noms séparés par des points :
| Préfixe | Portée | Exemple |
|---|---|---|
|
Étiquettes du tiroir de navigation |
|
|
Étiquettes des boutons |
|
|
Panneau latéral d’abonnement |
|
|
Vue catalogue |
|
|
Vue arborescence |
|
|
Vue Gérer les abonnements |
|
|
Vue Paramètres |
|
|
Vue Abonnement manuel |
|
|
Messages de validation Abonnement manuel |
|
|
Titres de dialogue |
|
|
Dialogue de métadonnées |
|
|
Messages de validation partagés |
|
|
Pied de page |
|
|
Étiquettes d’accessibilité ARIA |
|
4.7.4. Prise en charge RTL
L’arabe (ar) est de droite à gauche. Lorsque l’arabe est sélectionné, is_rtl() retourne True et le gestionnaire on_connect() de main.py injecte dir="rtl" sur l’élément <html> via JavaScript, activant le mode de mise en page RTL de Quasar. Les remplacements CSS personnalisés dans assets/base.css (sous le bloc de commentaires RTL overrides) corrigent le rembourrage physique de Quasar sur .q-page-container et repositionnent le panneau latéral d’abonnement.
Pour ajouter une autre langue RTL, ajoutez son code à RTL_LANGUAGES dans init.py — aucune autre modification n’est nécessaire.
4.7.5. Ajout d’une nouvelle langue
Copiez modules/ui/i18n/en.py vers modules/ui/i18n/{code}.py et traduisez toutes les valeurs. Conservez chaque clé ; n’en supprimez aucune. Préservez exactement les noms {placeholder} tels qu’ils apparaissent dans la source anglaise :
# modules/ui/i18n/de.py
"""German strings."""
STRINGS: dict[str, str] = {
'nav.dashboard': 'Dashboard',
'nav.catalogue': 'Katalogsuche',
'nav.tree': 'Baumsuche',
'nav.manual': 'Manuell abonnieren',
'nav.manage': 'Abonnements verwalten',
'nav.settings': 'Einstellungen',
# ... all remaining keys ...
'subscriptions.folder': 'Ordner: {path}', # {path} must be preserved
'settings.records': '{name}: {count} Einträge',
}
init.pyfrom . import ar, de, en, es, fr, ru, zh (1)
LANGUAGES: dict[str, str] = {
'en': 'English',
'fr': 'Français',
'es': 'Español',
'ar': 'العربية',
'zh': '中文',
'ru': 'Русский',
'de': 'Deutsch', (2)
}
_STRINGS: dict[str, dict[str, str]] = {
'en': en.STRINGS,
'fr': fr.STRINGS,
'es': es.STRINGS,
'ar': ar.STRINGS,
'zh': zh.STRINGS,
'ru': ru.STRINGS,
'de': de.STRINGS, (3)
}
| 1 | Ajouter l’import |
| 2 | Ajouter le nom d’affichage (dans le script natif) |
| 3 | Ajouter le mappage des chaînes |
Si la langue est de droite à gauche (ex. hébreu he, persan fa) :
RTL_LANGUAGES: frozenset[str] = frozenset({'ar', 'he'})
docker-compose restart wis2downloader-ui
La nouvelle langue apparaîtra immédiatement dans le sélecteur de langue de l’en-tête.
| Les traductions non-anglaises existantes sont générées automatiquement et fournies uniquement comme point de départ. Toutes les chaînes doivent être vérifiées par un locuteur natif avant le déploiement en production, avec une attention particulière aux termes du domaine WMO/météorologique (WIS2, BUFR, GRIB, cache global, etc.) qui ont des traductions établies dans les documents officiels de l’WMO. |
4.7.6. Ajout de nouvelles chaînes traduisibles
Lors de l’ajout de texte à n’importe quelle vue ou composant :
en.py (source de vérité)# modules/ui/i18n/en.py
'myview.title': 'My New View',
'myview.description': 'Showing results for {topic}.',
Copiez la valeur anglaise comme espace réservé si une traduction n’est pas encore disponible. t() revient automatiquement à l’anglais, mais avoir la clé présente évite les lacunes dans les outils :
# fr.py / es.py / ar.py / zh.py / ru.py
'myview.title': 'My New View', # TODO: translate
'myview.description': 'Showing results for {topic}.',
t() dans la vuefrom i18n import t
def render(container):
with container:
ui.label(t('myview.title')).classes('page-title')
ui.label(t('myview.description', topic=selected_topic))
5. Communication inter-services
5.1. Redis PubSub (Commandes)
La communication gestionnaire d’abonnements → abonné utilise Redis PubSub sur le canal subscription_commands :
// Channel: subscription_commands
// Subscribe command
{
"action": "subscribe",
"topic": "cache/a/wis2/+/data/#",
"save_path": "all-data",
"filters": {"accepted_media_types": ["application/bufr"]}
}
// Unsubscribe command
{
"action": "unsubscribe",
"topic": "cache/a/wis2/+/data/#"
}
5.2. Tâches Celery (Téléchargements)
La communication abonné → worker Celery utilise la file d’attente de tâches Celery :
job = {
"topic": "cache/a/wis2/de-dwd/data/...",
"target": "dwd-data",
"filters": {"accepted_media_types": [...]},
"_broker": "globalbroker.meteo.fr",
"_received": "2026-01-28 10:15:30",
"_queued": "2026-01-28 10:15:30",
"payload": {
"id": "...",
"properties": {
"data_id": "...",
"metadata_id": "...",
"integrity": {"method": "sha512", "value": "..."}
},
"links": [
{"rel": "canonical", "href": "https://..."}
]
}
}
5.3. Clés Redis
| Modèle de clé | Objectif |
|---|---|
|
Hash de tous les abonnements (sub_id → JSON |
|
Suivi de déduplication (types : |
|
Verrou distribué empêchant les téléchargements en double concurrents |
|
JSON du catalogue GDC en cache (CMA, DWD, ECCC) ; TTL défini par |
|
Hash des métriques Prometheus (champ = dict JSON d’étiquettes, valeur = compteur/jauge flottant) |
|
Signal de santé de l’abonné |
|
File d’attente de tâches Celery (par défaut) |
6. Extension du système
6.1. Ajout d’une nouvelle condition de correspondance dans le moteur de filtres
Le moteur de filtres se trouve dans modules/shared/shared/filters.py. Chaque condition de correspondance est une clé dans l’objet match distribué par _evaluate_match().
Pour ajouter une nouvelle condition intégrée (ex. correspondance sur un nouveau champ de métadonnées station_id) :
-
Ajoutez le champ à
MatchContextdansfilters.py:@dataclass class MatchContext: ... station_id: str | None = None -
Renseignez-le dans
_build_context()danswis2.py(avant et/ou après le téléchargement). -
Ajoutez une branche dans
_evaluate_match():if 'station_id' in match: return _match_string_field(ctx.station_id, match['station_id']) -
Documentez le nouveau champ dans
openapi.ymlet dans la référence de filtre du guide utilisateur.
Aucune modification n’est nécessaire dans app.py, command_listener.py ou subscriber.py — l’objet filtre est transmis tel quel et évalué au moment du téléchargement.
6.2. Ajout d’une nouvelle tâche
-
Créez une tâche dans
modules/task_manager/task_manager/tasks/ -
Enregistrez dans
worker.pyautodiscover -
Ajoutez au flux de travail si nécessaire
# modules/task_manager/task_manager/tasks/custom.py
from task_manager.worker import app
@app.task
def my_custom_task(data):
# Process data
return result
6.3. Ajout d’une nouvelle métrique
Les métriques sont stockées dans Redis via shared.redis_metrics. Pour ajouter une nouvelle métrique :
-
Enregistrez-la dans le dict
METRICSdansmodules/shared/shared/redis_metrics.py:# short name → (prometheus type, help text) METRICS: dict[str, tuple[str, str]] = { ... 'my_counter_total': ('counter', 'Description of the counter.'), 'my_gauge': ('gauge', 'Description of the gauge.'), } -
Incrémentez-la depuis n’importe quel service (compteur) ou définissez-la (jauge) :
from shared import incr_counter, set_gauge # Compteur (ex. dans une tâche Celery) incr_counter('my_counter_total', {'label1': 'value', 'label2': 'value'}) # Jauge (ex. dans une tâche planifiée) set_gauge('my_gauge', {'label1': 'value'}, 42.0)
Seules les métriques enregistrées dans METRICS apparaissent dans la sortie /metrics.
7. Tests
7.1. Exécution des tests
# Installer les dépendances de test
pip install pytest pytest-cov
# Exécuter les tests
pytest modules/
# Avec couverture
pytest --cov=modules modules/
7.2. Tests manuels
# Démarrer les services
docker-compose up -d
# Créer un abonnement de test
curl -X POST http://localhost:5002/subscriptions \
-H "Content-Type: application/json" \
-d '{"topic": "cache/a/wis2/de-dwd/data/#", "target": "test"}'
# Surveiller les journaux
docker-compose logs -f subscriber celery
# Vérifier les téléchargements
ls -la downloads/test/
8. Configuration du développement
8.1. Développement local
# Créer un environnement virtuel
python -m venv venv
source venv/bin/activate
# Installer les modules en mode éditable
pip install -e modules/shared
pip install -e modules/task_manager
pip install -e modules/subscriber
pip install -e modules/subscription_manager
# Démarrer Redis (instance unique pour le développement)
docker run -d -p 6379:6379 redis:7.2-alpine redis-server --requirepass devpassword
# Configurer l'environnement
export REDIS_HOST=localhost
export REDIS_PORT=6379
export REDIS_PASSWORD=devpassword
export FLASK_SECRET_KEY=dev-secret-key
export LOG_LEVEL=DEBUG
# Exécuter le gestionnaire d'abonnements
python -m subscription_manager.app
# Exécuter le souscripteur (dans un autre terminal)
export GLOBAL_BROKER_HOST=globalbroker.meteo.fr
python -m subscriber.manager
# Exécuter le worker Celery (dans un autre terminal)
celery -A task_manager.worker worker --loglevel=DEBUG
8.2. Build Docker
# Construire toutes les images
docker-compose build
# Construire un service spécifique
docker-compose build celery
# Reconstruire sans cache
docker-compose build --no-cache
9. Débogage
9.1. Voir les tâches Celery
# Inspecter les tâches actives
docker exec -it wis2downloader-celery-1 \
celery -A task_manager.worker inspect active
# Inspecter les tâches réservées
docker exec -it wis2downloader-celery-1 \
celery -A task_manager.worker inspect reserved
9.2. Inspection Redis
# Se connecter à Redis (utiliser le mot de passe de l'environnement)
docker exec -it redis redis-cli -a $REDIS_PASSWORD
# Lister les abonnements
HGETALL global:subscriptions
# Vérifier la longueur de la file d'attente
LLEN celery
# Voir les clés de déduplication
KEYS wis2:notifications:*
# Voir le cache du catalogue GDC (rempli par l'interface au démarrage)
KEYS gdc:cache:*
TTL gdc:cache:CMA
Toutes les commandes Redis nécessitent une authentification. Le drapeau -a $REDIS_PASSWORD transmet le mot de passe depuis votre environnement.
|
9.3. Débogage MQTT
# S'abonner manuellement à un sujet (mosquitto-clients)
mosquitto_sub -h globalbroker.meteo.fr -p 443 \
-t 'cache/a/wis2/de-dwd/data/#' \
-u everyone -P everyone \
--capath /etc/ssl/certs