1. Architecture Overview

WIS2 Downloader is a distributed system with four main components:

                         Docker Compose
  ┌──────────────────────────────────────────────────────────┐
  │                                                           │
  │  Web UI (NiceGUI :8080)                                   │
  │    - reads GDC catalogue data from Redis cache            │
  │    - posts subscriptions to Subscription Manager API      │
  │                          │                                │
  │                          ▼                                │
  │  Subscription Manager (Flask API :5001)                   │
  │    - persists subscriptions in Redis                      │
  │    - publishes subscribe/unsubscribe commands via PubSub  │
  │                          │                                │
  │              Redis PubSub (commands)                      │
  │                          │                                │
  │                          ▼                                │
  │  Subscriber (MQTT Client)                                 │
  │    - connects to WIS2 Global Broker                       │
  │    - on notification: queues Celery download task         │
  │                          │                                │
  │              Redis (Celery queue + dedup state)           │
  │                          │                                │
  │                          ▼                                │
  │  Celery Workers                                           │
  │    - download file, verify hash, apply filters, save      │
  │                                                           │
  └──────────────────────────────────────────────────────────┘

1.1. Data Flow

  1. On startup, the Web UI fetches WCMP2 records from the three GDC endpoints (or Redis cache) and builds an in-memory topic hierarchy

  2. User browses the Catalogue or Tree view and selects a topic in the Web UI

  3. User clicks Subscribe → Web UI POSTs a subscription to the Subscription Manager REST API

  4. Subscription Manager persists the subscription in Redis and publishes a subscribe command to the Redis PubSub channel

  5. Subscriber receives the command and subscribes to the MQTT topic on the WIS2 Global Broker

  6. When a WIS2 notification arrives, Subscriber queues a Celery download task

  7. Celery worker downloads the file, verifies the hash, applies filters, and saves to disk

2. Module Structure

modules/
├── shared/                 # Shared utilities
│   └── shared/
│       ├── __init__.py
│       ├── logging.py      # Centralized logging
│       └── redis_client.py # Redis client singleton
│
├── subscriber/             # MQTT subscriber service
│   └── subscriber/
│       ├── __init__.py
│       ├── manager.py      # Entry point, thread management
│       ├── subscriber.py   # MQTT client wrapper
│       └── command_listener.py  # Redis PubSub listener
│
├── subscription_manager/   # REST API service
│   └── subscription_manager/
│       ├── __init__.py
│       ├── app.py          # Flask application
│       └── static/
│           └── openapi.yml # API specification
│
├── task_manager/           # Celery tasks
│   └── task_manager/
│       ├── __init__.py
│       ├── worker.py       # Celery app for download workers
│       ├── scheduler.py    # Separate Celery app for periodic/housekeeping tasks
│       ├── tasks/
│       │   ├── wis2.py              # Download task
│       │   └── scheduled_tasks.py  # Periodic tasks (disk check, file cleanup)
│       └── workflows/
│           └── __init__.py # Task chains
│
└── ui/                     # NiceGUI web interface (port 8080)
    ├── main.py             # Entry point, page route, AppState
    ├── layout.py           # PageLayout builder
    ├── config.py           # Environment variable config
    ├── data.py             # GDC data layer — fetch, merge, cache, hierarchy
    ├── assets/
    │   └── base.css        # All custom CSS
    ├── components/         # Reusable layout components
    ├── i18n/               # Internationalisation
    │   ├── __init__.py     # t(), current_lang(), is_rtl(), LANGUAGES
    │   ├── en.py           # English strings (source of truth)
    │   ├── fr.py           # French
    │   ├── es.py           # Spanish
    │   ├── ar.py           # Arabic (RTL)
    │   ├── zh.py           # Chinese
    │   └── ru.py           # Russian
    ├── models/
    │   └── wcmp2.py        # WCMP2Record dataclass
    └── views/              # Page views (catalogue, tree, subscriptions, etc.)

3. Module Details

3.1. Shared Module

Provides common utilities used across all services.

3.1.1. Redis Client

modules/shared/shared/redis_client.py
from shared import get_redis_client

# Get Redis client (singleton)
redis = get_redis_client()

# Use like normal Redis client
redis.set('key', 'value')
redis.get('key')

The client:

  • Connects directly to Redis server

  • Caches connection (singleton pattern)

  • Includes retry logic for transient failures
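
The singleton-plus-retry behaviour can be sketched as follows. This is an illustrative stand-in only — the real client lives in modules/shared/shared/redis_client.py, and connect_fn, retries, and delay here are hypothetical substitutes for the actual redis.Redis construction:

```python
# Illustrative sketch only -- not the real get_redis_client().
# connect_fn stands in for the redis.Redis constructor; retries/delay
# are assumed knobs for the retry behaviour the docs describe.
import time

_client = None  # module-level cache backing the singleton

def get_client(connect_fn, retries=3, delay=0.1):
    """Return the cached connection, retrying transient failures on first use."""
    global _client
    if _client is not None:
        return _client  # singleton: reuse the cached connection
    for attempt in range(retries):
        try:
            _client = connect_fn()
            return _client
        except ConnectionError:
            if attempt == retries - 1:
                raise  # give up after the final attempt
            time.sleep(delay * (2 ** attempt))  # exponential backoff
```

Subsequent callers anywhere in the process get the same object, so connection setup cost is paid once per container.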

3.1.2. Logging

modules/shared/shared/logging.py
from shared import setup_logging

# Configure root logger (call once at startup)
setup_logging()

# Get module-specific logger
LOGGER = setup_logging(__name__)

LOGGER.info("Message with UTC timestamp")

Features:

  • UTC timestamps in ISO 8601 format

  • Configurable via LOG_LEVEL environment variable

  • Outputs to stdout for Docker log collection
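
A minimal sketch of a setup matching these features — UTC ISO 8601 timestamps, LOG_LEVEL from the environment, stdout output. The LOG_LEVEL variable name comes from the docs above; the formatter layout and the rest of the shape are assumptions, not the actual shared.setup_logging() implementation:

```python
# Sketch of a UTC/ISO-8601 logging setup in the spirit of
# shared.setup_logging(); formatter layout is illustrative.
import logging
import os
import sys
import time

def setup_logging(name=None):
    level = os.environ.get('LOG_LEVEL', 'INFO').upper()
    handler = logging.StreamHandler(sys.stdout)  # stdout for Docker log collection
    formatter = logging.Formatter(
        fmt='%(asctime)s.%(msecs)03dZ %(levelname)s %(name)s %(message)s',
        datefmt='%Y-%m-%dT%H:%M:%S')
    formatter.converter = time.gmtime  # render timestamps in UTC
    handler.setFormatter(formatter)
    root = logging.getLogger()
    if not root.handlers:  # configure the root logger only once
        root.addHandler(handler)
        root.setLevel(level)
    return logging.getLogger(name) if name else root
```

This mirrors the documented usage: call it once without arguments at startup, then with __name__ to get module-specific loggers.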

3.2. Subscriber Module

Connects to WIS2 Global Broker via MQTT and processes incoming notifications.

3.2.1. Entry Point

modules/subscriber/subscriber/manager.py
def run_manager():
    # 1. Create MQTT subscriber
    mqtt_subscriber = Subscriber(**broker_config)

    # 2. Create Redis command listener
    redis_listener = CommandListener(
        subscriber=mqtt_subscriber,
        channel=COMMAND_CHANNEL
    )

    # 3. Start MQTT in separate thread
    mqtt_thread = threading.Thread(target=mqtt_subscriber.start, daemon=True)
    mqtt_thread.start()

    # 4. Start Redis listener (blocks)
    redis_listener.start()

3.2.2. MQTT Subscriber

modules/subscriber/subscriber/subscriber.py
class Subscriber:
    def __init__(self, host, port, uid, pwd, protocol, session):
        # Configure MQTT client with callbacks
        self.client = mqtt.Client(...)
        self.client.on_message = self._on_message

    def _on_message(self, client, userdata, msg):
        # Parse notification, create Celery task
        job = {
            "topic": msg.topic,
            "target": target,
            "filters": filters,
            "payload": json.loads(msg.payload)
        }
        workflow = wis2_download(job)
        workflow.apply_async()

    def subscribe(self, topic, subscriptions):
        # subscriptions: {sub_id: {id, save_path, filter}}
        self.client.subscribe(topic, qos=0)
        self.active_subscriptions[topic] = {
            "pattern": topic,
            "subscriptions": subscriptions
        }

    def add_subscription(self, topic, sub_id, save_path, filter_config):
        self.active_subscriptions[topic]["subscriptions"][sub_id] = {
            "id": sub_id, "save_path": save_path, "filter": filter_config
        }

    def remove_subscription(self, topic, sub_id):
        self.active_subscriptions[topic]["subscriptions"].pop(sub_id, None)

    def unsubscribe(self, topic):
        self.client.unsubscribe(topic)
        del self.active_subscriptions[topic]

3.2.3. Command Listener

modules/subscriber/subscriber/command_listener.py
class CommandListener(threading.Thread):
    def __init__(self, subscriber, channel):
        super().__init__(daemon=True)
        self.subscriber = subscriber
        self.channel = channel
        self.stop_event = threading.Event()
        self.pubsub = get_redis_client().pubsub()

    def run(self):
        self.pubsub.subscribe(self.channel)
        while not self.stop_event.is_set():
            message = self.pubsub.get_message(timeout=1.0)
            if message:
                self._process_command(message)

    def _process_command(self, message):
        command = json.loads(message['data'])
        action = command['action']
        if action == 'subscribe':
            self.subscriber.subscribe(command['topic'], command['subscriptions'])
        elif action == 'add_subscription':
            self.subscriber.add_subscription(
                command['topic'], command['sub_id'],
                command['save_path'], command['filter']
            )
        elif action == 'remove_subscription':
            self.subscriber.remove_subscription(command['topic'], command['sub_id'])
        elif action == 'unsubscribe':
            self.subscriber.unsubscribe(command['topic'])
        elif action == 'update_subscription':
            self.subscriber.add_subscription(
                command['topic'], command['sub_id'],
                command['save_path'], command['filter']
            )

3.3. Subscription Manager Module

Flask REST API for managing subscriptions.

3.3.1. Flask Application

modules/subscription_manager/subscription_manager/app.py
@app.post('/subscriptions')
def add_subscription():
    data = get_json()
    topic = normalise_topic(data.get('topic'))
    save_path = normalise_path(data.get('target', ''))
    filter_config = data.get('filter') or data.get('filters') or {}

    sub_id = str(uuid4())
    sub_data = {'id': sub_id, 'topic': topic, 'save_path': save_path, 'filter': filter_config}

    existing = _subs_for_topic(topic, _get_all_subscriptions())
    is_new_topic = len(existing) == 0

    _persist_subscription(sub_id, sub_data)

    if is_new_topic:
        # First sub for this topic — open MQTT connection
        command = {"action": "subscribe", "topic": topic,
                   "subscriptions": {sub_id: sub_data}}
    else:
        # Topic already open — just register the new destination
        command = {"action": "add_subscription", "topic": topic,
                   "sub_id": sub_id, "save_path": save_path, "filter": filter_config}

    publish_command(command)
    return jsonify(sub_data), 201

3.3.2. Metrics Endpoint

Metrics are stored in Redis using atomic HINCRBYFLOAT operations, making them safe across multiple Celery worker containers. The endpoint reads from Redis and renders Prometheus text format:

@app.route('/metrics')
def expose_metrics():
    # Update live gauge before exposing
    queue_length = redis_client.llen(CELERY_DEFAULT_QUEUE)
    set_gauge('celery_queue_length', {'queue_name': CELERY_DEFAULT_QUEUE}, queue_length)
    return Response(generate_prometheus_text(), mimetype="text/plain")

Increment a counter from any worker:

from shared import incr_counter
incr_counter('downloads_total', {'cache': hostname, 'media_type': mime_type})
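
The rendering step can be sketched against the documented storage shape (field = JSON label dict, value = float). This is a stand-alone illustration over a plain dict, not the real generate_prometheus_text(); the METRICS registry entry shown is hypothetical:

```python
# Sketch of Prometheus exposition-format rendering from metrics stored
# as {metric_name: {json_label_dict: float}} -- the shape the docs
# describe for the wis2:metrics:* Redis hashes. Names are illustrative.
import json

METRICS = {  # short name -> (prometheus type, help text)
    'downloads_total': ('counter', 'Files downloaded.'),
}

def generate_prometheus_text(store):
    lines = []
    for name, (mtype, help_text) in METRICS.items():
        lines.append(f'# HELP {name} {help_text}')
        lines.append(f'# TYPE {name} {mtype}')
        for labels_json, value in store.get(name, {}).items():
            labels = json.loads(labels_json)  # field name is a JSON label dict
            label_str = ','.join(f'{k}="{v}"' for k, v in sorted(labels.items()))
            lines.append(f'{name}{{{label_str}}} {value}')
    return '\n'.join(lines) + '\n'
```

Because each sample is one hash field keyed by its label set, concurrent workers incrementing different label combinations never collide.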

3.4. Task Manager Module

Celery workers for downloading and processing files.

3.4.1. Celery Configuration

modules/task_manager/task_manager/worker.py
app = Celery('tasks',
             broker=CELERY_BROKER_URL,
             backend=CELERY_RESULT_BACKEND)

# Auto-discover tasks
app.autodiscover_tasks(['task_manager.tasks', 'task_manager.tasks.wis2'])

3.4.2. Download Task

modules/task_manager/task_manager/tasks/wis2.py
@app.task(bind=True)
@metrics_collector
def download_from_wis2(self, job):
    result = {...}  # Initialize result dict

    # 1. Extract identifiers
    message_id = job['payload']['id']
    data_id = job['payload']['properties']['data_id']
    filehash = job['payload']['properties']['integrity']['value']

    # 2. Deduplication check
    for key, key_type in [(message_id, 'by-msg-id'), ...]:
        if get_status(key, key_type) == STATUS_SUCCESS:
            result['status'] = STATUS_SKIPPED
            return result

    # 3. Acquire lock
    lock_acquired = redis_client.set(lock_key, 1, nx=True, ex=LOCK_EXPIRE)
    if not lock_acquired:
        raise self.retry(countdown=10, max_retries=10)

    # 4. Pre-download filter pass (no media_type/size yet)
    pre_action, pre_reason = _apply_job_filter(job, ctx_pre)
    if pre_action == 'reject':
        result['status'] = STATUS_SKIPPED
        return result

    # 5. Download file
    response = _pool.request('GET', download_url, ...)

    # 6. Verify hash
    if hash_expected and hash_base64 != hash_expected:
        result['status'] = STATUS_FAILED
        return result

    # 7. Post-download filter pass (media_type and size now known)
    post_action, post_reason = _apply_job_filter(job, ctx_post)
    if post_action == 'reject':
        result['status'] = STATUS_SKIPPED
        return result

    # 8. Save file
    output_path.write_bytes(data)
    result['status'] = STATUS_SUCCESS
    return result

3.4.3. Workflow Chains

modules/task_manager/task_manager/workflows/__init__.py
def wis2_download(args):
    workflow = download_from_wis2.s(args)
    return workflow

3.4.4. Scheduler (Periodic Tasks)

A separate Celery app (scheduler.py) handles housekeeping using Celery Beat. It uses Redis databases 2/3 to avoid interfering with the download worker queues (databases 0/1). Periodic tasks are defined in tasks/scheduled_tasks.py:

  Task                        Interval      Purpose
  ──────────────────────────  ────────────  ─────────────────────────────────────────
  check_disk_space            Every 5 min   Sets disk_total_bytes, disk_used_bytes,
                                            disk_free_bytes gauges
  clean_directory             Every 10 min  Deletes files older than
                                            DOWNLOAD_RETENTION_PERIOD days; decrements
                                            disk_downloads_bytes per deleted file
  recalibrate_downloads_size  Daily         Full os.walk to correct any drift in the
                                            disk_downloads_bytes gauge

The scheduler is started as two Docker Compose services: celery-scheduler-workers (runs tasks) and celery-beats (triggers them on schedule).
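
The intervals above would correspond to a Celery Beat schedule along these lines. This fragment is an assumption for illustration — the actual schedule lives in scheduler.py, and the dotted task paths shown are guesses at the real task names:

```python
# Hypothetical beat_schedule fragment matching the intervals in the
# table above; task paths are illustrative assumptions.
from datetime import timedelta

beat_schedule = {
    'check-disk-space': {
        'task': 'task_manager.tasks.scheduled_tasks.check_disk_space',
        'schedule': timedelta(minutes=5),
    },
    'clean-directory': {
        'task': 'task_manager.tasks.scheduled_tasks.clean_directory',
        'schedule': timedelta(minutes=10),
    },
    'recalibrate-downloads-size': {
        'task': 'task_manager.tasks.scheduled_tasks.recalibrate_downloads_size',
        'schedule': timedelta(days=1),
    },
}
```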

4. UI Module

The UI is a NiceGUI web application served at port 8080. It is the primary user-facing interface for discovering datasets, browsing the WIS2 topic hierarchy, and creating subscriptions.

4.1. Module Structure

modules/ui/
├── main.py                  # Application entry point, page route, AppState
├── layout.py                # PageLayout builder
├── config.py                # Environment variable config (SUBSCRIPTION_MANAGER_URL etc.)
├── data.py                  # GDC data layer — fetch, merge, cache, hierarchy
├── assets/
│   └── base.css             # All custom CSS (inside @layer components)
├── components/
│   ├── header.py            # WMO banner + toolbar + language selector
│   ├── footer.py            # Footer bar
│   ├── navigation_drawer.py # Left nav drawer with view links
│   └── page_body.py         # Main content area + right sidebar
├── i18n/                    # Internationalisation
│   ├── __init__.py          # t(), current_lang(), is_rtl(), LANGUAGES
│   ├── en.py                # English strings (source of truth)
│   ├── fr.py                # French
│   ├── es.py                # Spanish
│   ├── ar.py                # Arabic (RTL)
│   ├── zh.py                # Chinese
│   └── ru.py                # Russian
├── models/
│   └── wcmp2.py             # WCMP2Record dataclass (GeoJSON Feature)
└── views/
    ├── dashboard.py         # Grafana iframe embed
    ├── catalogue.py         # Catalogue search + result cards
    ├── tree.py              # Topic hierarchy tree
    ├── subscriptions.py     # Active subscriptions list
    ├── settings.py          # GDC status + refresh trigger
    └── shared.py            # Shared sidebar logic (on_topics_picked, confirm_subscribe, show_metadata)

4.2. Data Layer (data.py)

The data layer is responsible for fetching WCMP2 records from the three WIS2 Global Discovery Catalogues (GDC), merging them, and building two module-level caches that all views read from.

4.2.1. GDC Sources

Records are fetched from three public GDC endpoints at startup:

  Short name  URL
  ──────────  ──────────────────────────────
  CMA         https://gdc.wis.cma.cn
  DWD         https://wis2.dwd.de/gdc
  ECCC        https://wis2-gdc.weather.gc.ca

Raw JSON responses are cached in Redis under keys gdc:cache:CMA, gdc:cache:DWD, and gdc:cache:ECCC with a TTL controlled by GDC_CACHE_TTL_SECONDS (default 6 hours). Redis is optional — if unavailable, data is fetched over HTTP on every startup.
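
The cache-first control flow can be sketched as below. To keep the sketch self-contained, cache_get, cache_set, and http_fetch are injected callables standing in for the real Redis and HTTP calls; the gdc:cache: key prefix and 6-hour default TTL come from the docs above:

```python
# Cache-first fetch sketch. The real implementation talks to Redis and
# the GDC HTTP endpoints directly; here those are injected callables.
def fetch_gdc(name, cache_get, cache_set, http_fetch, ttl=6 * 3600):
    key = f'gdc:cache:{name}'
    cached = cache_get(key)        # None on cache miss
    if cached is not None:
        return cached
    data = http_fetch(name)        # fall back to the live GDC endpoint
    try:
        cache_set(key, data, ttl)  # best effort -- Redis is optional
    except ConnectionError:
        pass
    return data
```

On the second call for the same catalogue the HTTP fetch is skipped entirely, which is what makes UI restarts cheap while the cache is warm.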

4.2.2. Merged Records

After all GDC sources are loaded, _build_merged_records() deduplicates records by ID across catalogues:

  • Records present in only one catalogue appear once with a single source_gdcs entry.

  • Records present in multiple catalogues are merged: source_gdcs lists all contributing catalogues.

  • has_discrepancy is set to True if properties, geometry, or links differ between catalogues.

  • Links are unioned across catalogues so that channel data present in any one GDC is preserved on the merged record.

The result is stored in the module-level _merged_records list and returned by merged_records().
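
The merge rules above can be sketched over plain dict records. The source_gdcs and has_discrepancy fields mirror the documented behaviour; the function itself is an illustration, not the real _build_merged_records():

```python
# Minimal sketch of the cross-catalogue merge described above.
# Records are plain dicts keyed by 'id'; only properties/geometry are
# compared for discrepancies here, as an illustration.
def build_merged_records(per_gdc):
    """per_gdc: {gdc_name: [record_dict, ...]} -> list of merged entries."""
    merged = {}
    for gdc, records in per_gdc.items():
        for rec in records:
            entry = merged.setdefault(rec['id'], {
                'record': {**rec, 'links': list(rec.get('links', []))},
                'source_gdcs': [],
                'has_discrepancy': False,
            })
            entry['source_gdcs'].append(gdc)
            # Flag a discrepancy when the same ID differs between catalogues
            for field in ('properties', 'geometry'):
                if rec.get(field) != entry['record'].get(field):
                    entry['has_discrepancy'] = True
            # Union links so channels present in any GDC survive the merge
            for link in rec.get('links', []):
                if link not in entry['record']['links']:
                    entry['record']['links'].append(link)
    return list(merged.values())
```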

4.2.3. Topic Hierarchy

_build_topic_hierarchy() iterates _merged_records and inserts each record’s first cache/-prefixed MQTT channel into a nested dict:

{
  "cache": {
    "children": {
      "a": {
        "children": {
          "wis2": {
            "children": {
              "de-dwd": {
                "children": {
                  ...
                  "synop": {
                    "datasets": [<WCMP2Record>, ...]
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}

A node may contain both "children" and "datasets" if one channel is a prefix of another.

The hierarchy is stored in _topic_hierarchy and returned by topic_hierarchy(). It is consumed by tree.py to build the ui.tree widget, and by get_datasets_for_channel() to resolve datasets for a given topic.
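
The insert and resolve operations over this nested dict can be sketched as follows. The structure matches the example above (top-level topic segments, then "children"/"datasets" nodes); the function names are illustrative stand-ins for the real ones:

```python
# Sketch of the nested-dict topic hierarchy: insert a channel path,
# then resolve datasets for a subtree as get_datasets_for_channel()
# is described to do. Function names are illustrative.
def insert_channel(tree, channel, record):
    parts = channel.split('/')
    node = tree.setdefault(parts[0], {})     # top level maps the first segment
    for part in parts[1:]:
        node = node.setdefault('children', {}).setdefault(part, {})
    node.setdefault('datasets', []).append(record)

def collect_datasets(node):
    """Recursively gather datasets from a node and all its descendants."""
    found = list(node.get('datasets', []))
    for child in node.get('children', {}).values():
        found.extend(collect_datasets(child))
    return found

def datasets_for_channel(tree, channel):
    parts = channel.rstrip('/#').split('/')  # strip a trailing /# wildcard
    node = tree.get(parts[0])
    for part in parts[1:]:
        if node is None:
            break
        node = node.get('children', {}).get(part)
    return collect_datasets(node) if node else []
```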

4.2.4. Key Functions

  Function                                Description
  ──────────────────────────────────────  ─────────────────────────────────────────
  merged_records()                        Returns the cached merged record list
    → list[MergedRecord]
  topic_hierarchy() → dict                Returns the cached topic hierarchy
  get_datasets_for_channel(channel)       Strips trailing /#, navigates the
    → list[WCMP2Record]                   hierarchy, and recursively collects all
                                          datasets from that node and its
                                          descendants
  scrape_all(force=False)                 Fetches GDC data (Redis cache first, then
                                          HTTP), rebuilds _merged_records and
                                          _topic_hierarchy

4.3. WCMP2 Model (models/wcmp2.py)

WCMP2Record is a dataclass representation of a WIS2 Discovery Metadata GeoJSON Feature.

from models.wcmp2 import WCMP2Record

rec = WCMP2Record.from_dict(feature_dict)

rec.id                   # str - unique dataset identifier
rec.title                # str | None
rec.description          # str | None
rec.keywords             # list[str]
rec.wmo_data_policy      # 'core' | 'recommended' | None
rec.geometry             # Geometry | None  (.type, .coordinates)
rec.links                # list[Link]  (.channel, .href, .rel, .extra)
rec.mqtt_channels        # list[str]  - channels from all links

Link.extra is a dict capturing non-schema keys from the GDC response (e.g. GDC-specific filters used for custom filter fields in the subscription sidebar).

4.4. AppState

Each browser session has its own AppState instance (defined in main.py):

class AppState:
    def __init__(self):
        self.selected_topics: list[str] = []
        self.current_view: str = 'dashboard'

selected_topics holds the MQTT topics currently selected in the UI and drives the right sidebar. current_view tracks the active view name and is persisted to app.storage.user['current_view'] before any page reload (e.g. on language change) so the user is returned to the same view.

4.5. Views

All views follow the same convention: a render(container, ...) function that builds the NiceGUI elements inside the provided container element. render is always a plain (non-async) function so it can be called synchronously from show_view() in main.py.

4.5.1. Catalogue View

views/catalogue.py provides full-text search across merged_records() with pure filter functions:

  Function                              Description
  ────────────────────────────────────  ─────────────────────────────────────────
  filter_feature(record, query)         Matches query against ID, title,
                                        description, keywords, and theme concepts
  filter_by_data_policy(record,         Matches 'core', 'recommended', or 'all'
    policy)
  filter_by_keywords(record,            Comma-separated keywords; all must appear
    keywords)                           in the record
  filter_by_bbox(record, bbox)          Shapely geometry intersection with the
                                        given bounding box

Result cards are rendered by update_search_results(), which is also responsible for pagination.
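
Because these filters are pure functions of (record, argument), they are easy to test in isolation. A stand-alone illustration of the keyword filter's documented contract (comma-separated, all must appear), not the actual implementation in views/catalogue.py:

```python
# Illustrative pure filter in the spirit of the catalogue helpers:
# comma-separated keywords, all of which must appear in the record.
def filter_by_keywords(record, keywords):
    wanted = [k.strip().lower() for k in keywords.split(',') if k.strip()]
    have = {k.lower() for k in record.get('keywords', [])}
    return all(k in have for k in wanted)  # vacuously True for an empty query
```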

4.5.2. Tree View

views/tree.py converts topic_hierarchy() into ui.tree nodes via _to_tree_nodes() (recursive, alphabetically sorted). Selection uses on_select to enforce single-node selection — on_tick is not used because it conflicts with NiceGUI’s internal state synchronisation.

4.5.3. Shared Sidebar (views/shared.py)

on_topics_picked(e, state, layout, is_page_selection, dataset_id) is the central handler for both catalogue and tree selection. It:

  1. Updates state.selected_topics

  2. Builds the right sidebar with save-directory input, dataset/media-type/bbox/date filters, and optional custom filters

  3. Wires the Subscribe button to confirm_subscribe()

confirm_subscribe() shows a dialog with the full JSON payload before calling subscribe_to_topics().

show_metadata(dataset_id) looks up the record from merged_records() and renders a details dialog with an interactive Leaflet map.

4.6. Adding a New View

  1. Create modules/ui/views/myview.py with a render(container) function, using t() for all user-visible strings

  2. Add a navigation entry in components/navigation_drawer.py — the NAV_ITEMS list takes a (view_id, label_key, icon) tuple where label_key is a nav.* i18n key

  3. Add a branch in the show_view() dispatcher in main.py

  4. Add any new CSS classes to assets/base.css inside the @layer components { ... } block

  5. Add all new string keys to i18n/en.py and to each other language file — see Internationalisation (i18n)

4.7. Internationalisation (i18n)

The UI supports multiple interface languages via a lightweight dictionary-based i18n system in modules/ui/i18n/.

4.7.1. Supported Languages

  Code  Language  Direction
  ────  ────────  ─────────
  en    English   LTR
  fr    Français  LTR
  es    Español   LTR
  ar    العربية   RTL
  zh    中文      LTR
  ru    Русский   LTR

The active language is stored per browser session in app.storage.user['lang'] and defaults to English. Users change language via the selector in the header toolbar; the page reloads to apply the change.

4.7.2. How t() Works

from i18n import t

# Simple lookup
label = t('nav.dashboard')                      # → 'Dashboard'

# With interpolation (uses str.format)
label = t('subscriptions.folder', path='./')    # → 'Folder: ./'

The lookup chain is:

  1. Chosen language file

  2. English (en.py) — fallback if key is missing in the chosen language

  3. The key string itself — fallback if missing from English too (visible in development)

t() reads app.storage.user['lang'] from the current NiceGUI request session. Always call it at render time (inside a @ui.page handler or a NiceGUI event callback), never at module import time.

Strings that contain literal { or } characters (for example, JSON examples in hint text) must use single braces as-is. Do not use {{ / }} escaping — t() only calls .format(**kwargs) when keyword arguments are passed, so {{ would be displayed literally.
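
The lookup chain and the conditional .format() call can be sketched as below. The real t() reads the language from the NiceGUI session; here it is an explicit parameter, and the string tables are toy examples:

```python
# Sketch of the documented t() behaviour: chosen language, then English,
# then the key itself; .format() only when kwargs are passed.
_STRINGS = {
    'en': {
        'nav.dashboard': 'Dashboard',
        'subscriptions.folder': 'Folder: {path}',
        'hint.json': 'e.g. {"rules": []}',   # literal braces, never formatted
    },
    'fr': {'nav.dashboard': 'Tableau de bord'},
}

def t(key, lang='en', **kwargs):
    text = _STRINGS.get(lang, {}).get(key)
    if text is None:
        text = _STRINGS['en'].get(key, key)  # English fallback, then the key itself
    return text.format(**kwargs) if kwargs else text
```

Note how the hint.json entry passes through untouched when no kwargs are given — this is why single braces work without {{ escaping.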

4.7.3. Key Naming Conventions

Keys use dot-separated namespaces:

  Prefix           Scope                                 Example
  ───────────────  ────────────────────────────────────  ─────────────────────────
  nav.*            Navigation drawer labels              nav.dashboard
  btn.*            Button labels                         btn.subscribe
  sidebar.*        Subscription sidebar                  sidebar.save_directory
  catalogue.*      Catalogue view                        catalogue.search_label
  tree.*           Tree view                             tree.filter_label
  subscriptions.*  Manage Subscriptions view             subscriptions.folder
  settings.*       Settings view                         settings.title
  manual.*         Manual Subscribe view                 manual.topic_label
  manual.val.*     Manual Subscribe validation messages  manual.val.topic_required
  dialog.*         Dialog titles                         dialog.confirm_title
  metadata.*       Metadata dialog                       metadata.title
  validation.*     Shared validation messages            validation.date_format
  footer.*         Footer                                footer.copyright
  aria.*           ARIA accessibility labels             aria.toggle_nav

4.7.4. RTL Support

Arabic (ar) is right-to-left. When Arabic is selected, is_rtl() returns True and main.py’s on_connect() handler injects dir="rtl" on the <html> element via JavaScript, activating Quasar’s RTL layout mode. Custom CSS overrides in assets/base.css (under the RTL overrides comment block) correct Quasar’s physical padding on .q-page-container and reposition the subscription sidebar.

To add another RTL language, add its code to RTL_LANGUAGES in __init__.py — no other changes are needed.

4.7.5. Adding a New Language

Step 1 — Create the language file

Copy modules/ui/i18n/en.py to modules/ui/i18n/{code}.py and translate all values. Keep every key; do not remove any. Preserve {placeholder} names exactly as they appear in the English source:

# modules/ui/i18n/de.py
"""German strings."""

STRINGS: dict[str, str] = {
    'nav.dashboard':        'Dashboard',
    'nav.catalogue':        'Katalogsuche',
    'nav.tree':             'Baumsuche',
    'nav.manual':           'Manuell abonnieren',
    'nav.manage':           'Abonnements verwalten',
    'nav.settings':         'Einstellungen',
    # ... all remaining keys ...
    'subscriptions.folder': 'Ordner: {path}',   # {path} must be preserved
    'settings.records':     '{name}: {count} Einträge',
}
Step 2 — Register the language in __init__.py
from . import ar, de, en, es, fr, ru, zh        (1)

LANGUAGES: dict[str, str] = {
    'en': 'English',
    'fr': 'Français',
    'es': 'Español',
    'ar': 'العربية',
    'zh': '中文',
    'ru': 'Русский',
    'de': 'Deutsch',                             (2)
}

_STRINGS: dict[str, dict[str, str]] = {
    'en': en.STRINGS,
    'fr': fr.STRINGS,
    'es': es.STRINGS,
    'ar': ar.STRINGS,
    'zh': zh.STRINGS,
    'ru': ru.STRINGS,
    'de': de.STRINGS,                            (3)
}
1 Add the import
2 Add the display name (in the native script)
3 Add the strings mapping
Step 3 — Mark as RTL if required

If the language is right-to-left (e.g. Hebrew he, Persian fa):

RTL_LANGUAGES: frozenset[str] = frozenset({'ar', 'he'})
Step 4 — Restart the UI
docker-compose restart wis2downloader-ui

The new language will appear immediately in the header language selector.

The existing non-English translations are machine-generated and provided as a starting point only. All strings should be reviewed by a native speaker before production deployment, with particular attention to WMO/meteorological domain terms (WIS2, BUFR, GRIB, Global Cache, etc.) which have established translations in WMO official documents.

4.7.6. Adding New Translatable Strings

When adding UI text to any view or component:

Step 1 — Add to en.py (source of truth)
# modules/ui/i18n/en.py
'myview.title':       'My New View',
'myview.description': 'Showing results for {topic}.',
Step 2 — Add to all other language files

Copy the English value as a placeholder if a translation is not yet available. t() falls back to English automatically, but having the key present avoids gaps in tooling:

# fr.py / es.py / ar.py / zh.py / ru.py
'myview.title':       'My New View',         # TODO: translate
'myview.description': 'Showing results for {topic}.',
Step 3 — Use t() in the view
from i18n import t

def render(container):
    with container:
        ui.label(t('myview.title')).classes('page-title')
        ui.label(t('myview.description', topic=selected_topic))

5. Inter-Service Communication

5.1. Redis PubSub (Commands)

Subscription Manager → Subscriber communication uses Redis PubSub on channel subscription_commands:

// First subscription for a topic — opens MQTT connection
{"action": "subscribe", "topic": "cache/a/wis2/+/data/#",
 "subscriptions": {"<uuid>": {"id": "<uuid>", "save_path": "all-data", "filter": {}}}}

// Additional subscription for an already-open topic
{"action": "add_subscription", "topic": "cache/a/wis2/+/data/#",
 "sub_id": "<uuid>", "save_path": "grib-data", "filter": {"rules": [...]}}

// Remove one subscription (MQTT stays open)
{"action": "remove_subscription", "topic": "cache/a/wis2/+/data/#", "sub_id": "<uuid>"}

// Last subscription removed — closes MQTT connection
{"action": "unsubscribe", "topic": "cache/a/wis2/+/data/#"}

// Update save_path or filter for an existing subscription
{"action": "update_subscription", "topic": "cache/a/wis2/+/data/#",
 "sub_id": "<uuid>", "save_path": "new-path", "filter": {}}

5.2. Celery Tasks (Downloads)

Subscriber → Celery Worker communication uses Celery task queue:

job = {
    "topic": "cache/a/wis2/de-dwd/data/...",
    "target": "dwd-data",
    "filter": {"name": "...", "rules": [...]},   # subscription-level filter
    "_broker": "globalbroker.meteo.fr",
    "_received": "2026-01-28 10:15:30",
    "_queued": "2026-01-28 10:15:30",
    "payload": {
        "id": "...",
        "properties": {
            "data_id": "...",
            "metadata_id": "...",
            "integrity": {"method": "sha512", "value": "..."}
        },
        "links": [
            {"rel": "canonical", "href": "https://..."}
        ]
    }
}

5.3. Redis Keys

  Key Pattern                           Purpose
  ────────────────────────────────────  ─────────────────────────────────────────
  global:subscriptions                  Hash of all subscriptions (sub_id → JSON
                                        {id, topic, save_path, filter})
  wis2:notification:status:{type}:{id}  Deduplication tracking (types: by-msg-id,
                                        by-data-id, by-hash)
  wis2:notification:data:lock:{id}      Distributed lock preventing concurrent
                                        duplicate downloads
  gdc:cache:{name}                      Cached GDC catalogue JSON (CMA, DWD,
                                        ECCC); TTL set by GDC_CACHE_TTL_SECONDS
  wis2:metrics:{metric_name}            Prometheus metrics hash (field = JSON
                                        label dict, value = float counter/gauge)
  subscriber:health:{id}                Subscriber health heartbeat
  celery                                Celery task queue (default)

6. Extending the System

6.1. Adding a New Match Condition to the Filter Engine

The filter engine lives in modules/shared/shared/filters.py. Each match condition is a key in the match object dispatched by _evaluate_match().

To add a new built-in condition (e.g. matching on a new metadata field station_id):

  1. Add the field to MatchContext in filters.py:

    @dataclass
    class MatchContext:
        ...
        station_id: str | None = None
  2. Populate it in _build_context() in wis2.py (pre- and/or post-download).

  3. Add a branch in _evaluate_match():

    if 'station_id' in match:
        return _match_string_field(ctx.station_id, match['station_id'])
  4. Document the new field in openapi.yml and in the filter reference in the user guide.

No changes are needed in app.py, command_listener.py, or subscriber.py — the filter object is passed through unchanged and evaluated at download time.
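
The dispatch pattern these steps extend can be illustrated with a self-contained sketch. The MatchContext fields and the glob-style _match_string_field here are simplified stand-ins for the real code in shared/filters.py, with the new station_id field wired in:

```python
# Self-contained sketch of the match-dispatch pattern described above;
# fields and matching semantics are illustrative, not the real
# shared/filters.py implementation.
from __future__ import annotations
from dataclasses import dataclass
from fnmatch import fnmatch

@dataclass
class MatchContext:
    topic: str | None = None
    media_type: str | None = None
    station_id: str | None = None  # the new field from step 1

def _match_string_field(value, pattern):
    # Treat the pattern as a glob; an unknown (None) value never matches
    return value is not None and fnmatch(value, pattern)

def evaluate_match(ctx, match):
    # Each key in the match object is one condition; all must hold
    if 'topic' in match and not _match_string_field(ctx.topic, match['topic']):
        return False
    if 'media_type' in match and not _match_string_field(ctx.media_type, match['media_type']):
        return False
    if 'station_id' in match and not _match_string_field(ctx.station_id, match['station_id']):
        return False
    return True
```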

6.2. Adding a New Task

  1. Create task in modules/task_manager/task_manager/tasks/

  2. Register in worker.py autodiscover

  3. Add to workflow if needed

# modules/task_manager/task_manager/tasks/custom.py
from task_manager.worker import app

@app.task
def my_custom_task(data):
    # Process data
    return result

6.3. Adding a New Metric

Metrics are stored in Redis via shared.redis_metrics. To add a new metric:

  1. Register it in the METRICS dict in modules/shared/shared/redis_metrics.py:

    # short name → (prometheus type, help text)
    METRICS: dict[str, tuple[str, str]] = {
        ...
        'my_counter_total': ('counter', 'Description of the counter.'),
        'my_gauge':         ('gauge',   'Description of the gauge.'),
    }
  2. Increment it from any service (counter) or set it (gauge):

    from shared import incr_counter, set_gauge
    
    # Counter (e.g. in a Celery task)
    incr_counter('my_counter_total', {'label1': 'value', 'label2': 'value'})
    
    # Gauge (e.g. in a scheduled task)
    set_gauge('my_gauge', {'label1': 'value'}, 42.0)

Only metrics registered in METRICS appear in the /metrics output.

7. Testing

7.1. Running Tests

# Install test dependencies
pip install pytest pytest-cov

# Run tests
pytest modules/

# With coverage
pytest --cov=modules modules/

7.2. Manual Testing

# Start services
docker-compose up -d

# Create test subscription
curl -X POST http://localhost:5002/subscriptions \
  -H "Content-Type: application/json" \
  -d '{"topic": "cache/a/wis2/de-dwd/data/#", "target": "test"}'

# Watch logs
docker-compose logs -f subscriber celery

# Check downloads
ls -la downloads/test/

8. Development Setup

8.1. Local Development

# Create virtual environment
python -m venv venv
source venv/bin/activate

# Install modules in editable mode
pip install -e modules/shared
pip install -e modules/task_manager
pip install -e modules/subscriber
pip install -e modules/subscription_manager

# Start Redis (single instance for dev)
docker run -d -p 6379:6379 redis:7.2-alpine redis-server --requirepass devpassword

# Set environment
export REDIS_HOST=localhost
export REDIS_PORT=6379
export REDIS_PASSWORD=devpassword
export FLASK_SECRET_KEY=dev-secret-key
export LOG_LEVEL=DEBUG

# Run subscription manager
python -m subscription_manager.app

# Run subscriber (in another terminal)
export GLOBAL_BROKER_HOST=globalbroker.meteo.fr
python -m subscriber.manager

# Run celery worker (in another terminal)
celery -A task_manager.worker worker --loglevel=DEBUG

8.2. Docker Build

# Build all images
docker-compose build

# Build specific service
docker-compose build celery

# Rebuild without cache
docker-compose build --no-cache

9. Debugging

9.1. View Celery Tasks

# Inspect active tasks
docker exec -it wis2downloader-celery-1 \
  celery -A task_manager.worker inspect active

# Inspect reserved tasks
docker exec -it wis2downloader-celery-1 \
  celery -A task_manager.worker inspect reserved

9.2. Redis Inspection

# Connect to Redis (use password from environment)
docker exec -it redis redis-cli -a $REDIS_PASSWORD

# List subscriptions
HGETALL global:subscriptions

# Check queue length
LLEN celery

# View deduplication keys
KEYS wis2:notification:*

# View GDC catalogue cache (populated by the UI on startup)
KEYS gdc:cache:*
TTL gdc:cache:CMA

All Redis commands require authentication. The -a $REDIS_PASSWORD flag passes the password from your environment.

9.3. MQTT Debugging

# Subscribe to topic manually (mosquitto-clients)
mosquitto_sub -h globalbroker.meteo.fr -p 443 \
  -t 'cache/a/wis2/de-dwd/data/#' \
  -u everyone -P everyone \
  --capath /etc/ssl/certs