本文档由 Claude AI 从英文自动翻译。 在生产环境使用前,WMO/气象领域术语应由母语使用者进行审核。 请参阅 英文原版 获取权威版本。

1. 架构概述

WIS2 Downloader 是一个由四个主要组件构成的分布式系统:

                         Docker Compose
  ┌──────────────────────────────────────────────────────────┐
  │                                                           │
  │  Web UI (NiceGUI :8080)                                   │
  │    - reads GDC catalogue data from Redis cache            │
  │    - posts subscriptions to Subscription Manager API      │
  │                          │                                │
  │                          ▼                                │
  │  Subscription Manager (Flask API :5001)                   │
  │    - persists subscriptions in Redis                      │
  │    - publishes subscribe/unsubscribe commands via PubSub  │
  │                          │                                │
  │              Redis PubSub (commands)                      │
  │                          │                                │
  │                          ▼                                │
  │  Subscriber (MQTT Client)                                 │
  │    - connects to WIS2 Global Broker                       │
  │    - on notification: queues Celery download task         │
  │                          │                                │
  │              Redis (Celery queue + dedup state)           │
  │                          │                                │
  │                          ▼                                │
  │  Celery Workers                                           │
  │    - download file, verify hash, apply filters, save      │
  │                                                           │
  └──────────────────────────────────────────────────────────┘

1.1. 数据流

  1. 启动时,Web 界面从三个 GDC 端点(或 Redis 缓存)获取 WCMP2 记录,并在内存中构建主题层次结构

  2. 用户在 Web 界面浏览目录或树状视图并选择主题

  3. 用户点击 订阅 → Web 界面向 Subscription Manager REST API 发送 POST 请求

  4. Subscription Manager 将订阅持久化到 Redis,并向 Redis PubSub 频道发布订阅命令

  5. Subscriber 接收命令并在 WIS2 Global Broker 上订阅 MQTT 主题

  6. 当 WIS2 通知到达时,Subscriber 将 Celery 下载任务加入队列

  7. Celery 工作器下载文件、验证哈希、应用过滤器并保存到磁盘

2. 模块结构

modules/
├── shared/                 # Shared utilities
│   └── shared/
│       ├── __init__.py
│       ├── logging.py      # Centralized logging
│       └── redis_client.py # Redis client singleton
│
├── subscriber/             # MQTT subscriber service
│   └── subscriber/
│       ├── __init__.py
│       ├── manager.py      # Entry point, thread management
│       ├── subscriber.py   # MQTT client wrapper
│       └── command_listener.py  # Redis PubSub listener
│
├── subscription_manager/   # REST API service
│   └── subscription_manager/
│       ├── __init__.py
│       ├── app.py          # Flask application
│       └── static/
│           └── openapi.yml # API specification
│
├── task_manager/           # Celery tasks
│   └── task_manager/
│       ├── __init__.py
│       ├── worker.py       # Celery app configuration
│       ├── tasks/
│       │   └── wis2.py     # Download tasks
│       └── workflows/
│           └── __init__.py # Task chains
│
└── ui/                     # NiceGUI web interface (port 8080)
    ├── main.py             # Entry point, page route, AppState
    ├── layout.py           # PageLayout builder
    ├── config.py           # Environment variable config
    ├── data.py             # GDC data layer — fetch, merge, cache, hierarchy
    ├── assets/
    │   └── base.css        # All custom CSS
    ├── components/         # Reusable layout components
    ├── i18n/               # Internationalisation
    │   ├── __init__.py     # t(), current_lang(), is_rtl(), LANGUAGES
    │   ├── en.py           # English strings (source of truth)
    │   ├── fr.py           # French
    │   ├── es.py           # Spanish
    │   ├── ar.py           # Arabic (RTL)
    │   ├── zh.py           # Chinese
    │   └── ru.py           # Russian
    ├── models/
    │   └── wcmp2.py        # WCMP2Record dataclass
    └── views/              # Page views (catalogue, tree, subscriptions, etc.)

3. 模块详情

3.1. 共享模块

