Skip to content

Раздел 2: Marketplace Adapters

Проект: Автоматическое управление рекламными кампаниями
Модуль: Marketing / Marketplace Adapters
Версия: 1.0
Дата: Январь 2026


2.1 Обзор

Назначение

Marketplace Adapters — компоненты модуля Marketing, обеспечивающие взаимодействие с рекламными API маркетплейсов. Каждый адаптер реализует унифицированный интерфейс и инкапсулирует специфику работы с конкретным API.

Поддерживаемые маркетплейсы

МаркетплейсAPIТипы рекламы
WildberriesPromotion APIАукцион, Автоматическая, Каталог
OzonPerformance APIТрафареты, Поиск, Брендовая полка
Яндекс.МаркетBidding APIБуст продаж, Рекомендации

Архитектура адаптеров


2.2 Унифицированный интерфейс

2.2.1 Базовый класс адаптера

python
# services/marketing/adapters/base.py

from abc import ABC, abstractmethod
from typing import List, Optional, Dict, Any
from dataclasses import dataclass
from datetime import date, datetime
from enum import Enum


class CampaignType(Enum):
    """Типы рекламных кампаний."""
    # Wildberries
    WB_AUCTION = "wb_auction"           # Аукцион (поиск)
    WB_AUTO = "wb_auto"                 # Автоматическая
    WB_CATALOG = "wb_catalog"           # Каталог
    
    # Ozon
    OZON_TRAFARETS = "ozon_trafarets"   # Трафареты
    OZON_SEARCH = "ozon_search"         # Продвижение в поиске
    OZON_BRAND_SHELF = "ozon_brand"     # Брендовая полка
    
    # Яндекс.Маркет
    YM_BOOST = "ym_boost"               # Буст продаж
    YM_RECOMMENDATIONS = "ym_reco"      # Рекомендации


class CampaignStatus(Enum):
    """Статусы кампании."""
    DRAFT = "draft"
    ACTIVE = "active"
    PAUSED = "paused"
    ARCHIVED = "archived"
    ERROR = "error"


class KeywordStatus(Enum):
    """Статусы ключевого слова."""
    ACTIVE = "active"
    PAUSED = "paused"
    REJECTED = "rejected"


class MatchType(Enum):
    """Типы соответствия ключевых слов."""
    EXACT = "exact"
    PHRASE = "phrase"
    BROAD = "broad"


@dataclass
class Campaign:
    """Унифицированная модель рекламной кампании."""
    id: Optional[str] = None           # Внутренний ID
    external_id: Optional[str] = None  # ID на маркетплейсе
    marketplace: str = ""              # wb, ozon, ym
    campaign_type: CampaignType = None
    name: str = ""
    status: CampaignStatus = CampaignStatus.DRAFT
    
    # Бюджеты
    daily_budget: float = 0.0
    total_budget: Optional[float] = None
    spent_today: float = 0.0
    spent_total: float = 0.0
    
    # Даты
    start_date: Optional[date] = None
    end_date: Optional[date] = None
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None
    
    # Бренд и SKU
    brand_id: str = ""
    sku_list: List[str] = None
    
    # Настройки стратегии
    strategy: Optional[str] = None
    target_position: Optional[int] = None
    max_bid: Optional[float] = None
    bid_step: Optional[float] = None
    target_drr: Optional[float] = None
    
    # Дополнительные параметры (специфичные для МП)
    extra: Dict[str, Any] = None


@dataclass
class Keyword:
    """Унифицированная модель ключевого слова."""
    id: Optional[str] = None
    external_id: Optional[str] = None
    campaign_id: str = ""
    keyword: str = ""
    match_type: MatchType = MatchType.PHRASE
    status: KeywordStatus = KeywordStatus.ACTIVE
    
    # Ставки
    current_bid: float = 0.0
    recommended_bid: Optional[float] = None
    min_bid: Optional[float] = None
    max_bid: Optional[float] = None
    
    # Статистика (последняя)
    views: int = 0
    clicks: int = 0
    ctr: float = 0.0
    orders: int = 0
    spent: float = 0.0
    
    # Мета
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None


@dataclass
class CampaignStats:
    """Статистика рекламной кампании."""
    campaign_id: str
    date: date
    
    # Основные метрики
    views: int = 0
    clicks: int = 0
    ctr: float = 0.0
    spent: float = 0.0
    cpc: float = 0.0
    
    # Конверсии
    orders: int = 0
    revenue: float = 0.0
    cpo: float = 0.0
    drr: float = 0.0
    
    # Позиции (средние)
    avg_position: Optional[float] = None


@dataclass 
class KeywordStats:
    """Статистика ключевого слова."""
    keyword_id: str
    campaign_id: str
    date: date
    
    views: int = 0
    clicks: int = 0
    ctr: float = 0.0
    spent: float = 0.0
    cpc: float = 0.0
    orders: int = 0
    revenue: float = 0.0
    position: Optional[int] = None


@dataclass
class BidUpdate:
    """Результат обновления ставки."""
    keyword_id: str
    old_bid: float
    new_bid: float
    success: bool
    error: Optional[str] = None


