Skip to content

Раздел 2: Polling

Проект: Интеллектуальная система управления репутацией
Модуль: Reputation / Polling Service
Версия: 2.1
Дата: Январь 2026


2.1 Назначение и принципы

Назначение

Polling Service — компонент модуля Reputation, отвечающий за периодический сбор отзывов и вопросов покупателей с маркетплейсов Wildberries, Ozon и Яндекс.Маркет.

Принцип работы

Pull-модель: Система периодически опрашивает API маркетплейсов для получения новых данных.

КритерийPull (polling)Push (webhook)
Единообразие✅ Одинаково для всех площадок❌ Не все поддерживают
Контроль нагрузки✅ Полный контроль❌ Зависит от источника
Надёжность✅ Retry при сбоях❌ Потеря при недоступности
Сложность✅ Проще реализация❌ Требует endpoint + верификация

Ключевые принципы

ПринципОписание
ИдемпотентностьПовторный polling не создаёт дубликатов
Rate Limit ComplianceСтрогое соблюдение лимитов API
Graceful DegradationСбой одной площадки не влияет на другие
Incremental SyncЗапрос только новых данных (с last_poll_time)
Distributed ScheduleСмещение задач для равномерной нагрузки

2.2 Архитектура Polling Service

Компоненты

Взаимодействие компонентов

КомпонентОтветственностьЗависимости
Celery BeatЗапуск задач по расписаниюRedis
Task DispatcherМаршрутизация по платформамAdapters
AdaptersВзаимодействие с APIRate Limiter
Rate LimiterСоблюдение лимитовRedis
DeduplicationПроверка уникальностиPostgreSQL

2.3 Celery Tasks

Реестр задач polling

ЗадачаТипОчередьСмещениеОписание
reputation.poll_wb_reviewsperiodicdefault:00Отзывы Wildberries
reputation.poll_wb_questionsperiodicdefault:50Вопросы Wildberries
reputation.poll_ozon_reviewsperiodicdefault1:40Отзывы Ozon
reputation.poll_ozon_questionsperiodicdefault2:30Вопросы Ozon
reputation.poll_ym_reviewsperiodicdefault3:20Отзывы Яндекс.Маркет
reputation.poll_ym_questionsperiodicdefault4:10Вопросы Яндекс.Маркет

Конфигурация Celery Beat

python
from celery.schedules import crontab

# Расписание с распределённым смещением
# Интервал 5 минут, смещение ~50 секунд между задачами
beat_schedule = {
    # Wildberries Reviews: */5 :00
    "poll-wb-reviews": {
        "task": "tasks.reputation_tasks.poll_wb_reviews",
        "schedule": crontab(minute="*/5"),
        "options": {"queue": "default"}
    },
    # Wildberries Questions: */5 :50
    "poll-wb-questions": {
        "task": "tasks.reputation_tasks.poll_wb_questions",
        "schedule": crontab(minute="0,5,10,15,20,25,30,35,40,45,50,55"),
        "options": {"queue": "default", "countdown": 50}
    },
    
    # Ozon Reviews: */5 1:40
    "poll-ozon-reviews": {
        "task": "tasks.reputation_tasks.poll_ozon_reviews",
        "schedule": crontab(minute="1,6,11,16,21,26,31,36,41,46,51,56"),
        "options": {"queue": "default", "countdown": 40}
    },
    # Ozon Questions: */5 2:30
    "poll-ozon-questions": {
        "task": "tasks.reputation_tasks.poll_ozon_questions",
        "schedule": crontab(minute="2,7,12,17,22,27,32,37,42,47,52,57"),
        "options": {"queue": "default", "countdown": 30}
    },
    
    # Яндекс.Маркет Reviews: */5 3:20
    "poll-ym-reviews": {
        "task": "tasks.reputation_tasks.poll_ym_reviews",
        "schedule": crontab(minute="3,8,13,18,23,28,33,38,43,48,53,58"),
        "options": {"queue": "default", "countdown": 20}
    },
    # Яндекс.Маркет Questions: */5 4:10
    "poll-ym-questions": {
        "task": "tasks.reputation_tasks.poll_ym_questions",
        "schedule": crontab(minute="4,9,14,19,24,29,34,39,44,49,54,59"),
        "options": {"queue": "default", "countdown": 10}
    },
}

Визуализация распределения

Минута 0:  WB Reviews  ─────────────────────────────────────────
           :00         :50
Минута 1:              ───── WB Questions ─────
                             1:40
Минута 2:                    ───── Ozon Reviews ─────
                                   2:30
Минута 3:                          ───── Ozon Questions ─────
                                         3:20
Минута 4:                                ───── YM Reviews ─────
                                               4:10
Минута 5:  WB Reviews  ────────────────────── YM Questions ─────

Алгоритм задачи polling


2.4 Credentials Management

Environment Variables

Credentials для API маркетплейсов хранятся в environment variables:

bash
# Wildberries
WB_API_TOKEN=xxx
WB_API_TOKEN_EXPIRES=2026-06-01T00:00:00Z

