|
Этот документ был автоматически переведён с английского языка с помощью Claude AI. Перед использованием в production-среде терминология из области WMO/метеорологии должна быть проверена носителем языка. Авторитетной версией является оригинал на английском языке. |
1. Обзор архитектуры
WIS2 Downloader — это распределённая система, состоящая из четырёх основных компонентов:
Docker Compose
┌──────────────────────────────────────────────────────────┐
│ │
│ Web UI (NiceGUI :8080) │
│ - reads GDC catalogue data from Redis cache │
│ - posts subscriptions to Subscription Manager API │
│ │ │
│ ▼ │
│ Subscription Manager (Flask API :5001) │
│ - persists subscriptions in Redis │
│ - publishes subscribe/unsubscribe commands via PubSub │
│ │ │
│ Redis PubSub (commands) │
│ │ │
│ ▼ │
│ Subscriber (MQTT Client) │
│ - connects to WIS2 Global Broker │
│ - on notification: queues Celery download task │
│ │ │
│ Redis (Celery queue + dedup state) │
│ │ │
│ ▼ │
│ Celery Workers │
│ - download file, verify hash, apply filters, save │
│ │
└──────────────────────────────────────────────────────────┘
1.1. Поток данных
-
При запуске веб-интерфейс загружает записи WCMP2 из трёх конечных точек GDC (или из кэша Redis) и строит иерархию тем в памяти
-
Пользователь просматривает каталог или дерево и выбирает тему в веб-интерфейсе
-
Пользователь нажимает Подписаться → веб-интерфейс отправляет POST-запрос к REST API Subscription Manager
-
Subscription Manager сохраняет подписку в Redis и публикует команду подписки в канал Redis PubSub
-
Subscriber получает команду и подписывается на тему MQTT на WIS2 Global Broker
-
Когда поступает уведомление WIS2, Subscriber помещает задачу загрузки Celery в очередь
-
Рабочий процесс Celery загружает файл, проверяет хэш, применяет фильтры и сохраняет на диск
2. Структура модулей
modules/
├── shared/ # Shared utilities
│ └── shared/
│ ├── __init__.py
│ ├── logging.py # Centralized logging
│ └── redis_client.py # Redis client singleton
│
├── subscriber/ # MQTT subscriber service
│ └── subscriber/
│ ├── __init__.py
│ ├── manager.py # Entry point, thread management
│ ├── subscriber.py # MQTT client wrapper
│ └── command_listener.py # Redis PubSub listener
│
├── subscription_manager/ # REST API service
│ └── subscription_manager/
│ ├── __init__.py
│ ├── app.py # Flask application
│ └── static/
│ └── openapi.yml # API specification
│
├── task_manager/ # Celery tasks
│ └── task_manager/
│ ├── __init__.py
│ ├── worker.py # Celery app configuration
│ ├── tasks/
│ │ └── wis2.py # Download tasks
│ └── workflows/
│ └── __init__.py # Task chains
│
└── ui/ # NiceGUI web interface (port 8080)
├── main.py # Entry point, page route, AppState
├── layout.py # PageLayout builder
├── config.py # Environment variable config
├── data.py # GDC data layer — fetch, merge, cache, hierarchy
├── assets/
│ └── base.css # All custom CSS
├── components/ # Reusable layout components
├── i18n/ # Internationalisation
│ ├── __init__.py # t(), current_lang(), is_rtl(), LANGUAGES
│ ├── en.py # English strings (source of truth)
│ ├── fr.py # French
│ ├── es.py # Spanish
│ ├── ar.py # Arabic (RTL)
│ ├── zh.py # Chinese
│ └── ru.py # Russian
├── models/
│ └── wcmp2.py # WCMP2Record dataclass
└── views/ # Page views (catalogue, tree, subscriptions, etc.)
3. Детали модулей
3.1. Общий модуль
Предоставляет общие утилиты, используемые во всех сервисах.
3.1.1. Клиент Redis
from shared import get_redis_client
# Получить клиент Redis (singleton)
redis = get_redis_client()
# Использовать как обычный клиент Redis
redis.set('key', 'value')
redis.get('key')
Клиент:
-
Подключается напрямую к серверу Redis
-
Кэширует соединение (паттерн «одиночка»)
-
Содержит логику повторных попыток для устранения временных сбоев
3.1.2. Журналирование
from shared import setup_logging
# Настроить корневой логгер (вызывается один раз при запуске)
setup_logging()
# Получить логгер для конкретного модуля
LOGGER = setup_logging(__name__)
LOGGER.info("Message with UTC timestamp")
Возможности:
-
Метки времени в формате ISO 8601 в UTC
-
Настраивается через переменную окружения
LOG_LEVEL -
Вывод в stdout для сбора логов Docker
3.2. Модуль Subscriber
Подключается к WIS2 Global Broker через MQTT и обрабатывает входящие уведомления.
3.2.1. Точка входа
def run_manager():
# 1. Create MQTT subscriber
mqtt_subscriber = Subscriber(**broker_config)
# 2. Create Redis command listener
redis_listener = CommandListener(
subscriber=mqtt_subscriber,
channel=COMMAND_CHANNEL
)
# 3. Start MQTT in separate thread
mqtt_thread = threading.Thread(target=mqtt_subscriber.start, daemon=True)
mqtt_thread.start()
# 4. Start Redis listener (blocks)
redis_listener.start()
3.2.2. Подписчик MQTT
class Subscriber:
def __init__(self, host, port, uid, pwd, protocol, session):
# Configure MQTT client with callbacks
self.client = mqtt.Client(...)
self.client.on_message = self._on_message
def _on_message(self, client, userdata, msg):
# Parse notification, create Celery task
job = {
"topic": msg.topic,
"target": target,
"filters": filters,
"payload": json.loads(msg.payload)
}
workflow = wis2_download(job)
workflow.apply_async()
def subscribe(self, topic, target, filters):
self.client.subscribe(topic, qos=0)
self.active_subscriptions[topic] = {...}
def unsubscribe(self, topic):
self.client.unsubscribe(topic)
del self.active_subscriptions[topic]
3.2.3. Слушатель команд
class CommandListener(threading.Thread):
def __init__(self, subscriber, channel):
self.subscriber = subscriber
self.pubsub = get_redis_client().pubsub()
def run(self):
self.pubsub.subscribe(self.channel)
while not self.stop_event.is_set():
message = self.pubsub.get_message()
if message:
self._process_command(message)
def _process_command(self, message):
command = json.loads(message['data'])
if command['action'] == 'subscribe':
self.subscriber.subscribe(
command['topic'],
command['save_path'],
command['filters']
)
elif command['action'] == 'unsubscribe':
self.subscriber.unsubscribe(command['topic'])
3.3. Модуль Subscription Manager
REST API на Flask для управления подписками.
3.3.1. Приложение Flask
@app.post('/subscriptions')
def add_subscription():
data = get_json()
topic = normalise_topic(data.get('topic'))
target = normalise_path(data.get('target', ''))
filters = data.get('filters', {})
# Publish command to subscriber via Redis PubSub
command = {
"action": "subscribe",
"topic": topic,
"save_path": target,
"filters": filters
}
publish_command(command, COMMAND_CHANNEL)
# Persist to Redis for durability
persist_subscription(topic, target, filters)
return jsonify({"status": "accepted", ...}), 202
3.3.2. Конечная точка метрик
Метрики хранятся в Redis с использованием атомарных операций HINCRBYFLOAT, что делает их безопасными при работе нескольких контейнеров Celery. Конечная точка читает данные из Redis и формирует вывод в формате Prometheus text format:
@app.route('/metrics')
def expose_metrics():
from shared.redis_metrics import generate_prometheus_text, set_gauge
set_gauge('celery_queue_length', {}, get_queue_length())
return Response(generate_prometheus_text(), mimetype='text/plain')
Увеличение счётчика из любого воркера:
from shared import incr_counter
incr_counter('downloads_total', {'cache': hostname, 'media_type': mime_type})
3.4. Модуль Task Manager
Рабочие процессы Celery для загрузки и обработки файлов.
3.4.1. Конфигурация Celery
app = Celery('tasks',
broker=CELERY_BROKER_URL,
result_backend=CELERY_RESULT_BACKEND)
# Автоматическое обнаружение задач
app.autodiscover_tasks(['task_manager.tasks', 'task_manager.tasks.wis2'])
3.4.2. Задача загрузки
@app.task(bind=True)
@metrics_collector
def download_from_wis2(self, job):
result = {...} # Initialize result dict
# 1. Extract identifiers
message_id = job['payload']['id']
data_id = job['payload']['properties']['data_id']
filehash = job['payload']['properties']['integrity']['value']
# 2. Deduplication check
for key, type in [(message_id, 'by-msg-id'), ...]:
if get_status(key, type) == STATUS_SUCCESS:
result['status'] = STATUS_SKIPPED
return result
# 3. Acquire lock
lock_acquired = redis_client.set(lock_key, 1, nx=True, ex=LOCK_EXPIRE)
if not lock_acquired:
raise self.retry(countdown=10, max_retries=10)
# 4. Download file
response = _pool.request('GET', download_url, ...)
# 5. Verify hash
if hash_expected and hash_base64 != hash_expected:
result['status'] = STATUS_FAILED
return result
# 6. Check media type filter
if not _is_allowed_media_type(file_type, filters):
result['status'] = STATUS_SKIPPED
return result
# 7. Save file
output_path.write_bytes(data)
result['status'] = STATUS_SUCCESS
return result
3.4.3. Цепочки рабочих процессов
def wis2_download(args):
# Chain: download -> decode/ingest
workflow = download_from_wis2.s(args) | decode_and_ingest.s()
return workflow
3.4.4. Планировщик (периодические задачи)
Отдельное приложение Celery (scheduler.py) выполняет обслуживание системы с помощью Celery Beat. Оно использует базы данных Redis 2/3 для исключения конфликтов с очередями рабочих процессов загрузки (базы данных 0/1). Периодические задачи определены в tasks/scheduled_tasks.py:
| Задача | Интервал | Назначение |
|---|---|---|
|
Каждые 5 мин |
Устанавливает значения метрик |
|
Каждые 10 мин |
Удаляет файлы старше |
|
Ежедневно |
Полный обход |
Планировщик запускается как два сервиса Docker Compose: celery-scheduler-workers (выполняет задачи) и celery-beats (запускает их по расписанию).
4. Модуль пользовательского интерфейса
Интерфейс — это веб-приложение NiceGUI, работающее на порту 8080. Это основной пользовательский интерфейс для обнаружения наборов данных, просмотра иерархии тем WIS2 и создания подписок.
4.1. Структура модулей
modules/ui/
├── main.py # Application entry point, page route, AppState
├── layout.py # PageLayout builder
├── config.py # Environment variable config (SUBSCRIPTION_MANAGER_URL etc.)
├── data.py # GDC data layer — fetch, merge, cache, hierarchy
├── assets/
│ └── base.css # All custom CSS (inside @layer components)
├── components/
│ ├── header.py # WMO banner + toolbar + language selector
│ ├── footer.py # Footer bar
│ ├── navigation_drawer.py # Left nav drawer with view links
│ └── page_body.py # Main content area + right sidebar
├── i18n/ # Internationalisation
│ ├── __init__.py # t(), current_lang(), is_rtl(), LANGUAGES
│ ├── en.py # English strings (source of truth)
│ ├── fr.py # French
│ ├── es.py # Spanish
│ ├── ar.py # Arabic (RTL)
│ ├── zh.py # Chinese
│ └── ru.py # Russian
├── models/
│ └── wcmp2.py # WCMP2Record dataclass (GeoJSON Feature)
└── views/
├── dashboard.py # Grafana iframe embed
├── catalogue.py # Catalogue search + result cards
├── tree.py # Topic hierarchy tree
├── subscriptions.py # Active subscriptions list
├── settings.py # GDC status + refresh trigger
└── shared.py # Shared sidebar logic (on_topics_picked, confirm_subscribe, show_metadata)
4.2. Слой данных (data.py)
Слой данных отвечает за получение записей WCMP2 из трёх каталогов глобального обнаружения WIS2 (GDC), их объединение и построение двух кэшей на уровне модуля, из которых читают все представления.
4.2.1. Источники GDC
При запуске записи загружаются из трёх публичных конечных точек GDC:
| Короткое имя | URL |
|---|---|
|
|
|
|
|
Необработанные JSON-ответы кэшируются в Redis под ключами gdc:cache:CMA, gdc:cache:DWD и gdc:cache:ECCC с TTL, управляемым GDC_CACHE_TTL_SECONDS (по умолчанию 6 часов). Redis необязателен — если недоступен, данные загружаются по HTTP при каждом запуске.
4.2.2. Объединённые записи
После загрузки всех источников GDC _build_merged_records() дедуплицирует записи по ID между каталогами:
-
Записи, присутствующие только в одном каталоге, появляются один раз с одной записью
source_gdcs. -
Записи, присутствующие в нескольких каталогах, объединяются:
source_gdcsперечисляет все вносящие вклад каталоги. -
has_discrepancyустанавливается вTrue, если свойства, геометрия или ссылки различаются между каталогами. -
Ссылки объединяются между каталогами, чтобы данные каналов из любого GDC сохранялись в объединённой записи.
Результат хранится в списке _merged_records на уровне модуля и возвращается функцией merged_records().
4.2.3. Иерархия тем
_build_topic_hierarchy() перебирает _merged_records и вставляет первый MQTT-канал cache/ каждой записи во вложенный словарь:
{
"cache": {
"children": {
"a": {
"children": {
"wis2": {
"children": {
"de-dwd": {
"children": {
...
"synop": {
"datasets": [<WCMP2Record>, ...]
}
}
}
}
}
}
}
}
}
}
Узел может содержать как "children", так и "datasets", если один канал является префиксом другого.
Иерархия хранится в _topic_hierarchy и возвращается функцией topic_hierarchy(). Она используется tree.py для построения виджета ui.tree и get_datasets_for_channel() для разрешения наборов данных для заданной темы.
4.2.4. Ключевые функции
| Функция | Описание |
|---|---|
|
Возвращает кэшированный список объединённых записей |
|
Возвращает кэшированную иерархию тем |
|
Удаляет завершающий |
|
Загружает данные GDC (сначала кэш Redis, затем HTTP), перестраивает |
4.3. Модель WCMP2 (models/wcmp2.py)
WCMP2Record — это dataclass-представление GeoJSON Feature метаданных обнаружения WIS2.
from models.wcmp2 import WCMP2Record
rec = WCMP2Record.from_dict(feature_dict)
rec.id # str - unique dataset identifier
rec.title # str | None
rec.description # str | None
rec.keywords # list[str]
rec.wmo_data_policy # 'core' | 'recommended' | None
rec.geometry # Geometry | None (.type, .coordinates)
rec.links # list[Link] (.channel, .href, .rel, .extra)
rec.mqtt_channels # list[str] - channels from all links
Link.extra — это словарь, захватывающий внесхемные ключи из ответа GDC (например, специфичные для GDC filters, используемые для полей пользовательских фильтров на боковой панели подписки).
4.4. AppState
Каждый сеанс браузера имеет собственный экземпляр AppState (определён в main.py):
class AppState:
def __init__(self):
self.selected_topics: list[str] = []
self.current_view: str = 'dashboard'
selected_topics хранит темы MQTT, выбранные в данный момент в интерфейсе, и управляет правой боковой панелью. current_view отслеживает имя активного представления и сохраняется в app.storage.user['current_view'] перед любой перезагрузкой страницы (например, при смене языка), чтобы пользователь вернулся к тому же представлению.
4.5. Представления
Все представления следуют одному и тому же соглашению: функция render(container, …), которая строит элементы NiceGUI внутри предоставленного контейнерного элемента. render всегда является обычной (не асинхронной) функцией, чтобы её можно было синхронно вызывать из show_view() в main.py.
4.5.1. Представление каталога
views/catalogue.py обеспечивает полнотекстовый поиск по merged_records() с чистыми функциями-фильтрами:
| Функция | Описание |
|---|---|
|
Сопоставляет запрос с ID, заголовком, описанием, ключевыми словами и концепциями темы |
|
Сопоставляет |
|
Ключевые слова через запятую; все должны присутствовать в записи |
|
Пересечение геометрии Shapely с заданной ограничивающей рамкой |
Карточки результатов отображаются функцией update_search_results(), которая также отвечает за разбивку на страницы.
4.5.2. Древовидное представление
views/tree.py преобразует topic_hierarchy() в узлы ui.tree через _to_tree_nodes() (рекурсивно, в алфавитном порядке). Для выбора используется on_select, обеспечивающий выбор одного узла — on_tick не используется, так как конфликтует с внутренней синхронизацией состояния NiceGUI.
4.5.3. Общая боковая панель (views/shared.py)
on_topics_picked(e, state, layout, is_page_selection, dataset_id) — центральный обработчик выбора в каталоге и дереве. Он:
-
Обновляет
state.selected_topics -
Строит правую боковую панель с полем ввода каталога сохранения, фильтрами наборов данных/типа медиа/ограничивающей рамки/даты и необязательными пользовательскими фильтрами
-
Привязывает кнопку Подписаться к
confirm_subscribe()
confirm_subscribe() показывает диалоговое окно с полной JSON-нагрузкой перед вызовом subscribe_to_topics().
show_metadata(dataset_id) находит запись из merged_records() и отображает диалоговое окно с деталями и интерактивной картой Leaflet.
4.6. Добавление нового представления
-
Создайте
modules/ui/views/myview.pyс функциейrender(container), используяt()для всех видимых пользователю строк -
Добавьте запись навигации в
components/navigation_drawer.py— списокNAV_ITEMSпринимает кортеж(view_id, label_key, icon), гдеlabel_key— ключ i18n типаnav.* -
Добавьте ветку в диспетчер
show_view()вmain.py -
Добавьте новые CSS-классы в
assets/base.cssвнутри блока@layer components { … } -
Добавьте все новые строковые ключи в
i18n/en.pyи в каждый другой языковой файл — смотрите Интернационализация (i18n)
4.7. Интернационализация (i18n)
Интерфейс поддерживает несколько языков через лёгкую систему i18n на основе словарей в modules/ui/i18n/.
4.7.1. Поддерживаемые языки
| Код | Язык | Направление |
|---|---|---|
|
English |
LTR |
|
Français |
LTR |
|
Español |
LTR |
|
العربية |
RTL |
|
中文 |
LTR |
|
Русский |
LTR |
Активный язык хранится для каждого сеанса браузера в app.storage.user['lang'] и по умолчанию является английским. Пользователи меняют язык через селектор в панели инструментов заголовка; страница перезагружается для применения изменения.
4.7.2. Принцип работы t()
from i18n import t
# Простой поиск
label = t('nav.dashboard') # → 'Dashboard'
# С интерполяцией (использует str.format)
label = t('subscriptions.folder', path='./') # → 'Folder: ./'
Цепочка поиска:
-
Файл выбранного языка
-
Английский (
en.py) — резервный, если ключ отсутствует в выбранном языке -
Строка самого ключа — резервный, если также отсутствует в английском (виден при разработке)
t() читает app.storage.user['lang'] из текущего сеанса запроса NiceGUI. Всегда вызывайте её во время рендеринга (внутри обработчика @ui.page или обратного вызова события NiceGUI), а не во время импорта модуля.
|
Строки, содержащие буквальные символы { или } (например, примеры JSON в тексте подсказок), должны использовать одинарные скобки как есть. Не используйте экранирование {{ / }} — t() вызывает .format(**kwargs) только при передаче именованных аргументов, поэтому {{ будет отображаться буквально.
|
4.7.3. Соглашения об именовании ключей
Ключи используют пространства имён, разделённые точками:
| Префикс | Область | Пример |
|---|---|---|
|
Метки навигационного ящика |
|
|
Метки кнопок |
|
|
Боковая панель подписки |
|
|
Представление каталога |
|
|
Древовидное представление |
|
|
Представление управления подписками |
|
|
Представление настроек |
|
|
Представление ручной подписки |
|
|
Сообщения валидации ручной подписки |
|
|
Заголовки диалоговых окон |
|
|
Диалоговое окно метаданных |
|
|
Общие сообщения валидации |
|
|
Нижний колонтитул |
|
|
Метки доступности ARIA |
|
4.7.4. Поддержка RTL
Арабский (ar) — язык с письмом справа налево. При выборе арабского is_rtl() возвращает True, а обработчик on_connect() в main.py через JavaScript добавляет dir="rtl" на элемент <html>, активируя режим RTL-макета Quasar. Пользовательские CSS-переопределения в assets/base.css (под блоком комментария RTL overrides) исправляют физические отступы Quasar для .q-page-container и перепозиционируют боковую панель подписки.
Чтобы добавить ещё один RTL-язык, добавьте его код в RTL_LANGUAGES в init.py — никаких других изменений не требуется.
4.7.5. Добавление нового языка
Скопируйте modules/ui/i18n/en.py в modules/ui/i18n/{code}.py и переведите все значения. Сохраните каждый ключ; не удаляйте ни одного. Сохраните имена {placeholder} точно так, как они появляются в английском источнике:
# modules/ui/i18n/de.py
"""German strings."""
STRINGS: dict[str, str] = {
'nav.dashboard': 'Dashboard',
'nav.catalogue': 'Katalogsuche',
'nav.tree': 'Baumsuche',
'nav.manual': 'Manuell abonnieren',
'nav.manage': 'Abonnements verwalten',
'nav.settings': 'Einstellungen',
# ... all remaining keys ...
'subscriptions.folder': 'Ordner: {path}', # {path} must be preserved
'settings.records': '{name}: {count} Einträge',
}
init.pyfrom . import ar, de, en, es, fr, ru, zh (1)
LANGUAGES: dict[str, str] = {
'en': 'English',
'fr': 'Français',
'es': 'Español',
'ar': 'العربية',
'zh': '中文',
'ru': 'Русский',
'de': 'Deutsch', (2)
}
_STRINGS: dict[str, dict[str, str]] = {
'en': en.STRINGS,
'fr': fr.STRINGS,
'es': es.STRINGS,
'ar': ar.STRINGS,
'zh': zh.STRINGS,
'ru': ru.STRINGS,
'de': de.STRINGS, (3)
}
| 1 | Добавьте импорт |
| 2 | Добавьте отображаемое имя (на родном шрифте) |
| 3 | Добавьте маппинг строк |
Если язык пишется справа налево (например, иврит he, персидский fa):
RTL_LANGUAGES: frozenset[str] = frozenset({'ar', 'he'})
docker-compose restart wis2downloader-ui
Новый язык немедленно появится в селекторе языка в заголовке.
| Существующие нeanглийские переводы созданы машинным способом и предоставляются только как отправная точка. Все строки должны быть проверены носителем языка перед production-развёртыванием, с особым вниманием к терминологии WMO/метеорологической области (WIS2, BUFR, GRIB, Global Cache и т.д.), которая имеет устоявшиеся переводы в официальных документах WMO. |
4.7.6. Добавление новых переводимых строк
При добавлении текста интерфейса в любое представление или компонент:
en.py (источник истины)# modules/ui/i18n/en.py
'myview.title': 'My New View',
'myview.description': 'Showing results for {topic}.',
Скопируйте английское значение как заполнитель, если перевод ещё недоступен. t() автоматически откатывается к английскому, но наличие ключа предотвращает пробелы в инструментарии:
# fr.py / es.py / ar.py / zh.py / ru.py
'myview.title': 'My New View', # TODO: translate
'myview.description': 'Showing results for {topic}.',
t() в представленииfrom i18n import t
def render(container):
with container:
ui.label(t('myview.title')).classes('page-title')
ui.label(t('myview.description', topic=selected_topic))
5. Межсервисное взаимодействие
5.1. Redis PubSub (Команды)
Взаимодействие между Subscription Manager и Subscriber использует Redis PubSub на канале subscription_commands:
// Channel: subscription_commands
// Subscribe command
{
"action": "subscribe",
"topic": "cache/a/wis2/+/data/#",
"save_path": "all-data",
"filters": {"accepted_media_types": ["application/bufr"]}
}
// Unsubscribe command
{
"action": "unsubscribe",
"topic": "cache/a/wis2/+/data/#"
}
5.2. Задачи Celery (Загрузки)
Взаимодействие между Subscriber и рабочими процессами Celery использует очередь задач Celery:
job = {
"topic": "cache/a/wis2/de-dwd/data/...",
"target": "dwd-data",
"filters": {"accepted_media_types": [...]},
"_broker": "globalbroker.meteo.fr",
"_received": "2026-01-28 10:15:30",
"_queued": "2026-01-28 10:15:30",
"payload": {
"id": "...",
"properties": {
"data_id": "...",
"metadata_id": "...",
"integrity": {"method": "sha512", "value": "..."}
},
"links": [
{"rel": "canonical", "href": "https://..."}
]
}
}
5.3. Ключи Redis
| Шаблон ключа | Назначение |
|---|---|
|
Хэш всех подписок (sub_id → JSON |
|
Отслеживание дедупликации (типы: |
|
Распределённая блокировка для предотвращения параллельных дублирующих загрузок |
|
Кэшированный JSON каталога GDC (CMA, DWD, ECCC); TTL задаётся переменной |
|
Хэш метрик Prometheus (поле = JSON-словарь меток, значение = счётчик/метрика типа float) |
|
Контрольный сигнал работоспособности подписчика |
|
Очередь задач Celery (по умолчанию) |
6. Расширение системы
6.1. Добавление нового условия совпадения в механизм фильтрации
Механизм фильтрации находится в modules/shared/shared/filters.py. Каждое условие совпадения — это ключ в объекте match, обрабатываемый функцией _evaluate_match().
Чтобы добавить новое встроенное условие (например, совпадение по новому полю метаданных station_id):
-
Добавьте поле в
MatchContextвfilters.py:@dataclass class MatchContext: ... station_id: str | None = None -
Заполните его в
_build_context()вwis2.py(до и/или после загрузки). -
Добавьте ветку в
_evaluate_match():if 'station_id' in match: return _match_string_field(ctx.station_id, match['station_id']) -
Задокументируйте новое поле в
openapi.ymlи в справочнике по фильтрам в руководстве пользователя.
Изменения в app.py, command_listener.py или subscriber.py не требуются — объект фильтра передаётся без изменений и вычисляется во время загрузки.
6.2. Добавление новой задачи
-
Создайте задачу в
modules/task_manager/task_manager/tasks/ -
Зарегистрируйте в автообнаружении
worker.py -
При необходимости добавьте в рабочий процесс
# modules/task_manager/task_manager/tasks/custom.py
from task_manager.worker import app
@app.task
def my_custom_task(data):
# Process data
return result
6.3. Добавление новой метрики
Метрики хранятся в Redis через shared.redis_metrics. Чтобы добавить новую метрику:
-
Зарегистрируйте её в словаре
METRICSвmodules/shared/shared/redis_metrics.py:# short name → (prometheus type, help text) METRICS: dict[str, tuple[str, str]] = { ... 'my_counter_total': ('counter', 'Description of the counter.'), 'my_gauge': ('gauge', 'Description of the gauge.'), } -
Увеличивайте её из любого сервиса (счётчик) или устанавливайте значение (метрика):
from shared import incr_counter, set_gauge # Счётчик (например, в задаче Celery) incr_counter('my_counter_total', {'label1': 'value', 'label2': 'value'}) # Датчик (например, в запланированной задаче) set_gauge('my_gauge', {'label1': 'value'}, 42.0)
В вывод /metrics попадают только метрики, зарегистрированные в словаре METRICS.
7. Тестирование
7.1. Запуск тестов
# Установить тестовые зависимости
pip install pytest pytest-cov
# Запустить тесты
pytest modules/
# С покрытием
pytest --cov=modules modules/
7.2. Ручное тестирование
# Запустить сервисы
docker-compose up -d
# Создать тестовую подписку
curl -X POST http://localhost:5002/subscriptions \
-H "Content-Type: application/json" \
-d '{"topic": "cache/a/wis2/de-dwd/data/#", "target": "test"}'
# Следить за журналами
docker-compose logs -f subscriber celery
# Проверить загрузки
ls -la downloads/test/
8. Настройка среды разработки
8.1. Локальная разработка
# Создать виртуальное окружение
python -m venv venv
source venv/bin/activate
# Установить модули в режиме редактирования
pip install -e modules/shared
pip install -e modules/task_manager
pip install -e modules/subscriber
pip install -e modules/subscription_manager
# Запустить Redis (единственный экземпляр для разработки)
docker run -d -p 6379:6379 redis:7.2-alpine redis-server --requirepass devpassword
# Настроить переменные окружения
export REDIS_HOST=localhost
export REDIS_PORT=6379
export REDIS_PASSWORD=devpassword
export FLASK_SECRET_KEY=dev-secret-key
export LOG_LEVEL=DEBUG
# Запустить диспетчер подписок
python -m subscription_manager.app
# Запустить подписчика (в другом терминале)
export GLOBAL_BROKER_HOST=globalbroker.meteo.fr
python -m subscriber.manager
# Запустить worker Celery (в другом терминале)
celery -A task_manager.worker worker --loglevel=DEBUG
8.2. Сборка Docker
# Собрать все образы
docker-compose build
# Собрать конкретный сервис
docker-compose build celery
# Пересобрать без кеша
docker-compose build --no-cache
9. Отладка
9.1. Просмотр задач Celery
# Проверить активные задачи
docker exec -it wis2downloader-celery-1 \
celery -A task_manager.worker inspect active
# Проверить зарезервированные задачи
docker exec -it wis2downloader-celery-1 \
celery -A task_manager.worker inspect reserved
9.2. Инспектирование Redis
# Подключиться к Redis (использовать пароль из окружения)
docker exec -it redis redis-cli -a $REDIS_PASSWORD
# Список подписок
HGETALL global:subscriptions
# Проверить длину очереди
LLEN celery
# Просмотреть ключи дедупликации
KEYS wis2:notifications:*
# Просмотреть кеш каталога GDC (заполняется интерфейсом при запуске)
KEYS gdc:cache:*
TTL gdc:cache:CMA
Все команды Redis требуют аутентификации. Флаг -a $REDIS_PASSWORD передаёт пароль из вашего окружения.
|
9.3. Отладка MQTT
# Подписаться на топик вручную (mosquitto-clients)
mosquitto_sub -h globalbroker.meteo.fr -p 443 \
-t 'cache/a/wis2/de-dwd/data/#' \
-u everyone -P everyone \
--capath /etc/ssl/certs