Skip to content

Раздел 8: Celery Tasks

Проект: Мониторинг правовых изменений
Модуль: Lex / Celery
Версия: 1.0
Дата: Январь 2026


8.1 Обзор задач

Реестр задач

ЗадачаТипОчередьПериодичностьОписание
check_legal_updatesperiodicdefault08:00Проверка обновлений законодательства
analyze_documentasyncheavyПо событиюAI-анализ документа
send_legal_digestperiodicdefaultПонедельник 09:00Еженедельный дайджест
cleanup_old_alertsperiodicdefault03:00Очистка старых алертов
office_heartbeatperiodicdefault*/1Статус в Office Dashboard

8.2 Celery Beat Schedule

python
from celery.schedules import crontab

beat_schedule = {
    "lex-check-updates": {
        "task": "lex.tasks.check_legal_updates",
        "schedule": crontab(hour=8, minute=0),
        "options": {"queue": "default"}
    },
    "lex-weekly-digest": {
        "task": "lex.tasks.send_legal_digest",
        "schedule": crontab(day_of_week=1, hour=9, minute=0),
        "options": {"queue": "default"}
    },
    "lex-cleanup": {
        "task": "lex.tasks.cleanup_old_alerts",
        "schedule": crontab(hour=3, minute=0),
        "options": {"queue": "default"}
    },
    "lex-office-heartbeat": {
        "task": "lex.tasks.office_heartbeat",
        "schedule": 60.0,
        "options": {"queue": "default"}
    },
}

8.3 Основные задачи

python
from celery import shared_task
from app.utils.office_reporter import OfficeReporter

reporter = OfficeReporter(
    agent_id="lex_monitor",
    department="lex",
    name="Правовой мониторинг",
    salary_equivalent=70000,
    fte_coefficient=1.0
)

@shared_task(name='lex.tasks.check_legal_updates')
def check_legal_updates():
    """Проверка обновлений законодательства."""
    
    reporter.report_working("Проверка правовых обновлений")
    
    try:
        # ... логика проверки ...
        changes = fetch_legal_changes()
        
        if changes:
            for change in changes:
                analyze_document.delay(change.document_id)
        
        reporter.report_idle(metrics={
            "documents_monitored": get_monitored_count(),
            "changes_detected_today": len(changes)
        })
        
        return {"success": True, "changes": len(changes)}
        
    except Exception as e:
        reporter.report_error(f"Legal check: {e}")
        raise

analyze_document

python
@shared_task(
    name='lex.tasks.analyze_document',
    bind=True,
    max_retries=2,
    time_limit=300
)
def analyze_document(self, document_id: str):
    """AI-анализ правового документа."""
    
    reporter.report_working(f"Анализ документа {document_id[:8]}...")
    
    try:
        # ... AI-анализ ...
        analysis = run_legal_analysis(document_id)
        
        if analysis.is_critical:
            send_alert.delay(analysis.id)
        
        reporter.report_idle(metrics={
            "documents_monitored": get_monitored_count(),
            "changes_detected_today": get_daily_changes()
        })
        
        return {"success": True, "analysis_id": analysis.id}
        
    except Exception as e:
        reporter.report_error(f"Document analysis: {e}")
        raise self.retry(exc=e)

8.4 Интеграция с Office Dashboard

Агент Lex

agent_idnamesalary_equivalentfte_coefficient
lex_monitorПравовой мониторинг700001.0

Инициализация репортера

python
# app/tasks/lex/office.py

from app.utils.office_reporter import OfficeReporter

OFFICE_REPORTER = OfficeReporter(
    agent_id="lex_monitor",
    department="lex",
    name="Правовой мониторинг",
    salary_equivalent=70000,
    fte_coefficient=1.0
)

Heartbeat задача

python
@shared_task(name='lex.tasks.office_heartbeat')
def office_heartbeat():
    OFFICE_REPORTER.heartbeat()
    return {"success": True}

Метрики для Office

МетрикаОписание
documents_monitoredДокументов на мониторинге
changes_detected_todayИзменений обнаружено за день

Документ подготовлен: Январь 2026
Версия: 1.0
Статус: Черновик

Документация ADOLF Platform