# Ozon
OZON_CLIENT_ID=xxx
OZON_API_KEY=xxx

# Яндекс.Маркет
YM_OAUTH_TOKEN=xxx
YM_CAMPAIGN_ID=xxx

Загрузка credentials

python
# config/marketplace_credentials.py
import os
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class WBCredentials:
    api_token: str
    expires_at: Optional[datetime] = None

@dataclass
class OzonCredentials:
    client_id: str
    api_key: str

@dataclass
class YMCredentials:
    oauth_token: str
    campaign_id: str

def get_wb_credentials() -> WBCredentials:
    expires_str = os.getenv("WB_API_TOKEN_EXPIRES")
    expires_at = datetime.fromisoformat(expires_str) if expires_str else None
    return WBCredentials(
        api_token=os.environ["WB_API_TOKEN"],
        expires_at=expires_at
    )

def get_ozon_credentials() -> OzonCredentials:
    return OzonCredentials(
        client_id=os.environ["OZON_CLIENT_ID"],
        api_key=os.environ["OZON_API_KEY"]
    )

def get_ym_credentials() -> YMCredentials:
    return YMCredentials(
        oauth_token=os.environ["YM_OAUTH_TOKEN"],
        campaign_id=os.environ["YM_CAMPAIGN_ID"]
    )

2.5 Wildberries Adapter

API Endpoints

EndpointМетодНазначение
/api/v1/feedbacksGETСписок отзывов
/api/v1/questionsGETСписок вопросов

Base URL: https://feedbacks-api.wildberries.ru

Параметры запроса

ПараметрТипОбязательныйОписание
isAnsweredbooleanНетФильтр по наличию ответа
takeintДаКоличество (макс. 10000)
skipintДаСмещение для пагинации
dateFromintНетUnix timestamp начала
dateTointНетUnix timestamp конца

Маппинг полей WB → reputation_items

WB полеreputation_items полеПреобразование
idexternal_idПрямое
nmIdskustr(nmId)
userNameclient_nameПрямое
textclient_textПрямое
productValuationratingПрямое (1-5)
createdDatecreated_atISO → datetime
answerЕсли есть → status='published'

Rate Limits

EndpointЛимитПериод
/api/v1/feedbacks1001 минута
/api/v1/questions1001 минута

2.6 Ozon Adapter

API Endpoints

EndpointМетодНазначение
/v1/review/listPOSTСписок отзывов
/v1/product/questionsPOSTСписок вопросов

Base URL: https://api-seller.ozon.ru

Заголовки авторизации

Client-Id: {OZON_CLIENT_ID}
Api-Key: {OZON_API_KEY}
Content-Type: application/json

Параметры запроса

json
{
  "filter": {
    "interaction_status": "NEED_ANSWER",
    "published_at_from": "2026-01-01T00:00:00Z"
  },
  "last_id": "",
  "limit": 100,
  "sort_by": "PUBLISHED_AT",
  "sort_dir": "DESC"
}

Rate Limits

EndpointЛимитПериод
/v1/review/list601 минута
/v1/product/questions601 минута

2.7 Яндекс.Маркет Adapter

API Endpoints

EndpointМетодНазначение
/campaigns/{id}/feedback/updatesGETСписок отзывов
/campaigns/{id}/offers/{id}/questionsGETСписок вопросов

Base URL: https://api.partner.market.yandex.ru

Заголовки авторизации

Authorization: Bearer {YM_OAUTH_TOKEN}
Content-Type: application/json

Rate Limits

EndpointЛимитПериод
Все endpoints1001 минута

2.8 Rate Limiter

Реализация

python
# services/rate_limiter.py
import redis
import time
from typing import Optional

class RateLimiter:
    """Token bucket rate limiter с Redis backend."""
    
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
    
    def acquire(
        self, 
        key: str, 
        max_tokens: int, 
        refill_rate: float,
        timeout: float = 30.0
    ) -> bool:
        """
        Попытка получить токен.
        
        Args:
            key: Уникальный ключ лимита (например, 'wb:feedbacks')
            max_tokens: Максимальное количество токенов
            refill_rate: Скорость пополнения (токенов/сек)
            timeout: Максимальное время ожидания
        
        Returns:
            True если токен получен, False если timeout
        """
        start_time = time.time()
        
        while time.time() - start_time < timeout:
            tokens = self._get_tokens(key, max_tokens, refill_rate)
            
            if tokens >= 1:
                self._consume_token(key)
                return True
            
            # Ждём пополнения
            wait_time = 1 / refill_rate
            time.sleep(min(wait_time, 1.0))
        
        return False
    
    def _get_tokens(self, key: str, max_tokens: int, refill_rate: float) -> float:
        """Получить текущее количество токенов."""
        pipe = self.redis.pipeline()
        now = time.time()
        
        # Получить последнее обновление и количество токенов
        data = self.redis.hgetall(f"ratelimit:{key}")
        
        if not data:
            # Инициализация: полный bucket
            self.redis.hset(f"ratelimit:{key}", mapping={
                "tokens": max_tokens,
                "last_update": now
            })
            return max_tokens
        
        tokens = float(data.get(b"tokens", max_tokens))
        last_update = float(data.get(b"last_update", now))
        
        # Пополнение токенов
        elapsed = now - last_update
        tokens = min(max_tokens, tokens + elapsed * refill_rate)
        
        # Обновить состояние
        self.redis.hset(f"ratelimit:{key}", mapping={
            "tokens": tokens,
            "last_update": now
        })
        
        return tokens
    
    def _consume_token(self, key: str):
        """Использовать один токен."""
        self.redis.hincrbyfloat(f"ratelimit:{key}", "tokens", -1)