class MarketplacePromoAdapter(ABC):
    """Базовый класс адаптера рекламного API маркетплейса."""
    
    marketplace: str = ""
    
    # === Кампании ===
    
    @abstractmethod
    async def get_campaigns(
        self, 
        brand_id: Optional[str] = None,
        status: Optional[CampaignStatus] = None
    ) -> List[Campaign]:
        """Получить список кампаний."""
        pass
    
    @abstractmethod
    async def get_campaign(self, external_id: str) -> Optional[Campaign]:
        """Получить кампанию по ID."""
        pass
    
    @abstractmethod
    async def create_campaign(self, campaign: Campaign) -> Campaign:
        """Создать кампанию."""
        pass
    
    @abstractmethod
    async def update_campaign(self, campaign: Campaign) -> bool:
        """Обновить кампанию."""
        pass
    
    @abstractmethod
    async def pause_campaign(self, external_id: str) -> bool:
        """Приостановить кампанию."""
        pass
    
    @abstractmethod
    async def resume_campaign(self, external_id: str) -> bool:
        """Возобновить кампанию."""
        pass
    
    @abstractmethod
    async def archive_campaign(self, external_id: str) -> bool:
        """Архивировать кампанию."""
        pass
    
    # === Ключевые слова ===
    
    @abstractmethod
    async def get_keywords(self, campaign_external_id: str) -> List[Keyword]:
        """Получить ключевые слова кампании."""
        pass
    
    @abstractmethod
    async def add_keywords(
        self, 
        campaign_external_id: str, 
        keywords: List[Keyword]
    ) -> List[Keyword]:
        """Добавить ключевые слова."""
        pass
    
    @abstractmethod
    async def remove_keywords(
        self, 
        campaign_external_id: str, 
        keyword_ids: List[str]
    ) -> bool:
        """Удалить ключевые слова."""
        pass
    
    @abstractmethod
    async def pause_keyword(self, keyword_external_id: str) -> bool:
        """Приостановить ключевое слово."""
        pass
    
    @abstractmethod
    async def resume_keyword(self, keyword_external_id: str) -> bool:
        """Возобновить ключевое слово."""
        pass
    
    # === Ставки ===
    
    @abstractmethod
    async def update_bid(
        self, 
        keyword_external_id: str, 
        new_bid: float
    ) -> BidUpdate:
        """Обновить ставку для ключевого слова."""
        pass
    
    @abstractmethod
    async def update_bids_batch(
        self, 
        updates: Dict[str, float]
    ) -> List[BidUpdate]:
        """Пакетное обновление ставок."""
        pass
    
    @abstractmethod
    async def get_recommended_bids(
        self, 
        keyword_ids: List[str]
    ) -> Dict[str, float]:
        """Получить рекомендуемые ставки."""
        pass
    
    # === Статистика ===
    
    @abstractmethod
    async def get_campaign_stats(
        self,
        campaign_external_id: str,
        date_from: date,
        date_to: date
    ) -> List[CampaignStats]:
        """Получить статистику кампании."""
        pass
    
    @abstractmethod
    async def get_keywords_stats(
        self,
        campaign_external_id: str,
        date_from: date,
        date_to: date
    ) -> List[KeywordStats]:
        """Получить статистику по ключевым словам."""
        pass
    
    # === Позиции (real-time) ===
    
    @abstractmethod
    async def get_current_position(
        self, 
        sku: str, 
        keyword: str
    ) -> Optional[int]:
        """Получить текущую позицию товара по ключевому слову."""
        pass
    
    # === Вспомогательные ===
    
    @abstractmethod
    async def validate_campaign(self, campaign: Campaign) -> List[str]:
        """Валидация кампании перед созданием. Возвращает список ошибок."""
        pass
    
    @abstractmethod
    async def get_available_campaign_types(self) -> List[CampaignType]:
        """Получить доступные типы кампаний."""
        pass

2.2.2 Adapter Manager

python
# services/marketing/adapters/manager.py

from typing import Dict, Optional
from .base import MarketplacePromoAdapter
from .wildberries import WildberriesPromoAdapter
from .ozon import OzonPerformanceAdapter
from .yandex_market import YandexMarketAdapter


class AdapterManager:
    """Менеджер адаптеров маркетплейсов."""
    
    def __init__(self):
        self._adapters: Dict[str, MarketplacePromoAdapter] = {}
        self._initialize_adapters()
    
    def _initialize_adapters(self):
        """Инициализация адаптеров."""
        self._adapters = {
            "wb": WildberriesPromoAdapter(),
            "ozon": OzonPerformanceAdapter(),
            "ym": YandexMarketAdapter(),
        }
    
    def get_adapter(self, marketplace: str) -> Optional[MarketplacePromoAdapter]:
        """Получить адаптер по коду маркетплейса."""
        return self._adapters.get(marketplace)
    
    def get_all_adapters(self) -> Dict[str, MarketplacePromoAdapter]:
        """Получить все адаптеры."""
        return self._adapters
    
    async def health_check(self) -> Dict[str, bool]:
        """Проверка доступности API всех маркетплейсов."""
        results = {}
        for mp, adapter in self._adapters.items():
            try:
                # Простой запрос для проверки
                await adapter.get_campaigns(status=None)
                results[mp] = True
            except Exception:
                results[mp] = False
        return results

2.3 Wildberries Promotion API

2.3.1 Общая информация

ПараметрЗначение
Base URLhttps://advert-api.wb.ru
АвторизацияHeader Authorization: {API_KEY}
ФорматJSON
Rate Limit~100 req/min (уточнить по документации)

2.3.2 Типы рекламных кампаний

Типtype в APIОписание
Аукцион8Поисковая реклама (ставки за ключи)
Автоматическая9Автоматическое продвижение
Каталог6Реклама в каталоге категорий

2.3.3 Endpoints

Управление кампаниями

EndpointМетодОписание
/adv/v1/promotion/countGETКоличество кампаний по статусам
/adv/v1/promotion/advertsPOSTСписок кампаний (с фильтрами)
/adv/v1/promotion/advertGETДетали кампании
/adv/v1/promotion/createPOSTСоздание кампании
/adv/v1/promotion/updatePUTОбновление кампании
/adv/v1/promotion/startPOSTЗапуск кампании
/adv/v1/promotion/pausePOSTПауза кампании

Ключевые слова (для Аукциона)

EndpointМетодОписание
/adv/v1/search/keywordsGETСписок ключевых слов
/adv/v1/search/set-keywordPOSTДобавить/изменить ключ
/adv/v1/search/delete-keywordPOSTУдалить ключ
/adv/v1/search/set-keyword-bidPOSTУстановить ставку

Статистика

EndpointМетодОписание
/adv/v2/fullstatGETПолная статистика кампании
/adv/v2/stat/wordsGETСтатистика по ключевым словам
/adv/v2/stat/datesGETСтатистика по дням