提供所有服务使用的公共实用工具。

3.1.1. Redis 客户端

modules/shared/shared/redis_client.py
from shared import get_redis_client

# 获取 Redis 客户端(单例模式)
redis = get_redis_client()

# 像普通 Redis 客户端一样使用
redis.set('key', 'value')
redis.get('key')

客户端特性:

  • 直接连接到 Redis 服务器

  • 缓存连接(单例模式)

  • 包含针对瞬时故障的重试逻辑

3.1.2. 日志记录

modules/shared/shared/logging.py
from shared import setup_logging

# 配置根日志记录器(在启动时调用一次)
setup_logging()

# 获取模块专用日志记录器
LOGGER = setup_logging(__name__)

LOGGER.info("Message with UTC timestamp")

功能特性:

  • ISO 8601 格式的 UTC 时间戳

  • 通过 LOG_LEVEL 环境变量配置

  • 输出到 stdout 供 Docker 日志收集

3.2. Subscriber 模块

通过 MQTT 连接到 WIS2 Global Broker 并处理传入通知。

3.2.1. 入口点

modules/subscriber/subscriber/manager.py
def run_manager():
    # 1. Create MQTT subscriber
    mqtt_subscriber = Subscriber(**broker_config)

    # 2. Create Redis command listener
    redis_listener = CommandListener(
        subscriber=mqtt_subscriber,
        channel=COMMAND_CHANNEL
    )

    # 3. Start MQTT in separate thread
    mqtt_thread = threading.Thread(target=mqtt_subscriber.start, daemon=True)
    mqtt_thread.start()

    # 4. Start Redis listener (blocks)
    redis_listener.start()

3.2.2. MQTT 订阅者

modules/subscriber/subscriber/subscriber.py
class Subscriber:
    def __init__(self, host, port, uid, pwd, protocol, session):
        # Configure MQTT client with callbacks
        self.client = mqtt.Client(...)
        self.client.on_message = self._on_message

    def _on_message(self, client, userdata, msg):
        # Parse notification, create Celery task
        job = {
            "topic": msg.topic,
            "target": target,
            "filters": filters,
            "payload": json.loads(msg.payload)
        }
        workflow = wis2_download(job)
        workflow.apply_async()

    def subscribe(self, topic, target, filters):
        self.client.subscribe(topic, qos=0)
        self.active_subscriptions[topic] = {...}

    def unsubscribe(self, topic):
        self.client.unsubscribe(topic)
        del self.active_subscriptions[topic]

3.2.3. 命令监听器

modules/subscriber/subscriber/command_listener.py
class CommandListener(threading.Thread):
    def __init__(self, subscriber, channel):
        self.subscriber = subscriber
        self.pubsub = get_redis_client().pubsub()

    def run(self):
        self.pubsub.subscribe(self.channel)
        while not self.stop_event.is_set():
            message = self.pubsub.get_message()
            if message:
                self._process_command(message)

    def _process_command(self, message):
        command = json.loads(message['data'])
        if command['action'] == 'subscribe':
            self.subscriber.subscribe(
                command['topic'],
                command['save_path'],
                command['filters']
            )
        elif command['action'] == 'unsubscribe':
            self.subscriber.unsubscribe(command['topic'])

3.3. Subscription Manager 模块

用于管理订阅的 Flask REST API。

3.3.1. Flask 应用程序

modules/subscription_manager/subscription_manager/app.py
@app.post('/subscriptions')
def add_subscription():
    data = get_json()
    topic = normalise_topic(data.get('topic'))
    target = normalise_path(data.get('target', ''))
    filters = data.get('filters', {})

    # Publish command to subscriber via Redis PubSub
    command = {
        "action": "subscribe",
        "topic": topic,
        "save_path": target,
        "filters": filters
    }
    publish_command(command, COMMAND_CHANNEL)

    # Persist to Redis for durability
    persist_subscription(topic, target, filters)

    return jsonify({"status": "accepted", ...}), 202

3.3.2. 指标端点

