Этот документ был автоматически переведён с английского языка с помощью Claude AI. Перед использованием в production-среде терминология из области WMO/метеорологии должна быть проверена носителем языка. Авторитетной версией является оригинал на английском языке.

1. Обзор архитектуры

WIS2 Downloader — это распределённая система, состоящая из четырёх основных компонентов:

                         Docker Compose
  ┌──────────────────────────────────────────────────────────┐
  │                                                           │
  │  Web UI (NiceGUI :8080)                                   │
  │    - reads GDC catalogue data from Redis cache            │
  │    - posts subscriptions to Subscription Manager API      │
  │                          │                                │
  │                          ▼                                │
  │  Subscription Manager (Flask API :5001)                   │
  │    - persists subscriptions in Redis                      │
  │    - publishes subscribe/unsubscribe commands via PubSub  │
  │                          │                                │
  │              Redis PubSub (commands)                      │
  │                          │                                │
  │                          ▼                                │
  │  Subscriber (MQTT Client)                                 │
  │    - connects to WIS2 Global Broker                       │
  │    - on notification: queues Celery download task         │
  │                          │                                │
  │              Redis (Celery queue + dedup state)           │
  │                          │                                │
  │                          ▼                                │
  │  Celery Workers                                           │
  │    - download file, verify hash, apply filters, save      │
  │                                                           │
  └──────────────────────────────────────────────────────────┘

1.1. Поток данных

  1. При запуске веб-интерфейс загружает записи WCMP2 из трёх конечных точек GDC (или из кэша Redis) и строит иерархию тем в памяти

  2. Пользователь просматривает каталог или дерево и выбирает тему в веб-интерфейсе

  3. Пользователь нажимает Подписаться → веб-интерфейс отправляет POST-запрос к REST API Subscription Manager

  4. Subscription Manager сохраняет подписку в Redis и публикует команду подписки в канал Redis PubSub

  5. Subscriber получает команду и подписывается на тему MQTT на WIS2 Global Broker

  6. Когда поступает уведомление WIS2, Subscriber помещает задачу загрузки Celery в очередь

  7. Рабочий процесс Celery загружает файл, проверяет хэш, применяет фильтры и сохраняет на диск

2. Структура модулей

modules/
├── shared/                 # Shared utilities
│   └── shared/
│       ├── __init__.py
│       ├── logging.py      # Centralized logging
│       └── redis_client.py # Redis client singleton
│
├── subscriber/             # MQTT subscriber service
│   └── subscriber/
│       ├── __init__.py
│       ├── manager.py      # Entry point, thread management
│       ├── subscriber.py   # MQTT client wrapper
│       └── command_listener.py  # Redis PubSub listener
│
├── subscription_manager/   # REST API service
│   └── subscription_manager/
│       ├── __init__.py
│       ├── app.py          # Flask application
│       └── static/
│           └── openapi.yml # API specification
│
├── task_manager/           # Celery tasks
│   └── task_manager/
│       ├── __init__.py
│       ├── worker.py       # Celery app configuration
│       ├── tasks/
│       │   └── wis2.py     # Download tasks
│       └── workflows/
│           └── __init__.py # Task chains
│
└── ui/                     # NiceGUI web interface (port 8080)
    ├── main.py             # Entry point, page route, AppState
    ├── layout.py           # PageLayout builder
    ├── config.py           # Environment variable config
    ├── data.py             # GDC data layer — fetch, merge, cache, hierarchy
    ├── assets/
    │   └── base.css        # All custom CSS
    ├── components/         # Reusable layout components
    ├── i18n/               # Internationalisation
    │   ├── __init__.py     # t(), current_lang(), is_rtl(), LANGUAGES
    │   ├── en.py           # English strings (source of truth)
    │   ├── fr.py           # French
    │   ├── es.py           # Spanish
    │   ├── ar.py           # Arabic (RTL)
    │   ├── zh.py           # Chinese
    │   └── ru.py           # Russian
    ├── models/
    │   └── wcmp2.py        # WCMP2Record dataclass
    └── views/              # Page views (catalogue, tree, subscriptions, etc.)

3. Детали модулей

3.1. Общий модуль

Предоставляет общие утилиты, используемые во всех сервисах.

3.1.1. Клиент Redis

modules/shared/shared/redis_client.py
from shared import get_redis_client