Ставки

EndpointМетодОписание
/adv/v1/search/bidGETТекущие ставки
/adv/v1/search/set-bidPOSTУстановить ставку

2.3.4 Примеры запросов

Получение списка кампаний:

http
POST /adv/v1/promotion/adverts HTTP/1.1
Host: advert-api.wb.ru
Authorization: {API_KEY}
Content-Type: application/json

{
  "status": [9],
  "type": [8],
  "limit": 100,
  "offset": 0
}

Ответ:

json
{
  "adverts": [
    {
      "advertId": 12345678,
      "type": 8,
      "status": 9,
      "dailyBudget": 5000,
      "name": "Платья летние",
      "createTime": "2026-01-10T10:00:00Z",
      "changeTime": "2026-01-15T14:30:00Z",
      "startTime": "2026-01-10T10:00:00Z",
      "endTime": null,
      "params": [
        {
          "intervals": [...],
          "nms": [123456, 234567],
          "subjectId": 1000
        }
      ]
    }
  ],
  "total": 15
}

Установка ставки:

http
POST /adv/v1/search/set-bid HTTP/1.1
Host: advert-api.wb.ru
Authorization: {API_KEY}
Content-Type: application/json

{
  "advertId": 12345678,
  "keyword": "летнее платье",
  "bid": 150
}

2.3.5 Маппинг полей

WB APIУнифицированная модель
advertIdexternal_id
typecampaign_type (маппинг через Enum)
statusstatus (маппинг: 9=active, 11=paused, etc.)
dailyBudgetdaily_budget
namename
createTimecreated_at
params[].nmssku_list

2.3.6 Реализация адаптера

python
# services/marketing/adapters/wildberries.py

import httpx
from typing import List, Optional, Dict
from datetime import date, datetime
from .base import (
    MarketplacePromoAdapter, Campaign, Keyword, CampaignStats,
    KeywordStats, BidUpdate, CampaignType, CampaignStatus,
    KeywordStatus, MatchType
)
from core.config import settings
from core.rate_limiter import RateLimiter