Конфигурация лимитов

python
RATE_LIMITS = {
    "wb:feedbacks": {"max_tokens": 100, "refill_rate": 100/60},
    "wb:questions": {"max_tokens": 100, "refill_rate": 100/60},
    "ozon:reviews": {"max_tokens": 60, "refill_rate": 60/60},
    "ozon:questions": {"max_tokens": 60, "refill_rate": 60/60},
    "ym:feedbacks": {"max_tokens": 100, "refill_rate": 100/60},
    "ym:questions": {"max_tokens": 100, "refill_rate": 100/60},
}

2.9 Circuit Breaker

Реализация

python
# services/circuit_breaker.py
from enum import Enum
from datetime import datetime, timedelta

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    """Circuit breaker для защиты от каскадных сбоев."""
    
    def __init__(
        self,
        failure_threshold: int = 5,
        reset_timeout: int = 60,
        db_session = None
    ):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.db = db_session
    
    def get_state(self, platform: str, item_type: str) -> CircuitState:
        """Получить состояние circuit breaker."""
        state = self.db.query(PollingState).filter(
            PollingState.platform == platform,
            PollingState.item_type == item_type
        ).first()
        
        if not state:
            return CircuitState.CLOSED
        
        if state.circuit_breaker_open:
            if datetime.utcnow() > state.circuit_breaker_until:
                return CircuitState.HALF_OPEN
            return CircuitState.OPEN
        
        return CircuitState.CLOSED
    
    def record_success(self, platform: str, item_type: str):
        """Записать успешный запрос."""
        self.db.query(PollingState).filter(
            PollingState.platform == platform,
            PollingState.item_type == item_type
        ).update({
            "consecutive_errors": 0,
            "circuit_breaker_open": False,
            "circuit_breaker_until": None,
            "last_poll_status": "success"
        })
        self.db.commit()
    
    def record_failure(self, platform: str, item_type: str, error: str):
        """Записать неудачный запрос."""
        state = self.db.query(PollingState).filter(
            PollingState.platform == platform,
            PollingState.item_type == item_type
        ).first()
        
        state.consecutive_errors += 1
        state.last_poll_status = "error"
        state.last_poll_error = error
        
        if state.consecutive_errors >= self.failure_threshold:
            state.circuit_breaker_open = True
            state.circuit_breaker_until = datetime.utcnow() + timedelta(seconds=self.reset_timeout)
        
        self.db.commit()

2.10 Обработка ошибок

Единая политика

ПараметрЗначение
Timeout запроса30 секунд
Retry стратегияExponential backoff: 1с → 2с → 4с
Максимум попыток3
Circuit breaker открытиеПосле 5 последовательных ошибок
Circuit breaker reset60 секунд

Типы ошибок и реакция

КодТипДействие
200SuccessОбработать данные
400Bad RequestЛогировать, пропустить
401/403Auth ErrorАлерт администратору
404Not FoundЛогировать, пропустить
429Rate LimitExponential backoff
500-599Server ErrorRetry через 30 сек
TimeoutNetworkRetry с backoff

2.11 Мониторинг и метрики

Метрики polling

МетрикаТипОписание
reputation_polling_duration_secondshistogramВремя выполнения polling
reputation_polling_items_fetchedcounterКоличество полученных items
reputation_polling_errors_totalcounterКоличество ошибок
reputation_polling_duplicates_skippedcounterПропущенные дубликаты
reputation_circuit_breaker_stategaugeСостояние circuit breaker

Логирование

python
# Структура лога polling
{
    "event": "polling_completed",
    "platform": "wb",
    "item_type": "review",
    "duration_ms": 1234,
    "items_fetched": 15,
    "items_new": 10,
    "items_duplicate": 5,
    "errors": 0,
    "timestamp": "2026-01-10T14:30:00Z"
}

Приложение А: Контрольные точки

КритерийПроверка
Celery Beat запущенcelery -A app inspect scheduled
Задачи выполняютсяЛоги показывают polling каждые 5 мин
Дедупликация работаетПовторный polling не создаёт дубликатов
Rate Limiter работаетНет 429 ошибок
Circuit BreakerОткрывается при 5 ошибках подряд
polling_state обновляетсяlast_poll_time актуален

Документ подготовлен: Январь 2026
Версия: 2.1
Статус: Согласовано

Документация ADOLF Platform