1. Architecture Overview
WIS2 Downloader is a distributed system with four main components:
Docker Compose
┌──────────────────────────────────────────────────────────┐
│ │
│ Web UI (NiceGUI :8080) │
│ - reads GDC catalogue data from Redis cache │
│ - posts subscriptions to Subscription Manager API │
│ │ │
│ ▼ │
│ Subscription Manager (Flask API :5001) │
│ - persists subscriptions in Redis │
│ - publishes subscribe/unsubscribe commands via PubSub │
│ │ │
│ Redis PubSub (commands) │
│ │ │
│ ▼ │
│ Subscriber (MQTT Client) │
│ - connects to WIS2 Global Broker │
│ - on notification: queues Celery download task │
│ │ │
│ Redis (Celery queue + dedup state) │
│ │ │
│ ▼ │
│ Celery Workers │
│ - download file, verify hash, apply filters, save │
│ │
└──────────────────────────────────────────────────────────┘
1.1. Data Flow
- On startup, the Web UI fetches WCMP2 records from the three GDC endpoints (or the Redis cache) and builds an in-memory topic hierarchy
- The user browses the Catalogue or Tree view and selects a topic in the Web UI
- The user clicks Subscribe → the Web UI POSTs a subscription to the Subscription Manager REST API
- The Subscription Manager persists the subscription in Redis and publishes a subscribe command to the Redis PubSub channel
- The Subscriber receives the command and subscribes to the MQTT topic on the WIS2 Global Broker
- When a WIS2 notification arrives, the Subscriber queues a Celery download task
- A Celery worker downloads the file, verifies the hash, applies filters, and saves it to disk
2. Module Structure
modules/
├── shared/ # Shared utilities
│ └── shared/
│ ├── __init__.py
│ ├── logging.py # Centralized logging
│ └── redis_client.py # Redis client singleton
│
├── subscriber/ # MQTT subscriber service
│ └── subscriber/
│ ├── __init__.py
│ ├── manager.py # Entry point, thread management
│ ├── subscriber.py # MQTT client wrapper
│ └── command_listener.py # Redis PubSub listener
│
├── subscription_manager/ # REST API service
│ └── subscription_manager/
│ ├── __init__.py
│ ├── app.py # Flask application
│ └── static/
│ └── openapi.yml # API specification
│
├── task_manager/ # Celery tasks
│ └── task_manager/
│ ├── __init__.py
│ ├── worker.py # Celery app for download workers
│ ├── scheduler.py # Separate Celery app for periodic/housekeeping tasks
│ ├── tasks/
│ │ ├── wis2.py # Download task
│ │ └── scheduled_tasks.py # Periodic tasks (disk check, file cleanup)
│ └── workflows/
│ └── __init__.py # Task chains
│
└── ui/ # NiceGUI web interface (port 8080)
├── main.py # Entry point, page route, AppState
├── layout.py # PageLayout builder
├── config.py # Environment variable config
├── data.py # GDC data layer — fetch, merge, cache, hierarchy
├── assets/
│ └── base.css # All custom CSS
├── components/ # Reusable layout components
├── i18n/ # Internationalisation
│ ├── __init__.py # t(), current_lang(), is_rtl(), LANGUAGES
│ ├── en.py # English strings (source of truth)
│ ├── fr.py # French
│ ├── es.py # Spanish
│ ├── ar.py # Arabic (RTL)
│ ├── zh.py # Chinese
│ └── ru.py # Russian
├── models/
│ └── wcmp2.py # WCMP2Record dataclass
└── views/ # Page views (catalogue, tree, subscriptions, etc.)
3. Module Details
3.1. Shared Module
Provides common utilities used across all services.
3.1.1. Redis Client
from shared import get_redis_client
# Get Redis client (singleton)
redis = get_redis_client()
# Use like normal Redis client
redis.set('key', 'value')
redis.get('key')
The client:
- Connects directly to the Redis server
- Caches the connection (singleton pattern)
- Includes retry logic for transient failures
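The singleton-plus-retry behaviour can be sketched as follows. This is a minimal stand-alone illustration, not the shared module's actual code: the factory argument stands in for the real redis.Redis(...) construction, which the shared module presumably configures from environment variables.

```python
import time

_client = None  # module-level cache backing the singleton


def get_client(factory, retries=3, delay=0.01):
    """Return a cached connection, creating it with retry on transient failures.

    `factory` is a hypothetical stand-in for building the real Redis client;
    it is injected here so the sketch runs without a Redis server.
    """
    global _client
    if _client is not None:
        return _client  # singleton: reuse the existing connection
    for attempt in range(retries):
        try:
            _client = factory()
            return _client
        except ConnectionError:
            if attempt == retries - 1:
                raise  # exhausted retries: surface the failure
            time.sleep(delay)


# Demo: a flaky factory that fails once, then succeeds
calls = {'n': 0}

def flaky():
    calls['n'] += 1
    if calls['n'] < 2:
        raise ConnectionError('transient')
    return object()

c1 = get_client(flaky)
c2 = get_client(flaky)  # cached: the factory is not called again
```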
3.1.2. Logging
from shared import setup_logging
# Configure root logger (call once at startup)
setup_logging()
# Get module-specific logger
LOGGER = setup_logging(__name__)
LOGGER.info("Message with UTC timestamp")
Features:
- UTC timestamps in ISO 8601 format
- Configurable via the LOG_LEVEL environment variable
- Outputs to stdout for Docker log collection
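A minimal sketch of what setup_logging() plausibly does, using only the standard library. The formatter class name and format string are assumptions; only the behaviour (UTC ISO 8601 timestamps, stdout output, level from configuration) follows the feature list above.

```python
import logging
import sys
from datetime import datetime, timezone


class UTCISOFormatter(logging.Formatter):
    """Render record timestamps as UTC ISO 8601 strings."""

    def formatTime(self, record, datefmt=None):
        return datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat()


def setup_logging(name=None, level='INFO'):
    # The real module presumably reads `level` from the LOG_LEVEL env var.
    handler = logging.StreamHandler(sys.stdout)  # stdout for Docker log collection
    handler.setFormatter(UTCISOFormatter('%(asctime)s %(name)s %(levelname)s %(message)s'))
    logger = logging.getLogger(name)
    if not logger.handlers:  # avoid duplicate handlers on repeated calls
        logger.addHandler(handler)
    logger.setLevel(level)
    return logger


log = setup_logging('demo', 'DEBUG')
log.info('hello')
```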
3.2. Subscriber Module
Connects to WIS2 Global Broker via MQTT and processes incoming notifications.
3.2.1. Entry Point
def run_manager():
    # 1. Create MQTT subscriber
    mqtt_subscriber = Subscriber(**broker_config)

    # 2. Create Redis command listener
    redis_listener = CommandListener(
        subscriber=mqtt_subscriber,
        channel=COMMAND_CHANNEL
    )

    # 3. Start MQTT in a separate thread
    mqtt_thread = threading.Thread(target=mqtt_subscriber.start, daemon=True)
    mqtt_thread.start()

    # 4. Start Redis listener (blocks)
    redis_listener.start()
3.2.2. MQTT Subscriber
class Subscriber:
    def __init__(self, host, port, uid, pwd, protocol, session):
        # Configure MQTT client with callbacks
        self.client = mqtt.Client(...)
        self.client.on_message = self._on_message

    def _on_message(self, client, userdata, msg):
        # Parse notification, create Celery task
        job = {
            "topic": msg.topic,
            "target": target,
            "filters": filters,
            "payload": json.loads(msg.payload)
        }
        workflow = wis2_download(job)
        workflow.apply_async()

    def subscribe(self, topic, subscriptions):
        # subscriptions: {sub_id: {id, save_path, filter}}
        self.client.subscribe(topic, qos=0)
        self.active_subscriptions[topic] = {
            "pattern": topic,
            "subscriptions": subscriptions
        }

    def add_subscription(self, topic, sub_id, save_path, filter_config):
        self.active_subscriptions[topic]["subscriptions"][sub_id] = {
            "id": sub_id, "save_path": save_path, "filter": filter_config
        }

    def remove_subscription(self, topic, sub_id):
        self.active_subscriptions[topic]["subscriptions"].pop(sub_id, None)

    def unsubscribe(self, topic):
        self.client.unsubscribe(topic)
        del self.active_subscriptions[topic]
3.2.3. Command Listener
class CommandListener(threading.Thread):
    def __init__(self, subscriber, channel):
        super().__init__(daemon=True)
        self.subscriber = subscriber
        self.channel = channel
        self.stop_event = threading.Event()
        self.pubsub = get_redis_client().pubsub()

    def run(self):
        self.pubsub.subscribe(self.channel)
        while not self.stop_event.is_set():
            message = self.pubsub.get_message()
            if message:
                self._process_command(message)

    def _process_command(self, message):
        command = json.loads(message['data'])
        action = command['action']
        if action == 'subscribe':
            self.subscriber.subscribe(command['topic'], command['subscriptions'])
        elif action == 'add_subscription':
            self.subscriber.add_subscription(
                command['topic'], command['sub_id'],
                command['save_path'], command['filter']
            )
        elif action == 'remove_subscription':
            self.subscriber.remove_subscription(command['topic'], command['sub_id'])
        elif action == 'unsubscribe':
            self.subscriber.unsubscribe(command['topic'])
        elif action == 'update_subscription':
            self.subscriber.add_subscription(
                command['topic'], command['sub_id'],
                command['save_path'], command['filter']
            )
3.3. Subscription Manager Module
Flask REST API for managing subscriptions.
3.3.1. Flask Application
@app.post('/subscriptions')
def add_subscription():
    data = get_json()
    topic = normalise_topic(data.get('topic'))
    save_path = normalise_path(data.get('target', ''))
    filter_config = data.get('filter') or data.get('filters') or {}
    sub_id = str(uuid4())
    sub_data = {'id': sub_id, 'topic': topic, 'save_path': save_path, 'filter': filter_config}
    existing = _subs_for_topic(topic, _get_all_subscriptions())
    is_new_topic = len(existing) == 0
    _persist_subscription(sub_id, sub_data)
    if is_new_topic:
        # First sub for this topic — open MQTT connection
        command = {"action": "subscribe", "topic": topic,
                   "subscriptions": {sub_id: sub_data}}
    else:
        # Topic already open — just register the new destination
        command = {"action": "add_subscription", "topic": topic,
                   "sub_id": sub_id, "save_path": save_path, "filter": filter_config}
    publish_command(command)
    return jsonify(sub_data), 201
3.3.2. Metrics Endpoint
Metrics are stored in Redis using atomic HINCRBYFLOAT operations, making them safe across multiple Celery worker containers. The endpoint reads from Redis and renders Prometheus text format:
@app.route('/metrics')
def expose_metrics():
    # Update live gauge before exposing
    queue_length = redis_client.llen(CELERY_DEFAULT_QUEUE)
    set_gauge('celery_queue_length', {'queue_name': CELERY_DEFAULT_QUEUE}, queue_length)
    return Response(generate_prometheus_text(), mimetype="text/plain")
Increment a counter from any worker:
from shared import incr_counter
incr_counter('downloads_total', {'cache': hostname, 'media_type': mime_type})
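The rendering step can be illustrated with a pure function over a plain dict. This is a sketch of what generate_prometheus_text() plausibly does; the real implementation reads the hash from Redis, and the registry contents here are illustrative.

```python
import json

# Mirrors the registry idea: short name → (prometheus type, help text)
METRICS = {
    'downloads_total': ('counter', 'Total downloads.'),
}


def render_prometheus(stored):
    """Render {metric: {json_label_dict: value}} as Prometheus text format.

    `stored` stands in for the Redis hash described above, where each field
    is a JSON-encoded label dict and each value is a float.
    """
    lines = []
    for name, (mtype, help_text) in METRICS.items():
        lines.append(f'# HELP {name} {help_text}')
        lines.append(f'# TYPE {name} {mtype}')
        for labels_json, value in stored.get(name, {}).items():
            labels = json.loads(labels_json)
            label_str = ','.join(f'{k}="{v}"' for k, v in sorted(labels.items()))
            lines.append(f'{name}{{{label_str}}} {value}')
    return '\n'.join(lines) + '\n'


text = render_prometheus({'downloads_total': {'{"cache": "c1"}': 3.0}})
```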
3.4. Task Manager Module
Celery workers for downloading and processing files.
3.4.1. Celery Configuration
app = Celery('tasks',
             broker=CELERY_BROKER_URL,
             result_backend=CELERY_RESULT_BACKEND)

# Auto-discover tasks
app.autodiscover_tasks(['task_manager.tasks', 'task_manager.tasks.wis2'])
3.4.2. Download Task
@app.task(bind=True)
@metrics_collector
def download_from_wis2(self, job):
    result = {...}  # Initialize result dict

    # 1. Extract identifiers
    message_id = job['payload']['id']
    data_id = job['payload']['properties']['data_id']
    filehash = job['payload']['properties']['integrity']['value']

    # 2. Deduplication check
    for key, dedup_type in [(message_id, 'by-msg-id'), ...]:
        if get_status(key, dedup_type) == STATUS_SUCCESS:
            result['status'] = STATUS_SKIPPED
            return result

    # 3. Acquire lock
    lock_acquired = redis_client.set(lock_key, 1, nx=True, ex=LOCK_EXPIRE)
    if not lock_acquired:
        raise self.retry(countdown=10, max_retries=10)

    # 4. Pre-download filter pass (no media_type/size yet)
    pre_action, pre_reason = _apply_job_filter(job, ctx_pre)
    if pre_action == 'reject':
        result['status'] = STATUS_SKIPPED
        return result

    # 5. Download file
    response = _pool.request('GET', download_url, ...)

    # 6. Verify hash
    if hash_expected and hash_base64 != hash_expected:
        result['status'] = STATUS_FAILED
        return result

    # 7. Post-download filter pass (media_type and size now known)
    post_action, post_reason = _apply_job_filter(job, ctx_post)
    if post_action == 'reject':
        result['status'] = STATUS_SKIPPED
        return result

    # 8. Save file
    output_path.write_bytes(data)
    result['status'] = STATUS_SUCCESS
    return result
3.4.3. Workflow Chains
def wis2_download(args):
    workflow = download_from_wis2.s(args)
    return workflow
3.4.4. Scheduler (Periodic Tasks)
A separate Celery app (scheduler.py) handles housekeeping using Celery Beat. It uses Redis databases 2/3 to avoid interfering with the download worker queues (databases 0/1). Periodic tasks are defined in tasks/scheduled_tasks.py:
| Task | Interval | Purpose |
|---|---|---|
| … | Every 5 min | Sets … |
| … | Every 10 min | Deletes files older than … |
| … | Daily | Full … |
The scheduler is started as two Docker Compose services: celery-scheduler-workers (runs tasks) and celery-beats (triggers them on schedule).
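Assuming the intervals above map onto a standard Celery Beat schedule, the configuration in scheduler.py plausibly looks like the following sketch. The task paths and schedule-entry names here are hypothetical; the real names live in tasks/scheduled_tasks.py.

```python
from datetime import timedelta

# Hypothetical task paths — illustrative only, not the project's actual names.
beat_schedule = {
    'disk-check-every-5-min': {
        'task': 'task_manager.tasks.scheduled_tasks.disk_check',
        'schedule': timedelta(minutes=5),
    },
    'file-cleanup-every-10-min': {
        'task': 'task_manager.tasks.scheduled_tasks.file_cleanup',
        'schedule': timedelta(minutes=10),
    },
    'daily-cleanup': {
        'task': 'task_manager.tasks.scheduled_tasks.daily_cleanup',
        'schedule': timedelta(days=1),
    },
}

# In scheduler.py this dict would be wired up via:
#   app.conf.beat_schedule = beat_schedule
```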
4. UI Module
The UI is a NiceGUI web application served at port 8080. It is the primary user-facing interface for discovering datasets, browsing the WIS2 topic hierarchy, and creating subscriptions.
4.1. Module Structure
modules/ui/
├── main.py # Application entry point, page route, AppState
├── layout.py # PageLayout builder
├── config.py # Environment variable config (SUBSCRIPTION_MANAGER_URL etc.)
├── data.py # GDC data layer — fetch, merge, cache, hierarchy
├── assets/
│ └── base.css # All custom CSS (inside @layer components)
├── components/
│ ├── header.py # WMO banner + toolbar + language selector
│ ├── footer.py # Footer bar
│ ├── navigation_drawer.py # Left nav drawer with view links
│ └── page_body.py # Main content area + right sidebar
├── i18n/ # Internationalisation
│ ├── __init__.py # t(), current_lang(), is_rtl(), LANGUAGES
│ ├── en.py # English strings (source of truth)
│ ├── fr.py # French
│ ├── es.py # Spanish
│ ├── ar.py # Arabic (RTL)
│ ├── zh.py # Chinese
│ └── ru.py # Russian
├── models/
│ └── wcmp2.py # WCMP2Record dataclass (GeoJSON Feature)
└── views/
├── dashboard.py # Grafana iframe embed
├── catalogue.py # Catalogue search + result cards
├── tree.py # Topic hierarchy tree
├── subscriptions.py # Active subscriptions list
├── settings.py # GDC status + refresh trigger
└── shared.py # Shared sidebar logic (on_topics_picked, confirm_subscribe, show_metadata)
4.2. Data Layer (data.py)
The data layer is responsible for fetching WCMP2 records from the three WIS2 Global Discovery Catalogues (GDC), merging them, and building two module-level caches that all views read from.
4.2.1. GDC Sources
Records are fetched from three public GDC endpoints at startup:
| Short name | URL |
|---|---|
| CMA | … |
| DWD | … |
| ECCC | … |
Raw JSON responses are cached in Redis under keys gdc:cache:CMA, gdc:cache:DWD, and gdc:cache:ECCC with a TTL controlled by GDC_CACHE_TTL_SECONDS (default 6 hours). Redis is optional — if unavailable, data is fetched over HTTP on every startup.
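The cache-first fetch can be sketched as follows. The function and class names are hypothetical; the Redis client and HTTP fetch are injected so the sketch runs standalone, and the real data layer presumably also handles the "Redis unavailable" case described above.

```python
import json

GDC_CACHE_TTL_SECONDS = 6 * 3600  # default TTL per the text above


def fetch_gdc(name, cache, http_get, ttl=GDC_CACHE_TTL_SECONDS):
    """Return parsed GDC JSON, trying the Redis cache before HTTP."""
    key = f'gdc:cache:{name}'
    cached = cache.get(key)
    if cached is not None:
        return json.loads(cached)          # cache hit: no HTTP request
    raw = http_get(name)                   # cache miss: hit the GDC endpoint
    cache.setex(key, ttl, raw)             # store with TTL for next startup
    return json.loads(raw)


class FakeRedis:
    """In-memory stand-in for the Redis client (get/setex only)."""
    def __init__(self):
        self.store = {}
    def get(self, k):
        return self.store.get(k)
    def setex(self, k, ttl, v):
        self.store[k] = v


r = FakeRedis()
calls = []

def http_get(n):
    calls.append(n)
    return '{"features": []}'

first = fetch_gdc('DWD', r, http_get)
second = fetch_gdc('DWD', r, http_get)  # served from cache, no second HTTP call
```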
4.2.2. Merged Records
After all GDC sources are loaded, _build_merged_records() deduplicates records by ID across catalogues:
- Records present in only one catalogue appear once, with a single source_gdcs entry.
- Records present in multiple catalogues are merged: source_gdcs lists all contributing catalogues.
- has_discrepancy is set to True if properties, geometry, or links differ between catalogues.
- Links are unioned across catalogues so that channel data present in any one GDC is preserved on the merged record.
The result is stored in the module-level _merged_records list and returned by merged_records().
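The merge rules above can be sketched with plain dicts. This is a simplified illustration, not the actual _build_merged_records(): it only compares properties for discrepancies, whereas the real merge also checks geometry and links and unions links.

```python
def build_merged(catalogues):
    """Merge {source_name: [record, ...]} by record id across catalogues."""
    merged = {}
    for source, records in catalogues.items():
        for rec in records:
            entry = merged.get(rec['id'])
            if entry is None:
                # First sighting: single source_gdcs entry, no discrepancy yet
                merged[rec['id']] = {**rec, 'source_gdcs': [source],
                                     'has_discrepancy': False}
            else:
                entry['source_gdcs'].append(source)
                # Simplified discrepancy check (properties only)
                if rec.get('properties') != entry.get('properties'):
                    entry['has_discrepancy'] = True
    return list(merged.values())


out = build_merged({
    'CMA': [{'id': 'a', 'properties': {'t': 1}}],
    'DWD': [{'id': 'a', 'properties': {'t': 2}}, {'id': 'b', 'properties': {}}],
})
```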
4.2.3. Topic Hierarchy
_build_topic_hierarchy() iterates _merged_records and inserts each record’s first cache/ MQTT channel into a nested dict:
{
  "cache": {
    "children": {
      "a": {
        "children": {
          "wis2": {
            "children": {
              "de-dwd": {
                "children": {
                  ...
                  "synop": {
                    "datasets": [<WCMP2Record>, ...]
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}
A node may contain both "children" and "datasets" if one channel is a prefix of another.
The hierarchy is stored in _topic_hierarchy and returned by topic_hierarchy(). It is consumed by tree.py to build the ui.tree widget, and by get_datasets_for_channel() to resolve datasets for a given topic.
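The insertion step can be sketched as a small helper. This is an illustration of the nested-dict shape shown above, not the actual _build_topic_hierarchy() code; the function name is hypothetical.

```python
def insert_channel(hierarchy, channel, record):
    """Insert one record's MQTT channel into the nested hierarchy dict.

    Node shape follows the example structure: intermediate levels live under
    "children", and the leaf accumulates a "datasets" list.
    """
    parts = channel.split('/')
    node = hierarchy.setdefault(parts[0], {})   # top level, e.g. "cache"
    for part in parts[1:]:
        node = node.setdefault('children', {}).setdefault(part, {})
    node.setdefault('datasets', []).append(record)


h = {}
insert_channel(h, 'cache/a/wis2/de-dwd/synop', 'rec1')
```

Because setdefault() is used throughout, a node can end up holding both "children" and "datasets", which is exactly the prefix-channel case noted above.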
4.2.4. Key Functions
| Function | Description |
|---|---|
| merged_records() | Returns the cached merged record list |
| topic_hierarchy() | Returns the cached topic hierarchy |
| … | Strips trailing … |
| … | Fetches GDC data (Redis cache first, then HTTP), rebuilds … |
4.3. WCMP2 Model (models/wcmp2.py)
WCMP2Record is a dataclass representation of a WIS2 Discovery Metadata GeoJSON Feature.
from models.wcmp2 import WCMP2Record
rec = WCMP2Record.from_dict(feature_dict)
rec.id # str - unique dataset identifier
rec.title # str | None
rec.description # str | None
rec.keywords # list[str]
rec.wmo_data_policy # 'core' | 'recommended' | None
rec.geometry # Geometry | None (.type, .coordinates)
rec.links # list[Link] (.channel, .href, .rel, .extra)
rec.mqtt_channels # list[str] - channels from all links
Link.extra is a dict capturing non-schema keys from the GDC response (e.g. GDC-specific filters used for custom filter fields in the subscription sidebar).
4.4. AppState
Each browser session has its own AppState instance (defined in main.py):
class AppState:
    def __init__(self):
        self.selected_topics: list[str] = []
        self.current_view: str = 'dashboard'
selected_topics holds the MQTT topics currently selected in the UI and drives the right sidebar. current_view tracks the active view name and is persisted to app.storage.user['current_view'] before any page reload (e.g. on language change) so the user is returned to the same view.
4.5. Views
All views follow the same convention: a render(container, …) function that builds the NiceGUI elements inside the provided container element. render is always a plain (non-async) function so it can be called synchronously from show_view() in main.py.
4.5.1. Catalogue View
views/catalogue.py provides full-text search across merged_records() with pure filter functions:
| Function | Description |
|---|---|
| … | Matches query against ID, title, description, keywords, and theme concepts |
| … | Matches … |
| … | Comma-separated keywords; all must appear in the record |
| … | Shapely geometry intersection with the given bounding box |
Result cards are rendered by update_search_results(), which is also responsible for pagination.
4.5.2. Tree View
views/tree.py converts topic_hierarchy() into ui.tree nodes via _to_tree_nodes() (recursive, alphabetically sorted). Selection uses on_select to enforce single-node selection — on_tick is not used because it conflicts with NiceGUI’s internal state synchronisation.
4.5.3. Shared Sidebar (views/shared.py)
on_topics_picked(e, state, layout, is_page_selection, dataset_id) is the central handler for both catalogue and tree selection. It:
- Updates state.selected_topics
- Builds the right sidebar with save-directory input, dataset/media-type/bbox/date filters, and optional custom filters
- Wires the Subscribe button to confirm_subscribe()
confirm_subscribe() shows a dialog with the full JSON payload before calling subscribe_to_topics().
show_metadata(dataset_id) looks up the record from merged_records() and renders a details dialog with an interactive Leaflet map.
4.6. Adding a New View
- Create modules/ui/views/myview.py with a render(container) function, using t() for all user-visible strings
- Add a navigation entry in components/navigation_drawer.py — the NAV_ITEMS list takes a (view_id, label_key, icon) tuple where label_key is a nav.* i18n key
- Add a branch in the show_view() dispatcher in main.py
- Add any new CSS classes to assets/base.css inside the @layer components { … } block
- Add all new string keys to i18n/en.py and to each other language file — see Internationalisation (i18n)
4.7. Internationalisation (i18n)
The UI supports multiple interface languages via a lightweight dictionary-based i18n system in modules/ui/i18n/.
4.7.1. Supported Languages
| Code | Language | Direction |
|---|---|---|
| en | English | LTR |
| fr | Français | LTR |
| es | Español | LTR |
| ar | العربية | RTL |
| zh | 中文 | LTR |
| ru | Русский | LTR |
The active language is stored per browser session in app.storage.user['lang'] and defaults to English. Users change language via the selector in the header toolbar; the page reloads to apply the change.
4.7.2. How t() Works
from i18n import t
# Simple lookup
label = t('nav.dashboard') # → 'Dashboard'
# With interpolation (uses str.format)
label = t('subscriptions.folder', path='./') # → 'Folder: ./'
The lookup chain is:
- The chosen language file
- English (en.py) — fallback if the key is missing in the chosen language
- The key string itself — fallback if missing from English too (visible in development)
t() reads app.storage.user['lang'] from the current NiceGUI request session. Always call it at render time (inside a @ui.page handler or a NiceGUI event callback), never at module import time.
Note: Strings that contain literal { or } characters (for example, JSON examples in hint text) must use single braces as-is. Do not use {{ / }} escaping — t() only calls .format(**kwargs) when keyword arguments are passed, so {{ would be displayed literally.
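The lookup chain and the conditional interpolation can be sketched in a few lines. This standalone version takes the language as a parameter; the real t() reads it from app.storage.user['lang'], and the string tables here are illustrative.

```python
# Illustrative string tables — the real ones come from the i18n modules.
_STRINGS = {
    'en': {'nav.dashboard': 'Dashboard', 'subscriptions.folder': 'Folder: {path}'},
    'fr': {'nav.dashboard': 'Tableau de bord'},
}


def t(key, lang='en', **kwargs):
    text = _STRINGS.get(lang, {}).get(key)   # 1. chosen language file
    if text is None:
        text = _STRINGS['en'].get(key)       # 2. English fallback
    if text is None:
        text = key                           # 3. the key itself (dev-visible)
    # Only interpolate when kwargs are given, so literal { } braces survive
    return text.format(**kwargs) if kwargs else text
```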
4.7.3. Key Naming Conventions
Keys use dot-separated namespaces:
| Prefix | Scope | Example |
|---|---|---|
| nav.* | Navigation drawer labels | nav.dashboard |
| … | Button labels | … |
| subscriptions.* | Subscription sidebar | subscriptions.folder |
| … | Catalogue view | … |
| … | Tree view | … |
| … | Manage Subscriptions view | … |
| settings.* | Settings view | settings.records |
| … | Manual Subscribe view | … |
| … | Manual Subscribe validation messages | … |
| … | Dialog titles | … |
| … | Metadata dialog | … |
| … | Shared validation messages | … |
| … | Footer | … |
| … | ARIA accessibility labels | … |
4.7.4. RTL Support
Arabic (ar) is right-to-left. When Arabic is selected, is_rtl() returns True and main.py's on_connect() handler injects dir="rtl" on the <html> element via JavaScript, activating Quasar's RTL layout mode. Custom CSS overrides in assets/base.css (under the RTL overrides comment block) correct Quasar's physical padding on .q-page-container and reposition the subscription sidebar.
To add another RTL language, add its code to RTL_LANGUAGES in __init__.py — no other changes are needed.
4.7.5. Adding a New Language
Copy modules/ui/i18n/en.py to modules/ui/i18n/{code}.py and translate all values. Keep every key; do not remove any. Preserve {placeholder} names exactly as they appear in the English source:
# modules/ui/i18n/de.py
"""German strings."""
STRINGS: dict[str, str] = {
    'nav.dashboard': 'Dashboard',
    'nav.catalogue': 'Katalogsuche',
    'nav.tree': 'Baumsuche',
    'nav.manual': 'Manuell abonnieren',
    'nav.manage': 'Abonnements verwalten',
    'nav.settings': 'Einstellungen',
    # ... all remaining keys ...
    'subscriptions.folder': 'Ordner: {path}',  # {path} must be preserved
    'settings.records': '{name}: {count} Einträge',
}
Then register the language in __init__.py:

from . import ar, de, en, es, fr, ru, zh  # (1)

LANGUAGES: dict[str, str] = {
    'en': 'English',
    'fr': 'Français',
    'es': 'Español',
    'ar': 'العربية',
    'zh': '中文',
    'ru': 'Русский',
    'de': 'Deutsch',  # (2)
}

_STRINGS: dict[str, dict[str, str]] = {
    'en': en.STRINGS,
    'fr': fr.STRINGS,
    'es': es.STRINGS,
    'ar': ar.STRINGS,
    'zh': zh.STRINGS,
    'ru': ru.STRINGS,
    'de': de.STRINGS,  # (3)
}
(1) Add the import
(2) Add the display name (in the native script)
(3) Add the strings mapping
If the language is right-to-left (e.g. Hebrew he, Persian fa):
RTL_LANGUAGES: frozenset[str] = frozenset({'ar', 'he'})
Finally, restart the UI service:

docker-compose restart wis2downloader-ui

The new language will appear immediately in the header language selector.
Note: The existing non-English translations are machine-generated and provided as a starting point only. All strings should be reviewed by a native speaker before production deployment, with particular attention to WMO/meteorological domain terms (WIS2, BUFR, GRIB, Global Cache, etc.) which have established translations in WMO official documents.
4.7.6. Adding New Translatable Strings
When adding UI text to any view or component:
First, add the new keys to en.py (the source of truth):

# modules/ui/i18n/en.py
'myview.title': 'My New View',
'myview.description': 'Showing results for {topic}.',

Copy the English value as a placeholder if a translation is not yet available. t() falls back to English automatically, but having the key present avoids gaps in tooling:

# fr.py / es.py / ar.py / zh.py / ru.py
'myview.title': 'My New View',  # TODO: translate
'myview.description': 'Showing results for {topic}.',
Then call t() in the view:

from i18n import t

def render(container):
    with container:
        ui.label(t('myview.title')).classes('page-title')
        ui.label(t('myview.description', topic=selected_topic))
5. Inter-Service Communication
5.1. Redis PubSub (Commands)
Subscription Manager → Subscriber communication uses Redis PubSub on channel subscription_commands:
// First subscription for a topic — opens MQTT connection
{"action": "subscribe", "topic": "cache/a/wis2/+/data/#",
"subscriptions": {"<uuid>": {"id": "<uuid>", "save_path": "all-data", "filter": {}}}}
// Additional subscription for an already-open topic
{"action": "add_subscription", "topic": "cache/a/wis2/+/data/#",
"sub_id": "<uuid>", "save_path": "grib-data", "filter": {"rules": [...]}}
// Remove one subscription (MQTT stays open)
{"action": "remove_subscription", "topic": "cache/a/wis2/+/data/#", "sub_id": "<uuid>"}
// Last subscription removed — closes MQTT connection
{"action": "unsubscribe", "topic": "cache/a/wis2/+/data/#"}
// Update save_path or filter for an existing subscription
{"action": "update_subscription", "topic": "cache/a/wis2/+/data/#",
"sub_id": "<uuid>", "save_path": "new-path", "filter": {}}
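The publishing side of this protocol is a one-liner over the Redis client. The sketch below uses an injected fake client so it runs standalone; the channel name follows the section above, and the listener side json.loads() the payload in _process_command().

```python
import json

COMMAND_CHANNEL = 'subscription_commands'  # per section 5.1


def publish_command(redis_client, command):
    """Serialize a command dict and publish it on the PubSub channel."""
    redis_client.publish(COMMAND_CHANNEL, json.dumps(command))


class FakePubSubClient:
    """In-memory stand-in recording (channel, payload) pairs."""
    def __init__(self):
        self.published = []
    def publish(self, channel, data):
        self.published.append((channel, data))


fake = FakePubSubClient()
publish_command(fake, {'action': 'unsubscribe', 'topic': 'cache/a/wis2/+/data/#'})
```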
5.2. Celery Tasks (Downloads)
Subscriber → Celery Worker communication uses Celery task queue:
job = {
    "topic": "cache/a/wis2/de-dwd/data/...",
    "target": "dwd-data",
    "filter": {"name": "...", "rules": [...]},  # subscription-level filter
    "_broker": "globalbroker.meteo.fr",
    "_received": "2026-01-28 10:15:30",
    "_queued": "2026-01-28 10:15:30",
    "payload": {
        "id": "...",
        "properties": {
            "data_id": "...",
            "metadata_id": "...",
            "integrity": {"method": "sha512", "value": "..."}
        },
        "links": [
            {"rel": "canonical", "href": "https://..."}
        ]
    }
}
5.3. Redis Keys
| Key Pattern | Purpose |
|---|---|
| global:subscriptions | Hash of all subscriptions (sub_id → JSON) |
| wis2:notifications:* | Deduplication tracking (types: …) |
| … | Distributed lock preventing concurrent duplicate downloads |
| gdc:cache:* | Cached GDC catalogue JSON (CMA, DWD, ECCC); TTL set by GDC_CACHE_TTL_SECONDS |
| … | Prometheus metrics hash (field = JSON label dict, value = float counter/gauge) |
| … | Subscriber health heartbeat |
| celery | Celery task queue (default) |
6. Extending the System
6.1. Adding a New Match Condition to the Filter Engine
The filter engine lives in modules/shared/shared/filters.py. Each match condition is a key in the match object dispatched by _evaluate_match().
To add a new built-in condition (e.g. matching on a new metadata field station_id):
1. Add the field to MatchContext in filters.py:

   @dataclass
   class MatchContext:
       ...
       station_id: str | None = None

2. Populate it in _build_context() in wis2.py (pre- and/or post-download).

3. Add a branch in _evaluate_match():

   if 'station_id' in match:
       return _match_string_field(ctx.station_id, match['station_id'])

4. Document the new field in openapi.yml and in the filter reference in the user guide.
No changes are needed in app.py, command_listener.py, or subscriber.py — the filter object is passed through unchanged and evaluated at download time.
6.2. Adding a New Task
- Create the task in modules/task_manager/task_manager/tasks/
- Register it in the worker.py autodiscover list
- Add it to a workflow if needed

# modules/task_manager/task_manager/tasks/custom.py
from task_manager.worker import app

@app.task
def my_custom_task(data):
    # Process data
    return result
6.3. Adding a New Metric
Metrics are stored in Redis via shared.redis_metrics. To add a new metric:
1. Register it in the METRICS dict in modules/shared/shared/redis_metrics.py:

   # short name → (prometheus type, help text)
   METRICS: dict[str, tuple[str, str]] = {
       ...
       'my_counter_total': ('counter', 'Description of the counter.'),
       'my_gauge': ('gauge', 'Description of the gauge.'),
   }

2. Increment it from any service (counter) or set it (gauge):

   from shared import incr_counter, set_gauge

   # Counter (e.g. in a Celery task)
   incr_counter('my_counter_total', {'label1': 'value', 'label2': 'value'})

   # Gauge (e.g. in a scheduled task)
   set_gauge('my_gauge', {'label1': 'value'}, 42.0)
Only metrics registered in METRICS appear in the /metrics output.
7. Testing
7.1. Running Tests
# Install test dependencies
pip install pytest pytest-cov
# Run tests
pytest modules/
# With coverage
pytest --cov=modules modules/
7.2. Manual Testing
# Start services
docker-compose up -d
# Create test subscription
curl -X POST http://localhost:5002/subscriptions \
-H "Content-Type: application/json" \
-d '{"topic": "cache/a/wis2/de-dwd/data/#", "target": "test"}'
# Watch logs
docker-compose logs -f subscriber celery
# Check downloads
ls -la downloads/test/
8. Development Setup
8.1. Local Development
# Create virtual environment
python -m venv venv
source venv/bin/activate
# Install modules in editable mode
pip install -e modules/shared
pip install -e modules/task_manager
pip install -e modules/subscriber
pip install -e modules/subscription_manager
# Start Redis (single instance for dev)
docker run -d -p 6379:6379 redis:7.2-alpine redis-server --requirepass devpassword
# Set environment
export REDIS_HOST=localhost
export REDIS_PORT=6379
export REDIS_PASSWORD=devpassword
export FLASK_SECRET_KEY=dev-secret-key
export LOG_LEVEL=DEBUG
# Run subscription manager
python -m subscription_manager.app
# Run subscriber (in another terminal)
export GLOBAL_BROKER_HOST=globalbroker.meteo.fr
python -m subscriber.manager
# Run celery worker (in another terminal)
celery -A task_manager.worker worker --loglevel=DEBUG
8.2. Docker Build
# Build all images
docker-compose build
# Build specific service
docker-compose build celery
# Rebuild without cache
docker-compose build --no-cache
9. Debugging
9.1. View Celery Tasks
# Inspect active tasks
docker exec -it wis2downloader-celery-1 \
celery -A task_manager.worker inspect active
# Inspect reserved tasks
docker exec -it wis2downloader-celery-1 \
celery -A task_manager.worker inspect reserved
9.2. Redis Inspection
# Connect to Redis (use password from environment)
docker exec -it redis redis-cli -a $REDIS_PASSWORD
# List subscriptions
HGETALL global:subscriptions
# Check queue length
LLEN celery
# View deduplication keys
KEYS wis2:notifications:*
# View GDC catalogue cache (populated by the UI on startup)
KEYS gdc:cache:*
TTL gdc:cache:CMA
All Redis commands require authentication. The -a $REDIS_PASSWORD flag passes the password from your environment.
9.3. MQTT Debugging
# Subscribe to topic manually (mosquitto-clients)
mosquitto_sub -h globalbroker.meteo.fr -p 443 \
-t 'cache/a/wis2/de-dwd/data/#' \
-u everyone -P everyone \
--capath /etc/ssl/certs