class WildberriesPromoAdapter(MarketplacePromoAdapter):
    """Адаптер Wildberries Promotion API."""
    
    marketplace = "wb"
    BASE_URL = "https://advert-api.wb.ru"
    
    # Маппинг типов кампаний
    TYPE_MAP = {
        8: CampaignType.WB_AUCTION,
        9: CampaignType.WB_AUTO,
        6: CampaignType.WB_CATALOG,
    }
    TYPE_MAP_REVERSE = {v: k for k, v in TYPE_MAP.items()}
    
    # Маппинг статусов
    STATUS_MAP = {
        4: CampaignStatus.DRAFT,      # Готова к запуску
        7: CampaignStatus.ARCHIVED,   # Завершена
        9: CampaignStatus.ACTIVE,     # Активна
        11: CampaignStatus.PAUSED,    # Приостановлена
    }
    STATUS_MAP_REVERSE = {v: k for k, v in STATUS_MAP.items()}
    
    def __init__(self):
        self.api_key = settings.WB_PROMO_API_KEY
        self.rate_limiter = RateLimiter(rate=100, per=60)
        self.client = httpx.AsyncClient(
            base_url=self.BASE_URL,
            headers={"Authorization": self.api_key},
            timeout=30.0
        )
    
    async def _request(
        self, 
        method: str, 
        endpoint: str, 
        **kwargs
    ) -> Dict:
        """Выполнить запрос с rate limiting."""
        await self.rate_limiter.acquire()
        
        response = await self.client.request(method, endpoint, **kwargs)
        response.raise_for_status()
        
        return response.json() if response.content else {}
    
    # === Кампании ===
    
    async def get_campaigns(
        self,
        brand_id: Optional[str] = None,
        status: Optional[CampaignStatus] = None
    ) -> List[Campaign]:
        """Получить список кампаний."""
        
        payload = {
            "limit": 100,
            "offset": 0
        }
        
        if status:
            wb_status = self.STATUS_MAP_REVERSE.get(status)
            if wb_status:
                payload["status"] = [wb_status]
        
        data = await self._request("POST", "/adv/v1/promotion/adverts", json=payload)
        
        campaigns = []
        for adv in data.get("adverts", []):
            campaign = self._parse_campaign(adv)
            # Фильтрация по бренду (на стороне клиента, если нужно)
            if brand_id is None or campaign.brand_id == brand_id:
                campaigns.append(campaign)
        
        return campaigns
    
    async def get_campaign(self, external_id: str) -> Optional[Campaign]:
        """Получить кампанию по ID."""
        try:
            data = await self._request(
                "GET", 
                "/adv/v1/promotion/advert",
                params={"id": external_id}
            )
            return self._parse_campaign(data) if data else None
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 404:
                return None
            raise
    
    async def create_campaign(self, campaign: Campaign) -> Campaign:
        """Создать кампанию."""
        
        # Валидация
        errors = await self.validate_campaign(campaign)
        if errors:
            raise ValueError(f"Validation failed: {errors}")
        
        wb_type = self.TYPE_MAP_REVERSE.get(campaign.campaign_type)
        
        payload = {
            "type": wb_type,
            "name": campaign.name,
            "dailyBudget": int(campaign.daily_budget),
            "nms": [int(sku) for sku in campaign.sku_list],
        }
        
        if campaign.end_date:
            payload["endTime"] = campaign.end_date.isoformat()
        
        data = await self._request("POST", "/adv/v1/promotion/create", json=payload)
        
        campaign.external_id = str(data.get("advertId"))
        campaign.status = CampaignStatus.DRAFT
        campaign.created_at = datetime.utcnow()
        
        return campaign
    
    async def update_campaign(self, campaign: Campaign) -> bool:
        """Обновить кампанию."""
        
        payload = {
            "advertId": int(campaign.external_id),
            "name": campaign.name,
            "dailyBudget": int(campaign.daily_budget),
        }
        
        await self._request("PUT", "/adv/v1/promotion/update", json=payload)
        return True
    
    async def pause_campaign(self, external_id: str) -> bool:
        """Приостановить кампанию."""
        await self._request(
            "POST",
            "/adv/v1/promotion/pause",
            json={"advertId": int(external_id)}
        )
        return True
    
    async def resume_campaign(self, external_id: str) -> bool:
        """Возобновить кампанию."""
        await self._request(
            "POST",
            "/adv/v1/promotion/start",
            json={"advertId": int(external_id)}
        )
        return True
    
    async def archive_campaign(self, external_id: str) -> bool:
        """Архивировать кампанию (для WB - пауза + удаление)."""
        # WB не имеет явного архивирования
        return await self.pause_campaign(external_id)
    
    # === Ключевые слова ===
    
    async def get_keywords(self, campaign_external_id: str) -> List[Keyword]:
        """Получить ключевые слова кампании."""
        
        data = await self._request(
            "GET",
            "/adv/v1/search/keywords",
            params={"advertId": campaign_external_id}
        )
        
        keywords = []
        for kw in data.get("keywords", []):
            keywords.append(Keyword(
                external_id=kw.get("id"),
                campaign_id=campaign_external_id,
                keyword=kw.get("keyword", ""),
                match_type=MatchType.PHRASE,  # WB не различает типы
                status=KeywordStatus.ACTIVE if kw.get("status") == 1 else KeywordStatus.PAUSED,
                current_bid=float(kw.get("bid", 0)),
                views=kw.get("views", 0),
                clicks=kw.get("clicks", 0),
                ctr=kw.get("ctr", 0.0),
            ))
        
        return keywords
    
    async def add_keywords(
        self,
        campaign_external_id: str,
        keywords: List[Keyword]
    ) -> List[Keyword]:
        """Добавить ключевые слова."""
        
        for kw in keywords:
            await self._request(
                "POST",
                "/adv/v1/search/set-keyword",
                json={
                    "advertId": int(campaign_external_id),
                    "keyword": kw.keyword,
                    "bid": int(kw.current_bid) if kw.current_bid else 50
                }
            )
            kw.campaign_id = campaign_external_id
        
        return keywords
    
    async def remove_keywords(
        self,
        campaign_external_id: str,
        keyword_ids: List[str]
    ) -> bool:
        """Удалить ключевые слова."""
        
        for kw_id in keyword_ids:
            await self._request(
                "POST",
                "/adv/v1/search/delete-keyword",
                json={
                    "advertId": int(campaign_external_id),
                    "keywordId": int(kw_id)
                }
            )
        
        return True
    
    async def pause_keyword(self, keyword_external_id: str) -> bool:
        """Приостановить ключевое слово."""
        # WB: установить ставку 0 = пауза
        await self._request(
            "POST",
            "/adv/v1/search/set-keyword-bid",
            json={"keywordId": int(keyword_external_id), "bid": 0}
        )
        return True
    
    async def resume_keyword(self, keyword_external_id: str) -> bool:
        """Возобновить ключевое слово (требуется новая ставка)."""
        # Нужна ставка для возобновления
        raise NotImplementedError("Use update_bid to resume keyword")
    
    # === Ставки ===
    
    async def update_bid(
        self,
        keyword_external_id: str,
        new_bid: float
    ) -> BidUpdate:
        """Обновить ставку."""
        
        # Получить текущую ставку для логирования
        old_bid = 0.0  # TODO: получить из кэша или API
        
        try:
            await self._request(
                "POST",
                "/adv/v1/search/set-bid",
                json={
                    "keywordId": int(keyword_external_id),
                    "bid": int(new_bid)
                }
            )
            return BidUpdate(
                keyword_id=keyword_external_id,
                old_bid=old_bid,
                new_bid=new_bid,
                success=True
            )
        except Exception as e:
            return BidUpdate(
                keyword_id=keyword_external_id,
                old_bid=old_bid,
                new_bid=new_bid,
                success=False,
                error=str(e)
            )
    
    async def update_bids_batch(
        self,
        updates: Dict[str, float]
    ) -> List[BidUpdate]:
        """Пакетное обновление ставок."""
        results = []
        for keyword_id, new_bid in updates.items():
            result = await self.update_bid(keyword_id, new_bid)
            results.append(result)
        return results
    
    async def get_recommended_bids(
        self,
        keyword_ids: List[str]
    ) -> Dict[str, float]:
        """Получить рекомендуемые ставки."""
        # WB не предоставляет рекомендации напрямую
        # Используем данные Watcher
        return {}
    
    # === Статистика ===
    
    async def get_campaign_stats(
        self,
        campaign_external_id: str,
        date_from: date,
        date_to: date
    ) -> List[CampaignStats]:
        """Получить статистику кампании."""
        
        data = await self._request(
            "GET",
            "/adv/v2/stat/dates",
            params={
                "advertId": campaign_external_id,
                "dateFrom": date_from.isoformat(),
                "dateTo": date_to.isoformat()
            }
        )
        
        stats = []
        for day in data.get("dates", []):
            stats.append(CampaignStats(
                campaign_id=campaign_external_id,
                date=date.fromisoformat(day["date"]),
                views=day.get("views", 0),
                clicks=day.get("clicks", 0),
                ctr=day.get("ctr", 0.0),
                spent=float(day.get("sum", 0)),
                cpc=float(day.get("cpc", 0)),
                orders=day.get("orders", 0),
                revenue=float(day.get("orderSum", 0)),
                cpo=float(day.get("cpo", 0)),
                drr=float(day.get("drr", 0)),
            ))
        
        return stats
    
    async def get_keywords_stats(
        self,
        campaign_external_id: str,
        date_from: date,
        date_to: date
    ) -> List[KeywordStats]:
        """Получить статистику по ключевым словам."""
        
        data = await self._request(
            "GET",
            "/adv/v2/stat/words",
            params={
                "advertId": campaign_external_id,
                "dateFrom": date_from.isoformat(),
                "dateTo": date_to.isoformat()
            }
        )
        
        stats = []
        for kw in data.get("keywords", []):
            stats.append(KeywordStats(
                keyword_id=str(kw.get("keywordId")),
                campaign_id=campaign_external_id,
                date=date_to,  # Агрегированные данные
                views=kw.get("views", 0),
                clicks=kw.get("clicks", 0),
                ctr=kw.get("ctr", 0.0),
                spent=float(kw.get("sum", 0)),
                orders=kw.get("orders", 0),
            ))
        
        return stats
    
    async def get_current_position(
        self,
        sku: str,
        keyword: str
    ) -> Optional[int]:
        """Получить текущую позицию (через внутренний API или Watcher)."""
        # Требует парсинга или Watcher
        return None
    
    # === Вспомогательные ===
    
    async def validate_campaign(self, campaign: Campaign) -> List[str]:
        """Валидация кампании."""
        errors = []
        
        if not campaign.name:
            errors.append("Name is required")
        
        if campaign.daily_budget < 100:
            errors.append("Daily budget must be at least 100 RUB")
        
        if not campaign.sku_list:
            errors.append("At least one SKU is required")
        
        if campaign.campaign_type not in self.TYPE_MAP_REVERSE:
            errors.append(f"Invalid campaign type for WB: {campaign.campaign_type}")
        
        return errors
    
    async def get_available_campaign_types(self) -> List[CampaignType]:
        """Доступные типы кампаний."""
        return [
            CampaignType.WB_AUCTION,
            CampaignType.WB_AUTO,
            CampaignType.WB_CATALOG,
        ]
    
    def _parse_campaign(self, data: Dict) -> Campaign:
        """Парсинг кампании из ответа API."""
        
        wb_type = data.get("type", 8)
        wb_status = data.get("status", 4)
        
        # Извлечение SKU из params
        sku_list = []
        for param in data.get("params", []):
            sku_list.extend([str(nm) for nm in param.get("nms", [])])
        
        return Campaign(
            external_id=str(data.get("advertId")),
            marketplace="wb",
            campaign_type=self.TYPE_MAP.get(wb_type, CampaignType.WB_AUCTION),
            name=data.get("name", ""),
            status=self.STATUS_MAP.get(wb_status, CampaignStatus.DRAFT),
            daily_budget=float(data.get("dailyBudget", 0)),
            spent_today=float(data.get("todaySum", 0)),
            created_at=datetime.fromisoformat(data["createTime"].replace("Z", "+00:00")) if data.get("createTime") else None,
            updated_at=datetime.fromisoformat(data["changeTime"].replace("Z", "+00:00")) if data.get("changeTime") else None,
            sku_list=sku_list,
        )
    
    async def close(self):
        """Закрыть соединение."""
        await self.client.aclose()