# Получить клиент Redis (singleton)
redis = get_redis_client()

# Использовать как обычный клиент Redis
redis.set('key', 'value')
redis.get('key')

Клиент:

  • Подключается напрямую к серверу Redis

  • Кэширует соединение (паттерн «одиночка»)

  • Содержит логику повторных попыток для устранения временных сбоев

3.1.2. Журналирование

modules/shared/shared/logging.py
from shared import setup_logging

# Настроить корневой логгер (вызывается один раз при запуске)
setup_logging()

# Получить логгер для конкретного модуля
LOGGER = setup_logging(__name__)

LOGGER.info("Message with UTC timestamp")

Возможности:

  • Метки времени в формате ISO 8601 в UTC

  • Настраивается через переменную окружения LOG_LEVEL

  • Вывод в stdout для сбора логов Docker

3.2. Модуль Subscriber

Подключается к WIS2 Global Broker через MQTT и обрабатывает входящие уведомления.

3.2.1. Точка входа

modules/subscriber/subscriber/manager.py
def run_manager():
    # 1. Create MQTT subscriber
    mqtt_subscriber = Subscriber(**broker_config)

    # 2. Create Redis command listener
    redis_listener = CommandListener(
        subscriber=mqtt_subscriber,
        channel=COMMAND_CHANNEL
    )

    # 3. Start MQTT in separate thread
    mqtt_thread = threading.Thread(target=mqtt_subscriber.start, daemon=True)
    mqtt_thread.start()

    # 4. Start Redis listener (blocks)
    redis_listener.start()

3.2.2. Подписчик MQTT

modules/subscriber/subscriber/subscriber.py
class Subscriber:
    def __init__(self, host, port, uid, pwd, protocol, session):
        # Configure MQTT client with callbacks
        self.client = mqtt.Client(...)
        self.client.on_message = self._on_message

    def _on_message(self, client, userdata, msg):
        # Parse notification, create Celery task
        job = {
            "topic": msg.topic,
            "target": target,
            "filters": filters,
            "payload": json.loads(msg.payload)
        }
        workflow = wis2_download(job)
        workflow.apply_async()

    def subscribe(self, topic, target, filters):
        self.client.subscribe(topic, qos=0)
        self.active_subscriptions[topic] = {...}

    def unsubscribe(self, topic):
        self.client.unsubscribe(topic)
        del self.active_subscriptions[topic]

3.2.3. Слушатель команд

modules/subscriber/subscriber/command_listener.py
class CommandListener(threading.Thread):
    def __init__(self, subscriber, channel):
        self.subscriber = subscriber
        self.pubsub = get_redis_client().pubsub()

    def run(self):
        self.pubsub.subscribe(self.channel)
        while not self.stop_event.is_set():
            message = self.pubsub.get_message()
            if message:
                self._process_command(message)

    def _process_command(self, message):
        command = json.loads(message['data'])
        if command['action'] == 'subscribe':
            self.subscriber.subscribe(
                command['topic'],
                command['save_path'],
                command['filters']
            )
        elif command['action'] == 'unsubscribe':
            self.subscriber.unsubscribe(command['topic'])

3.3. Модуль Subscription Manager

REST API на Flask для управления подписками.

3.3.1. Приложение Flask

modules/subscription_manager/subscription_manager/app.py
@app.post('/subscriptions')
def add_subscription():
    data = get_json()
    topic = normalise_topic(data.get('topic'))
    target = normalise_path(data.get('target', ''))
    filters = data.get('filters', {})

    # Publish command to subscriber via Redis PubSub
    command = {
        "action": "subscribe",
        "topic": topic,
        "save_path": target,
        "filters": filters
    }
    publish_command(command, COMMAND_CHANNEL)

    # Persist to Redis for durability
    persist_subscription(topic, target, filters)

    return jsonify({"status": "accepted", ...}), 202

3.3.2. Конечная точка метрик

Метрики хранятся в Redis с использованием атомарных операций HINCRBYFLOAT, что делает их безопасными при работе нескольких контейнеров Celery. Конечная точка читает данные из Redis и формирует вывод в формате Prometheus text format:

@app.route('/metrics')
def expose_metrics():
    from shared.redis_metrics import generate_prometheus_text, set_gauge
    set_gauge('celery_queue_length', {}, get_queue_length())
    return Response(generate_prometheus_text(), mimetype='text/plain')

