|
本文档由 Claude AI 从英文自动翻译。 在生产环境使用前,WMO/气象领域术语应由母语使用者进行审核。 请参阅 英文原版 获取权威版本。 |
1. 架构概述
WIS2 Downloader 是一个由四个主要组件构成的分布式系统:
Docker Compose
┌──────────────────────────────────────────────────────────┐
│ │
│ Web UI (NiceGUI :8080) │
│ - reads GDC catalogue data from Redis cache │
│ - posts subscriptions to Subscription Manager API │
│ │ │
│ ▼ │
│ Subscription Manager (Flask API :5001) │
│ - persists subscriptions in Redis │
│ - publishes subscribe/unsubscribe commands via PubSub │
│ │ │
│ Redis PubSub (commands) │
│ │ │
│ ▼ │
│ Subscriber (MQTT Client) │
│ - connects to WIS2 Global Broker │
│ - on notification: queues Celery download task │
│ │ │
│ Redis (Celery queue + dedup state) │
│ │ │
│ ▼ │
│ Celery Workers │
│ - download file, verify hash, apply filters, save │
│ │
└──────────────────────────────────────────────────────────┘
1.1. 数据流
-
启动时,Web 界面从三个 GDC 端点(或 Redis 缓存)获取 WCMP2 记录,并在内存中构建主题层次结构
-
用户在 Web 界面浏览目录或树状视图并选择主题
-
用户点击 订阅 → Web 界面向 Subscription Manager REST API 发送 POST 请求
-
Subscription Manager 将订阅持久化到 Redis,并向 Redis PubSub 频道发布订阅命令
-
Subscriber 接收命令并在 WIS2 Global Broker 上订阅 MQTT 主题
-
当 WIS2 通知到达时,Subscriber 将 Celery 下载任务加入队列
-
Celery 工作器下载文件、验证哈希、应用过滤器并保存到磁盘
2. 模块结构
modules/
├── shared/ # Shared utilities
│ └── shared/
│ ├── __init__.py
│ ├── logging.py # Centralized logging
│ └── redis_client.py # Redis client singleton
│
├── subscriber/ # MQTT subscriber service
│ └── subscriber/
│ ├── __init__.py
│ ├── manager.py # Entry point, thread management
│ ├── subscriber.py # MQTT client wrapper
│ └── command_listener.py # Redis PubSub listener
│
├── subscription_manager/ # REST API service
│ └── subscription_manager/
│ ├── __init__.py
│ ├── app.py # Flask application
│ └── static/
│ └── openapi.yml # API specification
│
├── task_manager/ # Celery tasks
│ └── task_manager/
│ ├── __init__.py
│ ├── worker.py # Celery app configuration
│ ├── tasks/
│ │ └── wis2.py # Download tasks
│ └── workflows/
│ └── __init__.py # Task chains
│
└── ui/ # NiceGUI web interface (port 8080)
├── main.py # Entry point, page route, AppState
├── layout.py # PageLayout builder
├── config.py # Environment variable config
├── data.py # GDC data layer — fetch, merge, cache, hierarchy
├── assets/
│ └── base.css # All custom CSS
├── components/ # Reusable layout components
├── i18n/ # Internationalisation
│ ├── __init__.py # t(), current_lang(), is_rtl(), LANGUAGES
│ ├── en.py # English strings (source of truth)
│ ├── fr.py # French
│ ├── es.py # Spanish
│ ├── ar.py # Arabic (RTL)
│ ├── zh.py # Chinese
│ └── ru.py # Russian
├── models/
│ └── wcmp2.py # WCMP2Record dataclass
└── views/ # Page views (catalogue, tree, subscriptions, etc.)
3. 模块详情
3.1. 共享模块
提供所有服务使用的公共实用工具。
3.1.1. Redis 客户端
from shared import get_redis_client
# 获取 Redis 客户端(单例模式)
redis = get_redis_client()
# 像普通 Redis 客户端一样使用
redis.set('key', 'value')
redis.get('key')
客户端特性:
-
直接连接到 Redis 服务器
-
缓存连接(单例模式)
-
包含针对瞬时故障的重试逻辑
3.1.2. 日志记录
from shared import setup_logging
# 配置根日志记录器(在启动时调用一次)
setup_logging()
# 获取模块专用日志记录器
LOGGER = setup_logging(__name__)
LOGGER.info("Message with UTC timestamp")
功能特性:
-
ISO 8601 格式的 UTC 时间戳
-
通过
LOG_LEVEL环境变量配置 -
输出到 stdout 供 Docker 日志收集
3.2. Subscriber 模块
通过 MQTT 连接到 WIS2 Global Broker 并处理传入通知。
3.2.1. 入口点
def run_manager():
# 1. Create MQTT subscriber
mqtt_subscriber = Subscriber(**broker_config)
# 2. Create Redis command listener
redis_listener = CommandListener(
subscriber=mqtt_subscriber,
channel=COMMAND_CHANNEL
)
# 3. Start MQTT in separate thread
mqtt_thread = threading.Thread(target=mqtt_subscriber.start, daemon=True)
mqtt_thread.start()
# 4. Start Redis listener (blocks)
redis_listener.start()
3.2.2. MQTT 订阅者
class Subscriber:
def __init__(self, host, port, uid, pwd, protocol, session):
# Configure MQTT client with callbacks
self.client = mqtt.Client(...)
self.client.on_message = self._on_message
def _on_message(self, client, userdata, msg):
# Parse notification, create Celery task
job = {
"topic": msg.topic,
"target": target,
"filters": filters,
"payload": json.loads(msg.payload)
}
workflow = wis2_download(job)
workflow.apply_async()
def subscribe(self, topic, target, filters):
self.client.subscribe(topic, qos=0)
self.active_subscriptions[topic] = {...}
def unsubscribe(self, topic):
self.client.unsubscribe(topic)
del self.active_subscriptions[topic]
3.2.3. 命令监听器
class CommandListener(threading.Thread):
def __init__(self, subscriber, channel):
self.subscriber = subscriber
self.pubsub = get_redis_client().pubsub()
def run(self):
self.pubsub.subscribe(self.channel)
while not self.stop_event.is_set():
message = self.pubsub.get_message()
if message:
self._process_command(message)
def _process_command(self, message):
command = json.loads(message['data'])
if command['action'] == 'subscribe':
self.subscriber.subscribe(
command['topic'],
command['save_path'],
command['filters']
)
elif command['action'] == 'unsubscribe':
self.subscriber.unsubscribe(command['topic'])
3.3. Subscription Manager 模块
用于管理订阅的 Flask REST API。
3.3.1. Flask 应用程序
@app.post('/subscriptions')
def add_subscription():
data = get_json()
topic = normalise_topic(data.get('topic'))
target = normalise_path(data.get('target', ''))
filters = data.get('filters', {})
# Publish command to subscriber via Redis PubSub
command = {
"action": "subscribe",
"topic": topic,
"save_path": target,
"filters": filters
}
publish_command(command, COMMAND_CHANNEL)
# Persist to Redis for durability
persist_subscription(topic, target, filters)
return jsonify({"status": "accepted", ...}), 202
3.3.2. 指标端点
指标使用原子 HINCRBYFLOAT 操作存储在 Redis 中,可在多个 Celery worker 容器之间安全使用。端点从 Redis 读取数据并以 Prometheus 文本格式呈现:
@app.route('/metrics')
def expose_metrics():
from shared.redis_metrics import generate_prometheus_text, set_gauge
set_gauge('celery_queue_length', {}, get_queue_length())
return Response(generate_prometheus_text(), mimetype='text/plain')
从任意 worker 增加计数器:
from shared import incr_counter
incr_counter('downloads_total', {'cache': hostname, 'media_type': mime_type})
3.4. Task Manager 模块
用于下载和处理文件的 Celery 工作器。
3.4.1. Celery 配置
app = Celery('tasks',
broker=CELERY_BROKER_URL,
result_backend=CELERY_RESULT_BACKEND)
# 自动发现任务
app.autodiscover_tasks(['task_manager.tasks', 'task_manager.tasks.wis2'])
3.4.2. 下载任务
@app.task(bind=True)
@metrics_collector
def download_from_wis2(self, job):
result = {...} # Initialize result dict
# 1. Extract identifiers
message_id = job['payload']['id']
data_id = job['payload']['properties']['data_id']
filehash = job['payload']['properties']['integrity']['value']
# 2. Deduplication check
for key, type in [(message_id, 'by-msg-id'), ...]:
if get_status(key, type) == STATUS_SUCCESS:
result['status'] = STATUS_SKIPPED
return result
# 3. Acquire lock
lock_acquired = redis_client.set(lock_key, 1, nx=True, ex=LOCK_EXPIRE)
if not lock_acquired:
raise self.retry(countdown=10, max_retries=10)
# 4. Download file
response = _pool.request('GET', download_url, ...)
# 5. Verify hash
if hash_expected and hash_base64 != hash_expected:
result['status'] = STATUS_FAILED
return result
# 6. Check media type filter
if not _is_allowed_media_type(file_type, filters):
result['status'] = STATUS_SKIPPED
return result
# 7. Save file
output_path.write_bytes(data)
result['status'] = STATUS_SUCCESS
return result
3.4.3. 工作流链
def wis2_download(args):
# Chain: download -> decode/ingest
workflow = download_from_wis2.s(args) | decode_and_ingest.s()
return workflow
3.4.4. 调度器(定期任务)
一个独立的 Celery 应用(scheduler.py)使用 Celery Beat 处理日常维护任务。它使用 Redis 数据库 2/3,以避免干扰下载 worker 队列(数据库 0/1)。定期任务在 tasks/scheduled_tasks.py 中定义:
| 任务 | 执行间隔 | 用途 |
|---|---|---|
|
每 5 分钟 |
设置 |
|
每 10 分钟 |
删除超过 |
|
每天 |
完整的 |
调度器作为两个 Docker Compose 服务启动:celery-scheduler-workers(执行任务)和 celery-beats(按计划触发任务)。
4. 界面模块
界面是一个运行在 8080 端口的 NiceGUI Web 应用程序。它是用户发现数据集、浏览 WIS2 主题层次结构和创建订阅的主要界面。
4.1. 模块结构
modules/ui/
├── main.py # Application entry point, page route, AppState
├── layout.py # PageLayout builder
├── config.py # Environment variable config (SUBSCRIPTION_MANAGER_URL etc.)
├── data.py # GDC data layer — fetch, merge, cache, hierarchy
├── assets/
│ └── base.css # All custom CSS (inside @layer components)
├── components/
│ ├── header.py # WMO banner + toolbar + language selector
│ ├── footer.py # Footer bar
│ ├── navigation_drawer.py # Left nav drawer with view links
│ └── page_body.py # Main content area + right sidebar
├── i18n/ # Internationalisation
│ ├── __init__.py # t(), current_lang(), is_rtl(), LANGUAGES
│ ├── en.py # English strings (source of truth)
│ ├── fr.py # French
│ ├── es.py # Spanish
│ ├── ar.py # Arabic (RTL)
│ ├── zh.py # Chinese
│ └── ru.py # Russian
├── models/
│ └── wcmp2.py # WCMP2Record dataclass (GeoJSON Feature)
└── views/
├── dashboard.py # Grafana iframe embed
├── catalogue.py # Catalogue search + result cards
├── tree.py # Topic hierarchy tree
├── subscriptions.py # Active subscriptions list
├── settings.py # GDC status + refresh trigger
└── shared.py # Shared sidebar logic (on_topics_picked, confirm_subscribe, show_metadata)
4.2. 数据层 (data.py)
数据层负责从 WIS2 的三个全球发现目录 (GDC) 获取 WCMP2 记录、合并记录,并构建所有视图读取的两个模块级缓存。
4.2.1. GDC 数据源
启动时从三个公开 GDC 端点获取记录:
| 简称 | URL |
|---|---|
|
|
|
|
|
原始 JSON 响应以 gdc:cache:CMA、gdc:cache:DWD 和 gdc:cache:ECCC 为键缓存在 Redis 中,TTL 由 GDC_CACHE_TTL_SECONDS 控制(默认 6 小时)。Redis 是可选的——如果不可用,每次启动时通过 HTTP 获取数据。
4.2.2. 合并记录
加载所有 GDC 数据源后,_build_merged_records() 按 ID 对各目录间的记录进行去重:
-
仅存在于一个目录中的记录只出现一次,带单个
source_gdcs条目。 -
存在于多个目录中的记录被合并:
source_gdcs列出所有贡献目录。 -
如果各目录间的属性、几何形状或链接不同,
has_discrepancy被设为True。 -
跨目录联合链接,确保任一 GDC 中存在的频道数据在合并记录中得以保留。
结果存储在模块级 _merged_records 列表中,由 merged_records() 返回。
4.2.3. 主题层次结构
_build_topic_hierarchy() 遍历 _merged_records,将每条记录的第一个 cache/ MQTT 频道插入嵌套字典:
{
"cache": {
"children": {
"a": {
"children": {
"wis2": {
"children": {
"de-dwd": {
"children": {
...
"synop": {
"datasets": [<WCMP2Record>, ...]
}
}
}
}
}
}
}
}
}
}
如果一个频道是另一个的前缀,节点可以同时包含 "children" 和 "datasets"。
层次结构存储在 _topic_hierarchy 中,由 topic_hierarchy() 返回。tree.py 使用它构建 ui.tree 组件,get_datasets_for_channel() 使用它解析给定主题的数据集。
4.2.4. 关键函数
| 函数 | 描述 |
|---|---|
|
返回缓存的合并记录列表 |
|
返回缓存的主题层次结构 |
|
去除末尾的 |
|
获取 GDC 数据(优先 Redis 缓存,然后 HTTP),重建 |
4.3. WCMP2 模型 (models/wcmp2.py)
WCMP2Record 是 WIS2 发现元数据 GeoJSON Feature 的 dataclass 表示。
from models.wcmp2 import WCMP2Record
rec = WCMP2Record.from_dict(feature_dict)
rec.id # str - unique dataset identifier
rec.title # str | None
rec.description # str | None
rec.keywords # list[str]
rec.wmo_data_policy # 'core' | 'recommended' | None
rec.geometry # Geometry | None (.type, .coordinates)
rec.links # list[Link] (.channel, .href, .rel, .extra)
rec.mqtt_channels # list[str] - channels from all links
Link.extra 是一个字典,捕获 GDC 响应中非模式键(例如 GDC 特定的 filters,用于订阅侧边栏的自定义过滤器字段)。
4.4. AppState
每个浏览器会话都有自己的 AppState 实例(在 main.py 中定义):
class AppState:
def __init__(self):
self.selected_topics: list[str] = []
self.current_view: str = 'dashboard'
selected_topics 保存当前在界面中选中的 MQTT 主题,驱动右侧边栏。current_view 跟踪当前视图名称,并在任何页面重新加载(例如语言切换)之前持久化到 app.storage.user['current_view'],以便用户返回到相同的视图。
4.5. 视图
所有视图遵循相同约定:render(container, …) 函数在提供的容器元素内构建 NiceGUI 元素。render 始终是普通(非异步)函数,以便可以从 main.py 中的 show_view() 同步调用。
4.5.1. 目录视图
views/catalogue.py 通过纯过滤函数对 merged_records() 提供全文搜索:
| 函数 | 描述 |
|---|---|
|
将查询与 ID、标题、描述、关键词和主题概念进行匹配 |
|
匹配 |
|
逗号分隔的关键词;所有关键词都必须出现在记录中 |
|
Shapely 几何体与给定边界框的交集 |
结果卡片由 update_search_results() 渲染,该函数也负责分页。
4.5.2. 树状视图
views/tree.py 通过 _to_tree_nodes()(递归,按字母排序)将 topic_hierarchy() 转换为 ui.tree 节点。选择使用 on_select 强制单节点选择——不使用 on_tick,因为它与 NiceGUI 内部状态同步冲突。
4.5.3. 共享侧边栏 (views/shared.py)
on_topics_picked(e, state, layout, is_page_selection, dataset_id) 是目录和树状选择的中央处理器。它:
-
更新
state.selected_topics -
构建右侧边栏,包含保存目录输入、数据集/媒体类型/边界框/日期过滤器,以及可选的自定义过滤器
-
将 订阅 按钮绑定到
confirm_subscribe()
confirm_subscribe() 在调用 subscribe_to_topics() 之前显示包含完整 JSON 载荷的对话框。
show_metadata(dataset_id) 从 merged_records() 查找记录,并渲染带有交互式 Leaflet 地图的详情对话框。
4.6. 添加新视图
-
创建
modules/ui/views/myview.py,包含render(container)函数,对所有用户可见的字符串使用t() -
在
components/navigation_drawer.py中添加导航条目——NAV_ITEMS列表接受(view_id, label_key, icon)元组,其中label_key是nav.*i18n 键 -
在
main.py的show_view()分发器中添加分支 -
将任何新 CSS 类添加到
assets/base.css的@layer components { … }块中 -
将所有新字符串键添加到
i18n/en.py和每个其他语言文件——参见 国际化 (i18n)
4.7. 国际化 (i18n)
界面通过 modules/ui/i18n/ 中基于字典的轻量级 i18n 系统支持多种界面语言。
4.7.1. 支持的语言
| 代码 | 语言 | 方向 |
|---|---|---|
|
English |
LTR |
|
Français |
LTR |
|
Español |
LTR |
|
العربية |
RTL |
|
中文 |
LTR |
|
Русский |
LTR |
活跃语言按浏览器会话存储在 app.storage.user['lang'] 中,默认为英语。用户通过标题工具栏中的选择器更改语言;页面重新加载以应用更改。
4.7.2. t() 的工作原理
from i18n import t
# 简单查找
label = t('nav.dashboard') # → 'Dashboard'
# 带插值(使用 str.format)
label = t('subscriptions.folder', path='./') # → 'Folder: ./'
查找链为:
-
所选语言文件
-
英语(
en.py)——如果所选语言中缺少该键则作为回退 -
键字符串本身——如果英语中也缺少则作为回退(在开发中可见)
t() 从当前 NiceGUI 请求会话读取 app.storage.user['lang']。始终在渲染时调用它(在 @ui.page 处理器或 NiceGUI 事件回调内),而不是在模块导入时。
|
包含字面量 { 或 } 字符的字符串(例如提示文本中的 JSON 示例)必须按原样使用单括号。不要*使用 {{ / }} 转义——t() 仅在传递关键字参数时调用 .format(*kwargs),因此 {{ 会按字面量显示。
|
4.7.3. 键命名规范
键使用点分隔的命名空间:
| 前缀 | 范围 | 示例 |
|---|---|---|
|
导航抽屉标签 |
|
|
按钮标签 |
|
|
订阅侧边栏 |
|
|
目录视图 |
|
|
树状视图 |
|
|
管理订阅视图 |
|
|
设置视图 |
|
|
手动订阅视图 |
|
|
手动订阅验证消息 |
|
|
对话框标题 |
|
|
元数据对话框 |
|
|
共享验证消息 |
|
|
页脚 |
|
|
ARIA 无障碍标签 |
|
4.7.4. 从右到左 (RTL) 支持
阿拉伯语(ar)是从右到左的语言。选择阿拉伯语时,is_rtl() 返回 True,main.py 中的 on_connect() 处理器通过 JavaScript 在 <html> 元素上注入 dir="rtl",激活 Quasar 的 RTL 布局模式。assets/base.css 中的自定义 CSS 覆盖(在 RTL overrides 注释块下)修正了 Quasar 在 .q-page-container 上的物理内边距,并重新定位订阅侧边栏。
要添加另一种 RTL 语言,将其代码添加到 init.py 中的 RTL_LANGUAGES——无需其他更改。
4.7.5. 添加新语言
将 modules/ui/i18n/en.py 复制到 modules/ui/i18n/{code}.py 并翻译所有值。保留每个键;不要删除任何键。完全按照英语源中出现的方式保留 {placeholder} 名称:
# modules/ui/i18n/de.py
"""German strings."""
STRINGS: dict[str, str] = {
'nav.dashboard': 'Dashboard',
'nav.catalogue': 'Katalogsuche',
'nav.tree': 'Baumsuche',
'nav.manual': 'Manuell abonnieren',
'nav.manage': 'Abonnements verwalten',
'nav.settings': 'Einstellungen',
# ... all remaining keys ...
'subscriptions.folder': 'Ordner: {path}', # {path} must be preserved
'settings.records': '{name}: {count} Einträge',
}
init.py 中注册语言from . import ar, de, en, es, fr, ru, zh (1)
LANGUAGES: dict[str, str] = {
'en': 'English',
'fr': 'Français',
'es': 'Español',
'ar': 'العربية',
'zh': '中文',
'ru': 'Русский',
'de': 'Deutsch', (2)
}
_STRINGS: dict[str, dict[str, str]] = {
'en': en.STRINGS,
'fr': fr.STRINGS,
'es': es.STRINGS,
'ar': ar.STRINGS,
'zh': zh.STRINGS,
'ru': ru.STRINGS,
'de': de.STRINGS, (3)
}
| 1 | 添加导入 |
| 2 | 添加显示名称(使用原生文字) |
| 3 | 添加字符串映射 |
如果语言是从右到左的(例如希伯来语 he、波斯语 fa):
RTL_LANGUAGES: frozenset[str] = frozenset({'ar', 'he'})
docker-compose restart wis2downloader-ui
新语言将立即出现在标题语言选择器中。
| 现有的非英语翻译是机器生成的,仅作为起点提供。在生产部署前,所有字符串应由母语使用者审核,特别注意 WMO/气象领域术语(WIS2、BUFR、GRIB、Global Cache 等),这些术语在 WMO 官方文件中有既定翻译。 |
4.7.6. 添加新的可翻译字符串
向任何视图或组件添加界面文本时:
en.py(真实来源)# modules/ui/i18n/en.py
'myview.title': 'My New View',
'myview.description': 'Showing results for {topic}.',
如果翻译尚不可用,复制英语值作为占位符。t() 自动回退到英语,但有键存在可避免工具中的空缺:
# fr.py / es.py / ar.py / zh.py / ru.py
'myview.title': 'My New View', # TODO: translate
'myview.description': 'Showing results for {topic}.',
t()from i18n import t
def render(container):
with container:
ui.label(t('myview.title')).classes('page-title')
ui.label(t('myview.description', topic=selected_topic))
5. 服务间通信
5.1. Redis PubSub(命令)
Subscription Manager 与 Subscriber 之间的通信使用 Redis PubSub,频道为 subscription_commands:
// Channel: subscription_commands
// Subscribe command
{
"action": "subscribe",
"topic": "cache/a/wis2/+/data/#",
"save_path": "all-data",
"filters": {"accepted_media_types": ["application/bufr"]}
}
// Unsubscribe command
{
"action": "unsubscribe",
"topic": "cache/a/wis2/+/data/#"
}
5.2. Celery 任务(下载)
Subscriber 与 Celery 工作器之间的通信使用 Celery 任务队列:
job = {
"topic": "cache/a/wis2/de-dwd/data/...",
"target": "dwd-data",
"filters": {"accepted_media_types": [...]},
"_broker": "globalbroker.meteo.fr",
"_received": "2026-01-28 10:15:30",
"_queued": "2026-01-28 10:15:30",
"payload": {
"id": "...",
"properties": {
"data_id": "...",
"metadata_id": "...",
"integrity": {"method": "sha512", "value": "..."}
},
"links": [
{"rel": "canonical", "href": "https://..."}
]
}
}
5.3. Redis 键
| 键模式 | 用途 |
|---|---|
|
所有订阅的哈希表(sub_id → JSON |
|
去重跟踪(类型: |
|
防止并发重复下载的分布式锁 |
|
已缓存的 GDC 目录 JSON(CMA、DWD、ECCC);TTL 由 |
|
Prometheus 指标哈希(字段 = JSON 标签字典,值 = 浮点计数器/指标) |
|
订阅者健康心跳 |
|
Celery 任务队列(默认) |
6. 扩展系统
6.1. 向过滤器引擎添加新的匹配条件
过滤器引擎位于 modules/shared/shared/filters.py。每个匹配条件是 _evaluate_match() 调度的匹配对象中的一个键。
要添加新的内置条件(例如匹配新的元数据字段 station_id):
-
在
filters.py的MatchContext中添加字段:@dataclass class MatchContext: ... station_id: str | None = None -
在
wis2.py的_build_context()中填充该字段(在预下载和/或后下载阶段)。 -
在
_evaluate_match()中添加分支:if 'station_id' in match: return _match_string_field(ctx.station_id, match['station_id']) -
在
openapi.yml和用户指南的过滤器参考中记录新字段。
app.py、command_listener.py 或 subscriber.py 无需更改——过滤器对象会原样传递,并在下载时进行评估。
6.2. 添加新任务
-
在
modules/task_manager/task_manager/tasks/中创建任务 -
在
worker.py的自动发现中注册 -
如需要则添加到工作流
# modules/task_manager/task_manager/tasks/custom.py
from task_manager.worker import app
@app.task
def my_custom_task(data):
# Process data
return result
6.3. 添加新指标
指标通过 shared.redis_metrics 存储在 Redis 中。要添加新指标:
-
在
modules/shared/shared/redis_metrics.py的METRICS字典中注册:# short name → (prometheus type, help text) METRICS: dict[str, tuple[str, str]] = { ... 'my_counter_total': ('counter', 'Description of the counter.'), 'my_gauge': ('gauge', 'Description of the gauge.'), } -
从任何服务中增加计数器或设置指标值:
from shared import incr_counter, set_gauge # 计数器(例如在 Celery 任务中) incr_counter('my_counter_total', {'label1': 'value', 'label2': 'value'}) # 仪表(例如在定时任务中) set_gauge('my_gauge', {'label1': 'value'}, 42.0)
只有在 METRICS 中注册的指标才会出现在 /metrics 输出中。
7. 测试
7.1. 运行测试
# 安装测试依赖项
pip install pytest pytest-cov
# 运行测试
pytest modules/
# 带覆盖率
pytest --cov=modules modules/
7.2. 手动测试
# 启动服务
docker-compose up -d
# 创建测试订阅
curl -X POST http://localhost:5002/subscriptions \
-H "Content-Type: application/json" \
-d '{"topic": "cache/a/wis2/de-dwd/data/#", "target": "test"}'
# 监控日志
docker-compose logs -f subscriber celery
# 检查下载
ls -la downloads/test/
8. 开发环境搭建
8.1. 本地开发
# 创建虚拟环境
python -m venv venv
source venv/bin/activate
# 以可编辑模式安装模块
pip install -e modules/shared
pip install -e modules/task_manager
pip install -e modules/subscriber
pip install -e modules/subscription_manager
# 启动 Redis(开发用单实例)
docker run -d -p 6379:6379 redis:7.2-alpine redis-server --requirepass devpassword
# 设置环境变量
export REDIS_HOST=localhost
export REDIS_PORT=6379
export REDIS_PASSWORD=devpassword
export FLASK_SECRET_KEY=dev-secret-key
export LOG_LEVEL=DEBUG
# 运行订阅管理器
python -m subscription_manager.app
# 运行订阅者(在另一个终端中)
export GLOBAL_BROKER_HOST=globalbroker.meteo.fr
python -m subscriber.manager
# 运行 Celery worker(在另一个终端中)
celery -A task_manager.worker worker --loglevel=DEBUG
8.2. Docker 构建
# 构建所有镜像
docker-compose build
# 构建特定服务
docker-compose build celery
# 不使用缓存重新构建
docker-compose build --no-cache
9. 调试
9.1. 查看 Celery 任务
# 检查活跃任务
docker exec -it wis2downloader-celery-1 \
celery -A task_manager.worker inspect active
# 检查保留任务
docker exec -it wis2downloader-celery-1 \
celery -A task_manager.worker inspect reserved
9.2. Redis 检查
# 连接到 Redis(使用环境变量中的密码)
docker exec -it redis redis-cli -a $REDIS_PASSWORD
# 列出订阅
HGETALL global:subscriptions
# 检查队列长度
LLEN celery
# 查看去重键
KEYS wis2:notifications:*
# 查看 GDC 目录缓存(UI 启动时填充)
KEYS gdc:cache:*
TTL gdc:cache:CMA
所有 Redis 命令都需要身份验证。-a $REDIS_PASSWORD 标志从您的环境中传递密码。
|
9.3. MQTT 调试
# 手动订阅主题(mosquitto-clients)
mosquitto_sub -h globalbroker.meteo.fr -p 443 \
-t 'cache/a/wis2/de-dwd/data/#' \
-u everyone -P everyone \
--capath /etc/ssl/certs