2.4 Ozon Performance API

2.4.1 Общая информация

ПараметрЗначение
Base URLhttps://api-performance.ozon.ru
АвторизацияHeader Authorization: Bearer {ACCESS_TOKEN}
ФорматJSON
Rate Limit~60 req/min (уточнить по документации)
OAuthТребуется получение access_token через client_id/client_secret

2.4.2 Авторизация (OAuth 2.0)

http
POST /api/client/token HTTP/1.1
Host: api-performance.ozon.ru
Content-Type: application/json

{
  "client_id": "{CLIENT_ID}",
  "client_secret": "{CLIENT_SECRET}",
  "grant_type": "client_credentials"
}

Ответ:

json
{
  "access_token": "eyJhbGciOiJ...",
  "token_type": "Bearer",
  "expires_in": 1800
}

2.4.3 Типы рекламных кампаний

ТипКод APIОписание
ТрафаретыSKUРеклама товаров в карточках
Продвижение в поискеSEARCH_PROMOПоисковая реклама
Брендовая полкаBRAND_SHELFБрендовая реклама

2.4.4 Endpoints

Управление кампаниями

EndpointМетодОписание
/api/client/campaignGETСписок кампаний
/api/client/campaign/{id}GETДетали кампании
/api/client/campaignPOSTСоздание кампании
/api/client/campaign/{id}PUTОбновление кампании
/api/client/campaign/{id}/activatePOSTАктивация
/api/client/campaign/{id}/deactivatePOSTДеактивация

Группы товаров и ставки

EndpointМетодОписание
/api/client/campaign/{id}/productsGETТовары кампании
/api/client/campaign/{id}/productsPOSTДобавить товары
/api/client/campaign/{id}/products/deletePOSTУдалить товары
/api/client/campaign/{id}/bidPUTУстановить ставку
/api/client/campaign/{id}/bidsPOSTПакетные ставки

Статистика

EndpointМетодОписание
/api/client/statisticsPOSTСтатистика кампаний
/api/client/statistics/dailyPOSTСтатистика по дням
/api/client/statistics/productsPOSTСтатистика по товарам

2.4.5 Примеры запросов

Получение списка кампаний:

http
GET /api/client/campaign HTTP/1.1
Host: api-performance.ozon.ru
Authorization: Bearer {ACCESS_TOKEN}

Ответ:

json
{
  "list": [
    {
      "id": "campaign-uuid-123",
      "title": "Летняя коллекция",
      "state": "CAMPAIGN_STATE_RUNNING",
      "advObjectType": "SKU",
      "dailyBudget": 5000.00,
      "budget": 100000.00,
      "fromDate": "2026-01-10",
      "toDate": "2026-02-10",
      "createdAt": "2026-01-09T15:00:00Z",
      "updatedAt": "2026-01-15T12:00:00Z"
    }
  ],
  "total": 10
}

Установка ставки:

http
PUT /api/client/campaign/{campaign_id}/bid HTTP/1.1
Host: api-performance.ozon.ru
Authorization: Bearer {ACCESS_TOKEN}
Content-Type: application/json

{
  "sku": 123456789,
  "bid": 150.00
}

2.4.6 Маппинг полей