Увеличение счётчика из любого воркера:

from shared import incr_counter
incr_counter('downloads_total', {'cache': hostname, 'media_type': mime_type})

3.4. Модуль Task Manager

Рабочие процессы Celery для загрузки и обработки файлов.

3.4.1. Конфигурация Celery

modules/task_manager/task_manager/worker.py
app = Celery('tasks',
             broker=CELERY_BROKER_URL,
             result_backend=CELERY_RESULT_BACKEND)

# Автоматическое обнаружение задач
app.autodiscover_tasks(['task_manager.tasks', 'task_manager.tasks.wis2'])

3.4.2. Задача загрузки

modules/task_manager/task_manager/tasks/wis2.py
@app.task(bind=True)
@metrics_collector
def download_from_wis2(self, job):
    result = {...}  # Initialize result dict

    # 1. Extract identifiers
    message_id = job['payload']['id']
    data_id = job['payload']['properties']['data_id']
    filehash = job['payload']['properties']['integrity']['value']

    # 2. Deduplication check
    for key, type in [(message_id, 'by-msg-id'), ...]:
        if get_status(key, type) == STATUS_SUCCESS:
            result['status'] = STATUS_SKIPPED
            return result

    # 3. Acquire lock
    lock_acquired = redis_client.set(lock_key, 1, nx=True, ex=LOCK_EXPIRE)
    if not lock_acquired:
        raise self.retry(countdown=10, max_retries=10)

    # 4. Download file
    response = _pool.request('GET', download_url, ...)

    # 5. Verify hash
    if hash_expected and hash_base64 != hash_expected:
        result['status'] = STATUS_FAILED
        return result

    # 6. Check media type filter
    if not _is_allowed_media_type(file_type, filters):
        result['status'] = STATUS_SKIPPED
        return result

    # 7. Save file
    output_path.write_bytes(data)
    result['status'] = STATUS_SUCCESS
    return result

3.4.3. Цепочки рабочих процессов

modules/task_manager/task_manager/workflows/init.py
def wis2_download(args):
    # Chain: download -> decode/ingest
    workflow = download_from_wis2.s(args) | decode_and_ingest.s()
    return workflow

3.4.4. Планировщик (периодические задачи)

Отдельное приложение Celery (scheduler.py) выполняет обслуживание системы с помощью Celery Beat. Оно использует базы данных Redis 2/3 для исключения конфликтов с очередями рабочих процессов загрузки (базы данных 0/1). Периодические задачи определены в tasks/scheduled_tasks.py:

Задача Интервал Назначение

check_disk_space

Каждые 5 мин

Устанавливает значения метрик disk_total_bytes, disk_used_bytes, disk_free_bytes

clean_directory

Каждые 10 мин

Удаляет файлы старше DOWNLOAD_RETENTION_PERIOD дней; уменьшает disk_downloads_bytes на размер каждого удалённого файла

recalibrate_downloads_size

Ежедневно

Полный обход os.walk для исправления возможного расхождения в значении метрики disk_downloads_bytes

Планировщик запускается как два сервиса Docker Compose: celery-scheduler-workers (выполняет задачи) и celery-beats (запускает их по расписанию).

4. Модуль пользовательского интерфейса

Интерфейс — это веб-приложение NiceGUI, работающее на порту 8080. Это основной пользовательский интерфейс для обнаружения наборов данных, просмотра иерархии тем WIS2 и создания подписок.

4.1. Структура модулей

modules/ui/
├── main.py                  # Application entry point, page route, AppState
├── layout.py                # PageLayout builder
├── config.py                # Environment variable config (SUBSCRIPTION_MANAGER_URL etc.)
├── data.py                  # GDC data layer — fetch, merge, cache, hierarchy
├── assets/
│   └── base.css             # All custom CSS (inside @layer components)
├── components/
│   ├── header.py            # WMO banner + toolbar + language selector
│   ├── footer.py            # Footer bar
│   ├── navigation_drawer.py # Left nav drawer with view links
│   └── page_body.py         # Main content area + right sidebar
├── i18n/                    # Internationalisation
│   ├── __init__.py          # t(), current_lang(), is_rtl(), LANGUAGES
│   ├── en.py                # English strings (source of truth)
│   ├── fr.py                # French
│   ├── es.py                # Spanish
│   ├── ar.py                # Arabic (RTL)
│   ├── zh.py                # Chinese
│   └── ru.py                # Russian
├── models/
│   └── wcmp2.py             # WCMP2Record dataclass (GeoJSON Feature)
└── views/
    ├── dashboard.py         # Grafana iframe embed
    ├── catalogue.py         # Catalogue search + result cards
    ├── tree.py              # Topic hierarchy tree
    ├── subscriptions.py     # Active subscriptions list
    ├── settings.py          # GDC status + refresh trigger
    └── shared.py            # Shared sidebar logic (on_topics_picked, confirm_subscribe, show_metadata)