指标使用原子 HINCRBYFLOAT 操作存储在 Redis 中,可在多个 Celery worker 容器之间安全使用。端点从 Redis 读取数据并以 Prometheus 文本格式呈现:

@app.route('/metrics')
def expose_metrics():
    from shared.redis_metrics import generate_prometheus_text, set_gauge
    set_gauge('celery_queue_length', {}, get_queue_length())
    return Response(generate_prometheus_text(), mimetype='text/plain')

从任意 worker 增加计数器:

from shared import incr_counter
incr_counter('downloads_total', {'cache': hostname, 'media_type': mime_type})

3.4. Task Manager 模块

用于下载和处理文件的 Celery 工作器。

3.4.1. Celery 配置

modules/task_manager/task_manager/worker.py
app = Celery('tasks',
             broker=CELERY_BROKER_URL,
             result_backend=CELERY_RESULT_BACKEND)

# 自动发现任务
app.autodiscover_tasks(['task_manager.tasks', 'task_manager.tasks.wis2'])

3.4.2. 下载任务

modules/task_manager/task_manager/tasks/wis2.py
@app.task(bind=True)
@metrics_collector
def download_from_wis2(self, job):
    result = {...}  # Initialize result dict

    # 1. Extract identifiers
    message_id = job['payload']['id']
    data_id = job['payload']['properties']['data_id']
    filehash = job['payload']['properties']['integrity']['value']

    # 2. Deduplication check
    for key, type in [(message_id, 'by-msg-id'), ...]:
        if get_status(key, type) == STATUS_SUCCESS:
            result['status'] = STATUS_SKIPPED
            return result

    # 3. Acquire lock
    lock_acquired = redis_client.set(lock_key, 1, nx=True, ex=LOCK_EXPIRE)
    if not lock_acquired:
        raise self.retry(countdown=10, max_retries=10)

    # 4. Download file
    response = _pool.request('GET', download_url, ...)

    # 5. Verify hash
    if hash_expected and hash_base64 != hash_expected:
        result['status'] = STATUS_FAILED
        return result

    # 6. Check media type filter
    if not _is_allowed_media_type(file_type, filters):
        result['status'] = STATUS_SKIPPED
        return result

    # 7. Save file
    output_path.write_bytes(data)
    result['status'] = STATUS_SUCCESS
    return result

3.4.3. 工作流链

modules/task_manager/task_manager/workflows/init.py
def wis2_download(args):
    # Chain: download -> decode/ingest
    workflow = download_from_wis2.s(args) | decode_and_ingest.s()
    return workflow

3.4.4. 调度器(定期任务)

一个独立的 Celery 应用(scheduler.py)使用 Celery Beat 处理日常维护任务。它使用 Redis 数据库 2/3,以避免干扰下载 worker 队列(数据库 0/1)。定期任务在 tasks/scheduled_tasks.py 中定义:

任务 执行间隔 用途

check_disk_space

每 5 分钟

设置 disk_total_bytesdisk_used_bytesdisk_free_bytes 指标

clean_directory

每 10 分钟

删除超过 DOWNLOAD_RETENTION_PERIOD 天的文件;每删除一个文件则减少 disk_downloads_bytes

recalibrate_downloads_size

每天

完整的 os.walk 以纠正 disk_downloads_bytes 指标的任何偏差

调度器作为两个 Docker Compose 服务启动:celery-scheduler-workers(执行任务)和 celery-beats(按计划触发任务)。

4. 界面模块

界面是一个运行在 8080 端口的 NiceGUI Web 应用程序。它是用户发现数据集、浏览 WIS2 主题层次结构和创建订阅的主要界面。

4.1. 模块结构