Ozon APIУнифицированная модель
idexternal_id
advObjectTypecampaign_type
statestatus
dailyBudgetdaily_budget
budgettotal_budget
titlename

2.4.7 Реализация адаптера (скелет)

python
# services/marketing/adapters/ozon.py

import httpx
from typing import List, Optional, Dict
from datetime import date, datetime
from .base import (
    MarketplacePromoAdapter, Campaign, Keyword, CampaignStats,
    CampaignType, CampaignStatus
)
from core.config import settings


class OzonPerformanceAdapter(MarketplacePromoAdapter):
    """Адаптер Ozon Performance API."""
    
    marketplace = "ozon"
    BASE_URL = "https://api-performance.ozon.ru"
    
    TYPE_MAP = {
        "SKU": CampaignType.OZON_TRAFARETS,
        "SEARCH_PROMO": CampaignType.OZON_SEARCH,
        "BRAND_SHELF": CampaignType.OZON_BRAND_SHELF,
    }
    
    STATUS_MAP = {
        "CAMPAIGN_STATE_RUNNING": CampaignStatus.ACTIVE,
        "CAMPAIGN_STATE_STOPPED": CampaignStatus.PAUSED,
        "CAMPAIGN_STATE_INACTIVE": CampaignStatus.DRAFT,
        "CAMPAIGN_STATE_ARCHIVED": CampaignStatus.ARCHIVED,
    }
    
    def __init__(self):
        self.client_id = settings.OZON_PERF_CLIENT_ID
        self.client_secret = settings.OZON_PERF_API_KEY
        self._access_token: Optional[str] = None
        self._token_expires: Optional[datetime] = None
        self.client = httpx.AsyncClient(
            base_url=self.BASE_URL,
            timeout=30.0
        )
    
    async def _ensure_token(self):
        """Обеспечить валидность access_token."""
        if self._access_token and self._token_expires and datetime.utcnow() < self._token_expires:
            return
        
        response = await self.client.post(
            "/api/client/token",
            json={
                "client_id": self.client_id,
                "client_secret": self.client_secret,
                "grant_type": "client_credentials"
            }
        )
        response.raise_for_status()
        data = response.json()
        
        self._access_token = data["access_token"]
        self._token_expires = datetime.utcnow() + timedelta(seconds=data["expires_in"] - 60)
    
    async def _request(self, method: str, endpoint: str, **kwargs) -> Dict:
        """Выполнить запрос с авторизацией."""
        await self._ensure_token()
        
        headers = kwargs.pop("headers", {})
        headers["Authorization"] = f"Bearer {self._access_token}"
        
        response = await self.client.request(
            method, endpoint, headers=headers, **kwargs
        )
        response.raise_for_status()
        
        return response.json() if response.content else {}
    
    async def get_campaigns(
        self,
        brand_id: Optional[str] = None,
        status: Optional[CampaignStatus] = None
    ) -> List[Campaign]:
        """Получить список кампаний."""
        data = await self._request("GET", "/api/client/campaign")
        
        campaigns = []
        for item in data.get("list", []):
            campaign = self._parse_campaign(item)
            if status is None or campaign.status == status:
                campaigns.append(campaign)
        
        return campaigns
    
    # ... остальные методы аналогичны WB адаптеру
    
    async def get_available_campaign_types(self) -> List[CampaignType]:
        return [
            CampaignType.OZON_TRAFARETS,
            CampaignType.OZON_SEARCH,
            CampaignType.OZON_BRAND_SHELF,
        ]

2.5 Яндекс.Маркет Bidding API

2.5.1 Общая информация

ПараметрЗначение
Base URLhttps://api.partner.market.yandex.ru
АвторизацияHeader Authorization: Bearer {OAUTH_TOKEN}
ФорматJSON
Rate Limit~30 req/min (уточнить по документации)

2.5.2 Типы рекламных кампаний

ТипОписание
Буст продажПродвижение товаров в поиске
РекомендацииРазмещение в блоках рекомендаций

2.5.3 Endpoints

Ставки продвижения

EndpointМетодОписание
/businesses/{businessId}/bids/infoPOSTИнформация о ставках
/businesses/{businessId}/bidsPUTУстановить ставки
/businesses/{businessId}/bids/recommendationsPOSTРекомендации по ставкам

Статистика

EndpointМетодОписание
/campaigns/{campaignId}/stats/skusPOSTСтатистика по SKU

2.5.4 Примеры запросов

Получение информации о ставках:

http
POST /businesses/{businessId}/bids/info HTTP/1.1
Host: api.partner.market.yandex.ru
Authorization: Bearer {OAUTH_TOKEN}
Content-Type: application/json

{
  "skus": ["SKU-001", "SKU-002"]
}

Ответ:

json
{
  "result": {
    "bids": [
      {
        "sku": "SKU-001",
        "bid": 100,
        "status": "PUBLISHED"
      }
    ]
  }
}

Установка ставок:

http
PUT /businesses/{businessId}/bids HTTP/1.1
Host: api.partner.market.yandex.ru
Authorization: Bearer {OAUTH_TOKEN}
Content-Type: application/json

{
  "bids": [
    {
      "sku": "SKU-001",
      "bid": 150
    }
  ]
}

2.5.5 Реализация адаптера (скелет)

python
# services/marketing/adapters/yandex_market.py

import httpx
from typing import List, Optional, Dict
from datetime import date
from .base import (
    MarketplacePromoAdapter, Campaign, Keyword, CampaignStats,
    CampaignType, CampaignStatus
)
from core.config import settings


