Skip to content

Раздел 7: Celery Tasks

Проект: Предиктивная аналитика товарных ниш
Модуль: Scout / Celery
Версия: 1.1
Дата: Январь 2026


7.1 Обзор задач

Реестр задач

ЗадачаТипОчередьПериодичностьОписание
update_trend_dataperiodicdefault06:00Обновление трендов
analyze_nicheasyncheavyПо запросуАнализ ниши
calculate_opportunity_scoreasyncdefaultПо событиюРасчёт оценки
export_reportasyncexportПо запросуЭкспорт отчёта
cleanup_old_analysesperiodicdefault03:00Очистка старых данных
office_heartbeatperiodicdefault*/1Статус в Office Dashboard

7.2 Celery Beat Schedule

python
from celery.schedules import crontab

beat_schedule = {
    "scout-update-trends": {
        "task": "scout.tasks.update_trend_data",
        "schedule": crontab(hour=6, minute=0),
        "options": {"queue": "default"}
    },
    "scout-cleanup": {
        "task": "scout.tasks.cleanup_old_analyses",
        "schedule": crontab(hour=3, minute=0),
        "options": {"queue": "default"}
    },
    "scout-office-heartbeat": {
        "task": "scout.tasks.office_heartbeat",
        "schedule": 60.0,
        "options": {"queue": "default"}
    },
}

7.3 Основные задачи

update_trend_data

python
from celery import shared_task
from app.utils.office_reporter import OfficeReporter

reporter = OfficeReporter(
    agent_id="scout_niche",
    department="scout",
    name="Анализ ниш",
    salary_equivalent=70000,
    fte_coefficient=1.0
)

@shared_task(name='scout.tasks.update_trend_data')
def update_trend_data():
    """Обновление данных трендов."""
    
    reporter.report_working("Обновление данных трендов")
    
    try:
        # ... логика обновления ...
        trends_updated = fetch_and_update_trends()
        
        reporter.report_idle(metrics={
            "trends_updated": trends_updated,
            "last_update": datetime.utcnow().isoformat()
        })
        
        return {"success": True, "updated": trends_updated}
        
    except Exception as e:
        reporter.report_error(str(e))
        raise

analyze_niche

python
@shared_task(
    name='scout.tasks.analyze_niche',
    bind=True,
    max_retries=2,
    time_limit=300
)
def analyze_niche(self, niche_id: str, params: dict):
    """Глубокий анализ товарной ниши."""
    
    reporter.report_working(f"Анализ ниши {niche_id}")
    
    try:
        # ... AI-анализ ...
        result = run_niche_analysis(niche_id, params)
        
        reporter.report_idle(metrics={
            "niches_analyzed_today": get_daily_count(),
            "opportunities_found": result.opportunities_count
        })
        
        return result.to_dict()
        
    except Exception as e:
        reporter.report_error(f"Анализ ниши: {e}")
        raise self.retry(exc=e)

7.4 Интеграция с Office Dashboard

Агент Scout

agent_idnamesalary_equivalentfte_coefficient
scout_nicheАнализ ниш700001.0

Инициализация репортера

python
# app/tasks/scout/office.py

from app.utils.office_reporter import OfficeReporter

OFFICE_REPORTER = OfficeReporter(
    agent_id="scout_niche",
    department="scout",
    name="Анализ ниш",
    salary_equivalent=70000,
    fte_coefficient=1.0
)

Heartbeat задача

python
@shared_task(name='scout.tasks.office_heartbeat')
def office_heartbeat():
    OFFICE_REPORTER.heartbeat()
    return {"success": True}

Метрики для Office

МетрикаОписание
niches_analyzed_todayПроанализировано ниш за день
opportunities_foundНайдено возможностей
trends_updatedОбновлено трендов

Документ подготовлен: Январь 2026
Версия: 1.1
Статус: Черновик

Документация ADOLF Platform