modules/ui/
├── main.py                  # Application entry point, page route, AppState
├── layout.py                # PageLayout builder
├── config.py                # Environment variable config (SUBSCRIPTION_MANAGER_URL etc.)
├── data.py                  # GDC data layer — fetch, merge, cache, hierarchy
├── assets/
│   └── base.css             # All custom CSS (inside @layer components)
├── components/
│   ├── header.py            # WMO banner + toolbar + language selector
│   ├── footer.py            # Footer bar
│   ├── navigation_drawer.py # Left nav drawer with view links
│   └── page_body.py         # Main content area + right sidebar
├── i18n/                    # Internationalisation
│   ├── __init__.py          # t(), current_lang(), is_rtl(), LANGUAGES
│   ├── en.py                # English strings (source of truth)
│   ├── fr.py                # French
│   ├── es.py                # Spanish
│   ├── ar.py                # Arabic (RTL)
│   ├── zh.py                # Chinese
│   └── ru.py                # Russian
├── models/
│   └── wcmp2.py             # WCMP2Record dataclass (GeoJSON Feature)
└── views/
    ├── dashboard.py         # Grafana iframe embed
    ├── catalogue.py         # Catalogue search + result cards
    ├── tree.py              # Topic hierarchy tree
    ├── subscriptions.py     # Active subscriptions list
    ├── settings.py          # GDC status + refresh trigger
    └── shared.py            # Shared sidebar logic (on_topics_picked, confirm_subscribe, show_metadata)

4.2. 数据层 (data.py)

数据层负责从 WIS2 的三个全球发现目录 (GDC) 获取 WCMP2 记录、合并记录,并构建所有视图读取的两个模块级缓存。

4.2.1. GDC 数据源

启动时从三个公开 GDC 端点获取记录:

简称 URL

CMA

https://gdc.wis.cma.cn

DWD

https://wis2.dwd.de/gdc

ECCC

https://wis2-gdc.weather.gc.ca

原始 JSON 响应以 gdc:cache:CMAgdc:cache:DWDgdc:cache:ECCC 为键缓存在 Redis 中,TTL 由 GDC_CACHE_TTL_SECONDS 控制(默认 6 小时)。Redis 是可选的——如果不可用,每次启动时通过 HTTP 获取数据。

4.2.2. 合并记录

加载所有 GDC 数据源后,_build_merged_records() 按 ID 对各目录间的记录进行去重:

  • 仅存在于一个目录中的记录只出现一次,带单个 source_gdcs 条目。

  • 存在于多个目录中的记录被合并:source_gdcs 列出所有贡献目录。

  • 如果各目录间的属性、几何形状或链接不同,has_discrepancy 被设为 True

  • 跨目录联合链接,确保任一 GDC 中存在的频道数据在合并记录中得以保留。

结果存储在模块级 _merged_records 列表中,由 merged_records() 返回。

4.2.3. 主题层次结构

_build_topic_hierarchy() 遍历 _merged_records,将每条记录的第一个 cache/ MQTT 频道插入嵌套字典:

{
  "cache": {
    "children": {
      "a": {
        "children": {
          "wis2": {
            "children": {
              "de-dwd": {
                "children": {
                  ...
                  "synop": {
                    "datasets": [<WCMP2Record>, ...]
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}

如果一个频道是另一个的前缀,节点可以同时包含 "children""datasets"

层次结构存储在 _topic_hierarchy 中,由 topic_hierarchy() 返回。tree.py 使用它构建 ui.tree 组件,get_datasets_for_channel() 使用它解析给定主题的数据集。

4.2.4. 关键函数

函数 描述

merged_records() → list[MergedRecord]

返回缓存的合并记录列表

topic_hierarchy() → dict

返回缓存的主题层次结构

get_datasets_for_channel(channel) → list[WCMP2Record]

去除末尾的 /#,导航层次结构,递归收集该节点及其子节点的所有数据集

scrape_all(force=False)

获取 GDC 数据(优先 Redis 缓存,然后 HTTP),重建 _merged_records_topic_hierarchy

4.3. WCMP2 模型 (models/wcmp2.py)

WCMP2Record 是 WIS2 发现元数据 GeoJSON Feature 的 dataclass 表示。

from models.wcmp2 import WCMP2Record

rec = WCMP2Record.from_dict(feature_dict)

rec.id                   # str - unique dataset identifier
rec.title                # str | None
rec.description          # str | None
rec.keywords             # list[str]
rec.wmo_data_policy      # 'core' | 'recommended' | None
rec.geometry             # Geometry | None  (.type, .coordinates)
rec.links                # list[Link]  (.channel, .href, .rel, .extra)
rec.mqtt_channels        # list[str]  - channels from all links

Link.extra 是一个字典,捕获 GDC 响应中非模式键(例如 GDC 特定的 filters,用于订阅侧边栏的自定义过滤器字段)。

4.4. AppState

每个浏览器会话都有自己的 AppState 实例(在 main.py 中定义):

class AppState:
    def __init__(self):
        self.selected_topics: list[str] = []
        self.current_view: str = 'dashboard'

selected_topics 保存当前在界面中选中的 MQTT 主题,驱动右侧边栏。current_view 跟踪当前视图名称,并在任何页面重新加载(例如语言切换)之前持久化到 app.storage.user['current_view'],以便用户返回到相同的视图。

4.5. 视图

所有视图遵循相同约定:render(container, …​) 函数在提供的容器元素内构建 NiceGUI 元素。render 始终是普通(非异步)函数,以便可以从 main.py 中的 show_view() 同步调用。

4.5.1. 目录视图

views/catalogue.py 通过纯过滤函数对 merged_records() 提供全文搜索:

函数 描述

filter_feature(record, query)

将查询与 ID、标题、描述、关键词和主题概念进行匹配

filter_by_data_policy(record, policy)

匹配 'core''recommended''all'

filter_by_keywords(record, keywords)

逗号分隔的关键词;所有关键词都必须出现在记录中

filter_by_bbox(record, bbox)

Shapely 几何体与给定边界框的交集

结果卡片由 update_search_results() 渲染,该函数也负责分页。

4.5.2. 树状视图

views/tree.py 通过 _to_tree_nodes()(递归,按字母排序)将 topic_hierarchy() 转换为 ui.tree 节点。选择使用 on_select 强制单节点选择——不使用 on_tick,因为它与 NiceGUI 内部状态同步冲突。

4.5.3. 共享侧边栏 (views/shared.py)

on_topics_picked(e, state, layout, is_page_selection, dataset_id) 是目录和树状选择的中央处理器。它:

  1. 更新 state.selected_topics

  2. 构建右侧边栏,包含保存目录输入、数据集/媒体类型/边界框/日期过滤器,以及可选的自定义过滤器

  3. 订阅 按钮绑定到 confirm_subscribe()

confirm_subscribe() 在调用 subscribe_to_topics() 之前显示包含完整 JSON 载荷的对话框。

show_metadata(dataset_id)merged_records() 查找记录,并渲染带有交互式 Leaflet 地图的详情对话框。

4.6. 添加新视图

  1. 创建 modules/ui/views/myview.py,包含 render(container) 函数,对所有用户可见的字符串使用 t()

  2. components/navigation_drawer.py 中添加导航条目——NAV_ITEMS 列表接受 (view_id, label_key, icon) 元组,其中 label_keynav.* i18n 键

  3. main.pyshow_view() 分发器中添加分支

  4. 将任何新 CSS 类添加到 assets/base.css@layer components { …​ } 块中

  5. 将所有新字符串键添加到 i18n/en.py 和每个其他语言文件——参见 国际化 (i18n)

4.7. 国际化 (i18n)

界面通过 modules/ui/i18n/ 中基于字典的轻量级 i18n 系统支持多种界面语言。

4.7.1. 支持的语言

代码 语言 方向

en

English

LTR

fr

Français

LTR

es

Español

LTR

ar

العربية

RTL

zh

中文

LTR

ru

Русский

LTR

活跃语言按浏览器会话存储在 app.storage.user['lang'] 中,默认为英语。用户通过标题工具栏中的选择器更改语言;页面重新加载以应用更改。

4.7.2. t() 的工作原理

from i18n import t

# 简单查找
label = t('nav.dashboard')                      # → 'Dashboard'

# 带插值(使用 str.format)
label = t('subscriptions.folder', path='./')    # → 'Folder: ./'

查找链为:

  1. 所选语言文件

  2. 英语(en.py)——如果所选语言中缺少该键则作为回退

  3. 键字符串本身——如果英语中也缺少则作为回退(在开发中可见)

t() 从当前 NiceGUI 请求会话读取 app.storage.user['lang']。始终在渲染时调用它(在 @ui.page 处理器或 NiceGUI 事件回调内),而不是在模块导入时。
包含字面量 {} 字符的字符串(例如提示文本中的 JSON 示例)必须按原样使用单括号。不要*使用 {{ / }} 转义——t() 仅在传递关键字参数时调用 .format(*kwargs),因此 {{ 会按字面量显示。

4.7.3. 键命名规范

键使用点分隔的命名空间:

前缀 范围 示例

nav.*

导航抽屉标签

nav.dashboard

btn.*

按钮标签

btn.subscribe

sidebar.*

订阅侧边栏

sidebar.save_directory

catalogue.*

目录视图

catalogue.search_label

tree.*

树状视图

tree.filter_label

subscriptions.*

管理订阅视图

subscriptions.folder

settings.*

设置视图

settings.title

manual.*

手动订阅视图

manual.topic_label

manual.val.*

手动订阅验证消息

manual.val.topic_required

dialog.*

对话框标题

dialog.confirm_title

metadata.*

元数据对话框

metadata.title

validation.*

共享验证消息

validation.date_format

footer.*

页脚

footer.copyright

aria.*

ARIA 无障碍标签

aria.toggle_nav

4.7.4. 从右到左 (RTL) 支持

阿拉伯语(ar)是从右到左的语言。选择阿拉伯语时,is_rtl() 返回 Truemain.py 中的 on_connect() 处理器通过 JavaScript 在 <html> 元素上注入 dir="rtl",激活 Quasar 的 RTL 布局模式。assets/base.css 中的自定义 CSS 覆盖(在 RTL overrides 注释块下)修正了 Quasar 在 .q-page-container 上的物理内边距,并重新定位订阅侧边栏。

要添加另一种 RTL 语言,将其代码添加到 init.py 中的 RTL_LANGUAGES——无需其他更改。

4.7.5. 添加新语言

第 1 步——创建语言文件

modules/ui/i18n/en.py 复制到 modules/ui/i18n/{code}.py 并翻译所有值。保留每个键;不要删除任何键。完全按照英语源中出现的方式保留 {placeholder} 名称:

# modules/ui/i18n/de.py
"""German strings."""

STRINGS: dict[str, str] = {
    'nav.dashboard':        'Dashboard',
    'nav.catalogue':        'Katalogsuche',
    'nav.tree':             'Baumsuche',
    'nav.manual':           'Manuell abonnieren',
    'nav.manage':           'Abonnements verwalten',
    'nav.settings':         'Einstellungen',
    # ... all remaining keys ...
    'subscriptions.folder': 'Ordner: {path}',   # {path} must be preserved
    'settings.records':     '{name}: {count} Einträge',
}
第 2 步——在 init.py 中注册语言
from . import ar, de, en, es, fr, ru, zh        (1)

LANGUAGES: dict[str, str] = {
    'en': 'English',
    'fr': 'Français',
    'es': 'Español',
    'ar': 'العربية',
    'zh': '中文',
    'ru': 'Русский',
    'de': 'Deutsch',                             (2)
}

_STRINGS: dict[str, dict[str, str]] = {
    'en': en.STRINGS,
    'fr': fr.STRINGS,
    'es': es.STRINGS,
    'ar': ar.STRINGS,
    'zh': zh.STRINGS,
    'ru': ru.STRINGS,
    'de': de.STRINGS,                            (3)
}
1 添加导入
2 添加显示名称(使用原生文字)
3 添加字符串映射
第 3 步——如需要则标记为 RTL

如果语言是从右到左的(例如希伯来语 he、波斯语 fa):

RTL_LANGUAGES: frozenset[str] = frozenset({'ar', 'he'})
第 4 步——重启界面
docker-compose restart wis2downloader-ui

新语言将立即出现在标题语言选择器中。

现有的非英语翻译是机器生成的,仅作为起点提供。在生产部署前,所有字符串应由母语使用者审核,特别注意 WMO/气象领域术语(WIS2、BUFR、GRIB、Global Cache 等),这些术语在 WMO 官方文件中有既定翻译。

4.7.6. 添加新的可翻译字符串

向任何视图或组件添加界面文本时:

第 1 步——添加到 en.py(真实来源)
# modules/ui/i18n/en.py
'myview.title':       'My New View',
'myview.description': 'Showing results for {topic}.',
第 2 步——添加到所有其他语言文件

如果翻译尚不可用,复制英语值作为占位符。t() 自动回退到英语,但有键存在可避免工具中的空缺:

# fr.py / es.py / ar.py / zh.py / ru.py
'myview.title':       'My New View',         # TODO: translate
'myview.description': 'Showing results for {topic}.',
第 3 步——在视图中使用 t()
from i18n import t

def render(container):
    with container:
        ui.label(t('myview.title')).classes('page-title')
        ui.label(t('myview.description', topic=selected_topic))

5. 服务间通信

5.1. Redis PubSub(命令)

Subscription Manager 与 Subscriber 之间的通信使用 Redis PubSub,频道为 subscription_commands

// Channel: subscription_commands

// Subscribe command
{
  "action": "subscribe",
  "topic": "cache/a/wis2/+/data/#",
  "save_path": "all-data",
  "filters": {"accepted_media_types": ["application/bufr"]}
}

// Unsubscribe command
{
  "action": "unsubscribe",
  "topic": "cache/a/wis2/+/data/#"
}

5.2. Celery 任务(下载)

Subscriber 与 Celery 工作器之间的通信使用 Celery 任务队列:

job = {
    "topic": "cache/a/wis2/de-dwd/data/...",
    "target": "dwd-data",
    "filters": {"accepted_media_types": [...]},
    "_broker": "globalbroker.meteo.fr",
    "_received": "2026-01-28 10:15:30",
    "_queued": "2026-01-28 10:15:30",
    "payload": {
        "id": "...",
        "properties": {
            "data_id": "...",
            "metadata_id": "...",
            "integrity": {"method": "sha512", "value": "..."}
        },
        "links": [
            {"rel": "canonical", "href": "https://..."}
        ]
    }
}

5.3. Redis 键

键模式 用途

global:subscriptions

所有订阅的哈希表(sub_id → JSON {id, topic, save_path, filter}

wis2:notification:status:{type}:{id}

去重跟踪(类型:by-msg-idby-data-idby-hash

wis2:notification:data:lock:{id}

防止并发重复下载的分布式锁

gdc:cache:{name}

已缓存的 GDC 目录 JSON(CMA、DWD、ECCC);TTL 由 GDC_CACHE_TTL_SECONDS 设置

wis2:metrics:{metric_name}

Prometheus 指标哈希(字段 = JSON 标签字典,值 = 浮点计数器/指标)

subscriber:health:{id}

订阅者健康心跳

celery

Celery 任务队列(默认)

6. 扩展系统

6.1. 向过滤器引擎添加新的匹配条件

过滤器引擎位于 modules/shared/shared/filters.py。每个匹配条件是 _evaluate_match() 调度的匹配对象中的一个键。

要添加新的内置条件(例如匹配新的元数据字段 station_id):

  1. filters.pyMatchContext 中添加字段:

    @dataclass
    class MatchContext:
        ...
        station_id: str | None = None
  2. wis2.py_build_context() 中填充该字段(在预下载和/或后下载阶段)。

  3. _evaluate_match() 中添加分支:

    if 'station_id' in match:
        return _match_string_field(ctx.station_id, match['station_id'])
  4. openapi.yml 和用户指南的过滤器参考中记录新字段。

app.pycommand_listener.pysubscriber.py 无需更改——过滤器对象会原样传递,并在下载时进行评估。

6.2. 添加新任务

  1. modules/task_manager/task_manager/tasks/ 中创建任务

  2. worker.py 的自动发现中注册

  3. 如需要则添加到工作流

# modules/task_manager/task_manager/tasks/custom.py
from task_manager.worker import app

@app.task
def my_custom_task(data):
    # Process data
    return result

6.3. 添加新指标

指标通过 shared.redis_metrics 存储在 Redis 中。要添加新指标:

  1. modules/shared/shared/redis_metrics.pyMETRICS 字典中注册:

    # short name → (prometheus type, help text)
    METRICS: dict[str, tuple[str, str]] = {
        ...
        'my_counter_total': ('counter', 'Description of the counter.'),
        'my_gauge':         ('gauge',   'Description of the gauge.'),
    }
  2. 从任何服务中增加计数器或设置指标值:

    from shared import incr_counter, set_gauge
    
    # 计数器(例如在 Celery 任务中)
    incr_counter('my_counter_total', {'label1': 'value', 'label2': 'value'})
    
    # 仪表(例如在定时任务中)
    set_gauge('my_gauge', {'label1': 'value'}, 42.0)

只有在 METRICS 中注册的指标才会出现在 /metrics 输出中。

7. 测试

7.1. 运行测试

# 安装测试依赖项
pip install pytest pytest-cov

# 运行测试
pytest modules/

# 带覆盖率
pytest --cov=modules modules/

7.2. 手动测试

# 启动服务
docker-compose up -d

# 创建测试订阅
curl -X POST http://localhost:5002/subscriptions \
  -H "Content-Type: application/json" \
  -d '{"topic": "cache/a/wis2/de-dwd/data/#", "target": "test"}'

# 监控日志
docker-compose logs -f subscriber celery

# 检查下载
ls -la downloads/test/

8. 开发环境搭建

8.1. 本地开发

# 创建虚拟环境
python -m venv venv
source venv/bin/activate

# 以可编辑模式安装模块
pip install -e modules/shared
pip install -e modules/task_manager
pip install -e modules/subscriber
pip install -e modules/subscription_manager

# 启动 Redis(开发用单实例)
docker run -d -p 6379:6379 redis:7.2-alpine redis-server --requirepass devpassword

# 设置环境变量
export REDIS_HOST=localhost
export REDIS_PORT=6379
export REDIS_PASSWORD=devpassword
export FLASK_SECRET_KEY=dev-secret-key
export LOG_LEVEL=DEBUG

# 运行订阅管理器
python -m subscription_manager.app

# 运行订阅者(在另一个终端中)
export GLOBAL_BROKER_HOST=globalbroker.meteo.fr
python -m subscriber.manager

# 运行 Celery worker(在另一个终端中)
celery -A task_manager.worker worker --loglevel=DEBUG

8.2. Docker 构建

# 构建所有镜像
docker-compose build

# 构建特定服务
docker-compose build celery

# 不使用缓存重新构建
docker-compose build --no-cache

9. 调试

9.1. 查看 Celery 任务

# 检查活跃任务
docker exec -it wis2downloader-celery-1 \
  celery -A task_manager.worker inspect active

# 检查保留任务
docker exec -it wis2downloader-celery-1 \
  celery -A task_manager.worker inspect reserved

9.2. Redis 检查

# 连接到 Redis(使用环境变量中的密码)
docker exec -it redis redis-cli -a $REDIS_PASSWORD

# 列出订阅
HGETALL global:subscriptions

# 检查队列长度
LLEN celery

# 查看去重键
KEYS wis2:notifications:*

# 查看 GDC 目录缓存(UI 启动时填充)
KEYS gdc:cache:*
TTL gdc:cache:CMA
所有 Redis 命令都需要身份验证。-a $REDIS_PASSWORD 标志从您的环境中传递密码。

9.3. MQTT 调试

# 手动订阅主题(mosquitto-clients)
mosquitto_sub -h globalbroker.meteo.fr -p 443 \
  -t 'cache/a/wis2/de-dwd/data/#' \
  -u everyone -P everyone \
  --capath /etc/ssl/certs