class YandexMarketAdapter(MarketplacePromoAdapter):
    """Адаптер Яндекс.Маркет Bidding API."""
    
    marketplace = "ym"
    BASE_URL = "https://api.partner.market.yandex.ru"
    
    def __init__(self):
        self.oauth_token = settings.YM_OAUTH_TOKEN
        self.campaign_id = settings.YM_CAMPAIGN_ID
        self.business_id = settings.YM_BUSINESS_ID
        self.client = httpx.AsyncClient(
            base_url=self.BASE_URL,
            headers={"Authorization": f"Bearer {self.oauth_token}"},
            timeout=30.0
        )
    
    async def _request(self, method: str, endpoint: str, **kwargs) -> Dict:
        """Выполнить запрос."""
        response = await self.client.request(method, endpoint, **kwargs)
        response.raise_for_status()
        return response.json() if response.content else {}
    
    # YM не имеет концепции "кампаний" в том же смысле
    # Работа идёт напрямую со ставками по SKU
    
    async def get_campaigns(
        self,
        brand_id: Optional[str] = None,
        status: Optional[CampaignStatus] = None
    ) -> List[Campaign]:
        """Получить виртуальные 'кампании' (группы SKU)."""
        # YM: создаём виртуальную кампанию для всех SKU
        return [Campaign(
            external_id=f"ym_{self.business_id}",
            marketplace="ym",
            campaign_type=CampaignType.YM_BOOST,
            name="Буст продаж",
            status=CampaignStatus.ACTIVE,
        )]
    
    async def update_bid(
        self,
        keyword_external_id: str,  # Для YM это SKU
        new_bid: float
    ) -> BidUpdate:
        """Обновить ставку для SKU."""
        
        try:
            await self._request(
                "PUT",
                f"/businesses/{self.business_id}/bids",
                json={
                    "bids": [
                        {"sku": keyword_external_id, "bid": int(new_bid)}
                    ]
                }
            )
            return BidUpdate(
                keyword_id=keyword_external_id,
                old_bid=0.0,
                new_bid=new_bid,
                success=True
            )
        except Exception as e:
            return BidUpdate(
                keyword_id=keyword_external_id,
                old_bid=0.0,
                new_bid=new_bid,
                success=False,
                error=str(e)
            )
    
    async def get_recommended_bids(
        self,
        keyword_ids: List[str]  # SKUs
    ) -> Dict[str, float]:
        """Получить рекомендуемые ставки."""
        
        data = await self._request(
            "POST",
            f"/businesses/{self.business_id}/bids/recommendations",
            json={"skus": keyword_ids}
        )
        
        result = {}
        for item in data.get("result", {}).get("recommendations", []):
            result[item["sku"]] = float(item.get("bid", 0))
        
        return result
    
    async def get_available_campaign_types(self) -> List[CampaignType]:
        return [
            CampaignType.YM_BOOST,
            CampaignType.YM_RECOMMENDATIONS,
        ]

2.6 Rate Limiter

2.6.1 Реализация

python
# core/rate_limiter.py

import asyncio
from datetime import datetime
from typing import Dict


class RateLimiter:
    """Token Bucket Rate Limiter."""
    
    def __init__(self, rate: int, per: int = 60):
        """
        Args:
            rate: Количество токенов за период
            per: Период в секундах
        """
        self.rate = rate
        self.per = per
        self.tokens = float(rate)
        self.last_update = datetime.now()
        self._lock = asyncio.Lock()
    
    async def acquire(self, tokens: int = 1):
        """Получить токен(ы), подождать если необходимо."""
        async with self._lock:
            now = datetime.now()
            elapsed = (now - self.last_update).total_seconds()
            
            # Пополнение токенов
            self.tokens = min(
                self.rate,
                self.tokens + elapsed * (self.rate / self.per)
            )
            self.last_update = now
            
            if self.tokens < tokens:
                # Ждём пополнения
                wait_time = (tokens - self.tokens) * (self.per / self.rate)
                await asyncio.sleep(wait_time)
                self.tokens = 0
            else:
                self.tokens -= tokens


class MultiRateLimiter:
    """Rate Limiter для нескольких маркетплейсов."""
    
    def __init__(self, config: Dict[str, Dict[str, int]]):
        """
        Args:
            config: {"wb": {"rate": 100, "per": 60}, ...}
        """
        self.limiters = {
            mp: RateLimiter(**params)
            for mp, params in config.items()
        }
    
    async def acquire(self, marketplace: str, tokens: int = 1):
        """Получить токен для маркетплейса."""
        limiter = self.limiters.get(marketplace)
        if limiter:
            await limiter.acquire(tokens)


# Глобальный инстанс
rate_limiters = MultiRateLimiter({
    "wb": {"rate": 100, "per": 60},
    "ozon": {"rate": 60, "per": 60},
    "ym": {"rate": 30, "per": 60},
})

2.7 Retry Handler

2.7.1 Реализация

python
# core/retry_handler.py

import asyncio
import logging
from functools import wraps
from typing import Type, Tuple, Callable
import httpx


logger = logging.getLogger(__name__)


class RetryConfig:
    """Конфигурация retry."""
    
    def __init__(
        self,
        max_retries: int = 3,
        backoff_base: float = 2.0,
        backoff_max: float = 60.0,
        retryable_statuses: Tuple[int, ...] = (429, 500, 502, 503, 504),
        retryable_exceptions: Tuple[Type[Exception], ...] = (
            httpx.TimeoutException,
            httpx.ConnectError,
        )
    ):
        self.max_retries = max_retries
        self.backoff_base = backoff_base
        self.backoff_max = backoff_max
        self.retryable_statuses = retryable_statuses
        self.retryable_exceptions = retryable_exceptions


def with_retry(config: RetryConfig = None):
    """Декоратор для retry с exponential backoff."""
    
    if config is None:
        config = RetryConfig()
    
    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None
            
            for attempt in range(config.max_retries + 1):
                try:
                    return await func(*args, **kwargs)
                    
                except httpx.HTTPStatusError as e:
                    if e.response.status_code not in config.retryable_statuses:
                        raise
                    
                    last_exception = e
                    
                    if e.response.status_code == 429:
                        # Rate limit: exponential backoff
                        wait = min(
                            config.backoff_base ** attempt * 10,
                            config.backoff_max
                        )
                    else:
                        # Server error: fixed backoff
                        wait = 30
                    
                    logger.warning(
                        f"Retry {attempt + 1}/{config.max_retries} for {func.__name__}, "
                        f"status {e.response.status_code}, waiting {wait}s"
                    )
                    
                    if attempt < config.max_retries:
                        await asyncio.sleep(wait)
                    
                except config.retryable_exceptions as e:
                    last_exception = e
                    wait = min(
                        config.backoff_base ** attempt * 5,
                        config.backoff_max
                    )
                    
                    logger.warning(
                        f"Retry {attempt + 1}/{config.max_retries} for {func.__name__}, "
                        f"exception {type(e).__name__}, waiting {wait}s"
                    )
                    
                    if attempt < config.max_retries:
                        await asyncio.sleep(wait)
            
            raise last_exception
        
        return wrapper
    return decorator