4.2. Слой данных (data.py)

Слой данных отвечает за получение записей WCMP2 из трёх каталогов глобального обнаружения WIS2 (GDC), их объединение и построение двух кэшей на уровне модуля, из которых читают все представления.

4.2.1. Источники GDC

При запуске записи загружаются из трёх публичных конечных точек GDC:

Короткое имя URL

CMA

https://gdc.wis.cma.cn

DWD

https://wis2.dwd.de/gdc

ECCC

https://wis2-gdc.weather.gc.ca

Необработанные JSON-ответы кэшируются в Redis под ключами gdc:cache:CMA, gdc:cache:DWD и gdc:cache:ECCC с TTL, управляемым GDC_CACHE_TTL_SECONDS (по умолчанию 6 часов). Redis необязателен — если недоступен, данные загружаются по HTTP при каждом запуске.

4.2.2. Объединённые записи

После загрузки всех источников GDC _build_merged_records() дедуплицирует записи по ID между каталогами:

  • Записи, присутствующие только в одном каталоге, появляются один раз с одной записью source_gdcs.

  • Записи, присутствующие в нескольких каталогах, объединяются: source_gdcs перечисляет все вносящие вклад каталоги.

  • has_discrepancy устанавливается в True, если свойства, геометрия или ссылки различаются между каталогами.

  • Ссылки объединяются между каталогами, чтобы данные каналов из любого GDC сохранялись в объединённой записи.

Результат хранится в списке _merged_records на уровне модуля и возвращается функцией merged_records().

4.2.3. Иерархия тем

_build_topic_hierarchy() перебирает _merged_records и вставляет первый MQTT-канал cache/ каждой записи во вложенный словарь:

{
  "cache": {
    "children": {
      "a": {
        "children": {
          "wis2": {
            "children": {
              "de-dwd": {
                "children": {
                  ...
                  "synop": {
                    "datasets": [<WCMP2Record>, ...]
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}

Узел может содержать как "children", так и "datasets", если один канал является префиксом другого.

Иерархия хранится в _topic_hierarchy и возвращается функцией topic_hierarchy(). Она используется tree.py для построения виджета ui.tree и get_datasets_for_channel() для разрешения наборов данных для заданной темы.

4.2.4. Ключевые функции

Функция Описание

merged_records() → list[MergedRecord]

Возвращает кэшированный список объединённых записей

topic_hierarchy() → dict

Возвращает кэшированную иерархию тем

get_datasets_for_channel(channel) → list[WCMP2Record]

Удаляет завершающий /#, навигирует по иерархии и рекурсивно собирает все наборы данных из этого узла и его дочерних элементов

scrape_all(force=False)

Загружает данные GDC (сначала кэш Redis, затем HTTP), перестраивает _merged_records и _topic_hierarchy

4.3. Модель WCMP2 (models/wcmp2.py)

WCMP2Record — это dataclass-представление GeoJSON Feature метаданных обнаружения WIS2.

from models.wcmp2 import WCMP2Record

rec = WCMP2Record.from_dict(feature_dict)

rec.id                   # str - unique dataset identifier
rec.title                # str | None
rec.description          # str | None
rec.keywords             # list[str]
rec.wmo_data_policy      # 'core' | 'recommended' | None
rec.geometry             # Geometry | None  (.type, .coordinates)
rec.links                # list[Link]  (.channel, .href, .rel, .extra)
rec.mqtt_channels        # list[str]  - channels from all links

Link.extra — это словарь, захватывающий внесхемные ключи из ответа GDC (например, специфичные для GDC filters, используемые для полей пользовательских фильтров на боковой панели подписки).

4.4. AppState

Каждый сеанс браузера имеет собственный экземпляр AppState (определён в main.py):

class AppState:
    def __init__(self):
        self.selected_topics: list[str] = []
        self.current_view: str = 'dashboard'

selected_topics хранит темы MQTT, выбранные в данный момент в интерфейсе, и управляет правой боковой панелью. current_view отслеживает имя активного представления и сохраняется в app.storage.user['current_view'] перед любой перезагрузкой страницы (например, при смене языка), чтобы пользователь вернулся к тому же представлению.

4.5. Представления

Все представления следуют одному и тому же соглашению: функция render(container, …​), которая строит элементы NiceGUI внутри предоставленного контейнерного элемента. render всегда является обычной (не асинхронной) функцией, чтобы её можно было синхронно вызывать из show_view() в main.py.

4.5.1. Представление каталога

views/catalogue.py обеспечивает полнотекстовый поиск по merged_records() с чистыми функциями-фильтрами:

Функция Описание

filter_feature(record, query)

Сопоставляет запрос с ID, заголовком, описанием, ключевыми словами и концепциями темы

filter_by_data_policy(record, policy)

Сопоставляет 'core', 'recommended' или 'all'

filter_by_keywords(record, keywords)

Ключевые слова через запятую; все должны присутствовать в записи

filter_by_bbox(record, bbox)

Пересечение геометрии Shapely с заданной ограничивающей рамкой

Карточки результатов отображаются функцией update_search_results(), которая также отвечает за разбивку на страницы.

4.5.2. Древовидное представление

views/tree.py преобразует topic_hierarchy() в узлы ui.tree через _to_tree_nodes() (рекурсивно, в алфавитном порядке). Для выбора используется on_select, обеспечивающий выбор одного узла — on_tick не используется, так как конфликтует с внутренней синхронизацией состояния NiceGUI.

4.5.3. Общая боковая панель (views/shared.py)

on_topics_picked(e, state, layout, is_page_selection, dataset_id) — центральный обработчик выбора в каталоге и дереве. Он:

  1. Обновляет state.selected_topics

  2. Строит правую боковую панель с полем ввода каталога сохранения, фильтрами наборов данных/типа медиа/ограничивающей рамки/даты и необязательными пользовательскими фильтрами

  3. Привязывает кнопку Подписаться к confirm_subscribe()

confirm_subscribe() показывает диалоговое окно с полной JSON-нагрузкой перед вызовом subscribe_to_topics().

show_metadata(dataset_id) находит запись из merged_records() и отображает диалоговое окно с деталями и интерактивной картой Leaflet.

4.6. Добавление нового представления

  1. Создайте modules/ui/views/myview.py с функцией render(container), используя t() для всех видимых пользователю строк

  2. Добавьте запись навигации в components/navigation_drawer.py — список NAV_ITEMS принимает кортеж (view_id, label_key, icon), где label_key — ключ i18n типа nav.*

  3. Добавьте ветку в диспетчер show_view() в main.py

  4. Добавьте новые CSS-классы в assets/base.css внутри блока @layer components { …​ }

  5. Добавьте все новые строковые ключи в i18n/en.py и в каждый другой языковой файл — смотрите Интернационализация (i18n)

4.7. Интернационализация (i18n)

Интерфейс поддерживает несколько языков через лёгкую систему i18n на основе словарей в modules/ui/i18n/.

4.7.1. Поддерживаемые языки

Код Язык Направление

en

English

LTR

fr

Français

LTR

es

Español

LTR

ar

العربية

RTL

zh

中文

LTR

ru

Русский

LTR

Активный язык хранится для каждого сеанса браузера в app.storage.user['lang'] и по умолчанию является английским. Пользователи меняют язык через селектор в панели инструментов заголовка; страница перезагружается для применения изменения.

4.7.2. Принцип работы t()

from i18n import t

# Простой поиск
label = t('nav.dashboard')                      # → 'Dashboard'

# С интерполяцией (использует str.format)
label = t('subscriptions.folder', path='./')    # → 'Folder: ./'

Цепочка поиска:

  1. Файл выбранного языка

  2. Английский (en.py) — резервный, если ключ отсутствует в выбранном языке

  3. Строка самого ключа — резервный, если также отсутствует в английском (виден при разработке)

t() читает app.storage.user['lang'] из текущего сеанса запроса NiceGUI. Всегда вызывайте её во время рендеринга (внутри обработчика @ui.page или обратного вызова события NiceGUI), а не во время импорта модуля.
Строки, содержащие буквальные символы { или } (например, примеры JSON в тексте подсказок), должны использовать одинарные скобки как есть. Не используйте экранирование {{ / }}t() вызывает .format(**kwargs) только при передаче именованных аргументов, поэтому {{ будет отображаться буквально.

4.7.3. Соглашения об именовании ключей

Ключи используют пространства имён, разделённые точками:

Префикс Область Пример

nav.*

Метки навигационного ящика

nav.dashboard

btn.*

Метки кнопок

btn.subscribe

sidebar.*

Боковая панель подписки

sidebar.save_directory

catalogue.*

Представление каталога

catalogue.search_label

tree.*

Древовидное представление

tree.filter_label

subscriptions.*

Представление управления подписками

subscriptions.folder

settings.*

Представление настроек

settings.title

manual.*

Представление ручной подписки

manual.topic_label

manual.val.*

Сообщения валидации ручной подписки

manual.val.topic_required

dialog.*

Заголовки диалоговых окон

dialog.confirm_title

metadata.*

Диалоговое окно метаданных

metadata.title

validation.*

Общие сообщения валидации

validation.date_format

footer.*

Нижний колонтитул

footer.copyright

aria.*

Метки доступности ARIA

aria.toggle_nav

4.7.4. Поддержка RTL

Арабский (ar) — язык с письмом справа налево. При выборе арабского is_rtl() возвращает True, а обработчик on_connect() в main.py через JavaScript добавляет dir="rtl" на элемент <html>, активируя режим RTL-макета Quasar. Пользовательские CSS-переопределения в assets/base.css (под блоком комментария RTL overrides) исправляют физические отступы Quasar для .q-page-container и перепозиционируют боковую панель подписки.

Чтобы добавить ещё один RTL-язык, добавьте его код в RTL_LANGUAGES в init.py — никаких других изменений не требуется.

4.7.5. Добавление нового языка

Шаг 1 — Создание языкового файла

Скопируйте modules/ui/i18n/en.py в modules/ui/i18n/{code}.py и переведите все значения. Сохраните каждый ключ; не удаляйте ни одного. Сохраните имена {placeholder} точно так, как они появляются в английском источнике:

# modules/ui/i18n/de.py
"""German strings."""

STRINGS: dict[str, str] = {
    'nav.dashboard':        'Dashboard',
    'nav.catalogue':        'Katalogsuche',
    'nav.tree':             'Baumsuche',
    'nav.manual':           'Manuell abonnieren',
    'nav.manage':           'Abonnements verwalten',
    'nav.settings':         'Einstellungen',
    # ... all remaining keys ...
    'subscriptions.folder': 'Ordner: {path}',   # {path} must be preserved
    'settings.records':     '{name}: {count} Einträge',
}
Шаг 2 — Регистрация языка в init.py
from . import ar, de, en, es, fr, ru, zh        (1)

LANGUAGES: dict[str, str] = {
    'en': 'English',
    'fr': 'Français',
    'es': 'Español',
    'ar': 'العربية',
    'zh': '中文',
    'ru': 'Русский',
    'de': 'Deutsch',                             (2)
}

_STRINGS: dict[str, dict[str, str]] = {
    'en': en.STRINGS,
    'fr': fr.STRINGS,
    'es': es.STRINGS,
    'ar': ar.STRINGS,
    'zh': zh.STRINGS,
    'ru': ru.STRINGS,
    'de': de.STRINGS,                            (3)
}
1 Добавьте импорт
2 Добавьте отображаемое имя (на родном шрифте)
3 Добавьте маппинг строк
Шаг 3 — Пометить как RTL при необходимости

Если язык пишется справа налево (например, иврит he, персидский fa):

RTL_LANGUAGES: frozenset[str] = frozenset({'ar', 'he'})
Шаг 4 — Перезапустить интерфейс
docker-compose restart wis2downloader-ui

Новый язык немедленно появится в селекторе языка в заголовке.

Существующие нeanглийские переводы созданы машинным способом и предоставляются только как отправная точка. Все строки должны быть проверены носителем языка перед production-развёртыванием, с особым вниманием к терминологии WMO/метеорологической области (WIS2, BUFR, GRIB, Global Cache и т.д.), которая имеет устоявшиеся переводы в официальных документах WMO.

4.7.6. Добавление новых переводимых строк

При добавлении текста интерфейса в любое представление или компонент:

Шаг 1 — Добавить в en.py (источник истины)
# modules/ui/i18n/en.py
'myview.title':       'My New View',
'myview.description': 'Showing results for {topic}.',
Шаг 2 — Добавить во все другие языковые файлы

Скопируйте английское значение как заполнитель, если перевод ещё недоступен. t() автоматически откатывается к английскому, но наличие ключа предотвращает пробелы в инструментарии:

# fr.py / es.py / ar.py / zh.py / ru.py
'myview.title':       'My New View',         # TODO: translate
'myview.description': 'Showing results for {topic}.',
Шаг 3 — Использовать t() в представлении
from i18n import t

def render(container):
    with container:
        ui.label(t('myview.title')).classes('page-title')
        ui.label(t('myview.description', topic=selected_topic))

5. Межсервисное взаимодействие

5.1. Redis PubSub (Команды)

Взаимодействие между Subscription Manager и Subscriber использует Redis PubSub на канале subscription_commands:

// Channel: subscription_commands

// Subscribe command
{
  "action": "subscribe",
  "topic": "cache/a/wis2/+/data/#",
  "save_path": "all-data",
  "filters": {"accepted_media_types": ["application/bufr"]}
}

// Unsubscribe command
{
  "action": "unsubscribe",
  "topic": "cache/a/wis2/+/data/#"
}

5.2. Задачи Celery (Загрузки)

Взаимодействие между Subscriber и рабочими процессами Celery использует очередь задач Celery:

job = {
    "topic": "cache/a/wis2/de-dwd/data/...",
    "target": "dwd-data",
    "filters": {"accepted_media_types": [...]},
    "_broker": "globalbroker.meteo.fr",
    "_received": "2026-01-28 10:15:30",
    "_queued": "2026-01-28 10:15:30",
    "payload": {
        "id": "...",
        "properties": {
            "data_id": "...",
            "metadata_id": "...",
            "integrity": {"method": "sha512", "value": "..."}
        },
        "links": [
            {"rel": "canonical", "href": "https://..."}
        ]
    }
}

5.3. Ключи Redis

Шаблон ключа Назначение

global:subscriptions

Хэш всех подписок (sub_id → JSON {id, topic, save_path, filter})

wis2:notification:status:{type}:{id}

Отслеживание дедупликации (типы: by-msg-id, by-data-id, by-hash)

wis2:notification:data:lock:{id}

Распределённая блокировка для предотвращения параллельных дублирующих загрузок

gdc:cache:{name}

Кэшированный JSON каталога GDC (CMA, DWD, ECCC); TTL задаётся переменной GDC_CACHE_TTL_SECONDS

wis2:metrics:{metric_name}

Хэш метрик Prometheus (поле = JSON-словарь меток, значение = счётчик/метрика типа float)

subscriber:health:{id}

Контрольный сигнал работоспособности подписчика

celery

Очередь задач Celery (по умолчанию)

6. Расширение системы

6.1. Добавление нового условия совпадения в механизм фильтрации

Механизм фильтрации находится в modules/shared/shared/filters.py. Каждое условие совпадения — это ключ в объекте match, обрабатываемый функцией _evaluate_match().

Чтобы добавить новое встроенное условие (например, совпадение по новому полю метаданных station_id):

  1. Добавьте поле в MatchContext в filters.py:

    @dataclass
    class MatchContext:
        ...
        station_id: str | None = None
  2. Заполните его в _build_context() в wis2.py (до и/или после загрузки).

  3. Добавьте ветку в _evaluate_match():

    if 'station_id' in match:
        return _match_string_field(ctx.station_id, match['station_id'])
  4. Задокументируйте новое поле в openapi.yml и в справочнике по фильтрам в руководстве пользователя.

Изменения в app.py, command_listener.py или subscriber.py не требуются — объект фильтра передаётся без изменений и вычисляется во время загрузки.

6.2. Добавление новой задачи

  1. Создайте задачу в modules/task_manager/task_manager/tasks/

  2. Зарегистрируйте в автообнаружении worker.py

  3. При необходимости добавьте в рабочий процесс

# modules/task_manager/task_manager/tasks/custom.py
from task_manager.worker import app

@app.task
def my_custom_task(data):
    # Process data
    return result

6.3. Добавление новой метрики

Метрики хранятся в Redis через shared.redis_metrics. Чтобы добавить новую метрику:

  1. Зарегистрируйте её в словаре METRICS в modules/shared/shared/redis_metrics.py:

    # short name → (prometheus type, help text)
    METRICS: dict[str, tuple[str, str]] = {
        ...
        'my_counter_total': ('counter', 'Description of the counter.'),
        'my_gauge':         ('gauge',   'Description of the gauge.'),
    }
  2. Увеличивайте её из любого сервиса (счётчик) или устанавливайте значение (метрика):

    from shared import incr_counter, set_gauge
    
    # Счётчик (например, в задаче Celery)
    incr_counter('my_counter_total', {'label1': 'value', 'label2': 'value'})
    
    # Датчик (например, в запланированной задаче)
    set_gauge('my_gauge', {'label1': 'value'}, 42.0)

В вывод /metrics попадают только метрики, зарегистрированные в словаре METRICS.

7. Тестирование

7.1. Запуск тестов

# Установить тестовые зависимости
pip install pytest pytest-cov

# Запустить тесты
pytest modules/

# С покрытием
pytest --cov=modules modules/

7.2. Ручное тестирование

# Запустить сервисы
docker-compose up -d

# Создать тестовую подписку
curl -X POST http://localhost:5002/subscriptions \
  -H "Content-Type: application/json" \
  -d '{"topic": "cache/a/wis2/de-dwd/data/#", "target": "test"}'

# Следить за журналами
docker-compose logs -f subscriber celery

# Проверить загрузки
ls -la downloads/test/

8. Настройка среды разработки

8.1. Локальная разработка

# Создать виртуальное окружение
python -m venv venv
source venv/bin/activate

# Установить модули в режиме редактирования
pip install -e modules/shared
pip install -e modules/task_manager
pip install -e modules/subscriber
pip install -e modules/subscription_manager

# Запустить Redis (единственный экземпляр для разработки)
docker run -d -p 6379:6379 redis:7.2-alpine redis-server --requirepass devpassword

# Настроить переменные окружения
export REDIS_HOST=localhost
export REDIS_PORT=6379
export REDIS_PASSWORD=devpassword
export FLASK_SECRET_KEY=dev-secret-key
export LOG_LEVEL=DEBUG

# Запустить диспетчер подписок
python -m subscription_manager.app

# Запустить подписчика (в другом терминале)
export GLOBAL_BROKER_HOST=globalbroker.meteo.fr
python -m subscriber.manager

# Запустить worker Celery (в другом терминале)
celery -A task_manager.worker worker --loglevel=DEBUG

8.2. Сборка Docker

# Собрать все образы
docker-compose build

# Собрать конкретный сервис
docker-compose build celery

# Пересобрать без кеша
docker-compose build --no-cache

9. Отладка

9.1. Просмотр задач Celery

# Проверить активные задачи
docker exec -it wis2downloader-celery-1 \
  celery -A task_manager.worker inspect active

# Проверить зарезервированные задачи
docker exec -it wis2downloader-celery-1 \
  celery -A task_manager.worker inspect reserved

9.2. Инспектирование Redis

# Подключиться к Redis (использовать пароль из окружения)
docker exec -it redis redis-cli -a $REDIS_PASSWORD

# Список подписок
HGETALL global:subscriptions

# Проверить длину очереди
LLEN celery

# Просмотреть ключи дедупликации
KEYS wis2:notifications:*

# Просмотреть кеш каталога GDC (заполняется интерфейсом при запуске)
KEYS gdc:cache:*
TTL gdc:cache:CMA
Все команды Redis требуют аутентификации. Флаг -a $REDIS_PASSWORD передаёт пароль из вашего окружения.

9.3. Отладка MQTT

# Подписаться на топик вручную (mosquitto-clients)
mosquitto_sub -h globalbroker.meteo.fr -p 443 \
  -t 'cache/a/wis2/de-dwd/data/#' \
  -u everyone -P everyone \
  --capath /etc/ssl/certs