2.8 Credentials Manager

2.8.1 Конфигурация

python
# core/credentials.py

import os
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class WBPromoCredentials:
    """Credentials Wildberries Promotion API."""
    api_key: str
    expires_at: Optional[datetime] = None


@dataclass  
class OzonPerformanceCredentials:
    """Credentials Ozon Performance API."""
    client_id: str
    client_secret: str


@dataclass
class YMCredentials:
    """Credentials Яндекс.Маркет."""
    oauth_token: str
    business_id: str
    campaign_id: str


class CredentialsManager:
    """Менеджер credentials для маркетплейсов."""
    
    @staticmethod
    def get_wb() -> WBPromoCredentials:
        expires_str = os.getenv("WB_PROMO_API_KEY_EXPIRES")
        expires_at = datetime.fromisoformat(expires_str) if expires_str else None
        
        return WBPromoCredentials(
            api_key=os.environ["WB_PROMO_API_KEY"],
            expires_at=expires_at
        )
    
    @staticmethod
    def get_ozon() -> OzonPerformanceCredentials:
        return OzonPerformanceCredentials(
            client_id=os.environ["OZON_PERF_CLIENT_ID"],
            client_secret=os.environ["OZON_PERF_API_KEY"]
        )
    
    @staticmethod
    def get_ym() -> YMCredentials:
        return YMCredentials(
            oauth_token=os.environ["YM_OAUTH_TOKEN"],
            business_id=os.environ["YM_BUSINESS_ID"],
            campaign_id=os.environ["YM_CAMPAIGN_ID"]
        )
    
    @staticmethod
    def validate_all() -> dict:
        """Проверить наличие всех credentials."""
        results = {}
        
        try:
            CredentialsManager.get_wb()
            results["wb"] = True
        except KeyError:
            results["wb"] = False
        
        try:
            CredentialsManager.get_ozon()
            results["ozon"] = True
        except KeyError:
            results["ozon"] = False
        
        try:
            CredentialsManager.get_ym()
            results["ym"] = True
        except KeyError:
            results["ym"] = False
        
        return results

2.9 Обработка ошибок

2.9.1 Иерархия исключений

python
# core/exceptions.py

class MarketingError(Exception):
    """Базовое исключение модуля Marketing."""
    pass


class AdapterError(MarketingError):
    """Ошибка адаптера маркетплейса."""
    
    def __init__(self, marketplace: str, message: str, original: Exception = None):
        self.marketplace = marketplace
        self.original = original
        super().__init__(f"[{marketplace}] {message}")


class RateLimitError(AdapterError):
    """Превышен rate limit."""
    pass


class AuthenticationError(AdapterError):
    """Ошибка аутентификации."""
    pass


class ValidationError(AdapterError):
    """Ошибка валидации."""
    pass


class CampaignNotFoundError(AdapterError):
    """Кампания не найдена."""
    pass


class InsufficientBudgetError(AdapterError):
    """Недостаточный бюджет."""
    pass

2.9.2 Обработка в адаптере

python
async def _handle_response_error(self, response: httpx.Response):
    """Обработка ошибок ответа."""
    
    if response.status_code == 401:
        raise AuthenticationError(
            self.marketplace,
            "Invalid or expired API key"
        )
    
    if response.status_code == 429:
        raise RateLimitError(
            self.marketplace,
            "Rate limit exceeded"
        )
    
    if response.status_code == 404:
        raise CampaignNotFoundError(
            self.marketplace,
            "Campaign not found"
        )
    
    if response.status_code >= 400:
        error_data = response.json() if response.content else {}
        raise AdapterError(
            self.marketplace,
            f"API error {response.status_code}: {error_data}"
        )

2.10 Тестирование адаптеров

2.10.1 Unit-тесты

python
# tests/test_adapters.py

import pytest
from unittest.mock import AsyncMock, patch
from services.marketing.adapters.wildberries import WildberriesPromoAdapter
from services.marketing.adapters.base import Campaign, CampaignType


@pytest.fixture
def wb_adapter():
    return WildberriesPromoAdapter()


@pytest.mark.asyncio
async def test_get_campaigns(wb_adapter):
    """Тест получения списка кампаний."""
    
    mock_response = {
        "adverts": [
            {
                "advertId": 12345,
                "type": 8,
                "status": 9,
                "name": "Test Campaign",
                "dailyBudget": 5000,
            }
        ],
        "total": 1
    }
    
    with patch.object(wb_adapter, '_request', return_value=mock_response):
        campaigns = await wb_adapter.get_campaigns()
        
        assert len(campaigns) == 1
        assert campaigns[0].external_id == "12345"
        assert campaigns[0].campaign_type == CampaignType.WB_AUCTION


@pytest.mark.asyncio
async def test_update_bid(wb_adapter):
    """Тест обновления ставки."""
    
    with patch.object(wb_adapter, '_request', return_value={}):
        result = await wb_adapter.update_bid("123", 150.0)
        
        assert result.success is True
        assert result.new_bid == 150.0

Документ подготовлен: Январь 2026
Версия: 1.0
Статус: Черновик

Примечание: API endpoints и параметры указаны на основе публичной документации. Перед реализацией необходимо уточнить актуальную спецификацию API каждого маркетплейса.

Документация ADOLF Platform