Skip to content

Раздел 2: Data Ingestion

Проект: Финансовый учёт и управленческая аналитика
Модуль: CFO
Версия: 2.0
Дата: Февраль 2026


2.1 Назначение

Раздел описывает процессы сбора, парсинга и нормализации финансовых данных из всех источников.

Источники данных

ИсточникТипПериодичностьДанные
Wildberries APIAPIЕжедневноПродажи, комиссии, логистика
Wildberries ExcelФайлЕженедельноФинансовый отчёт из ЛК
Ozon APIAPIЕжедневноТранзакции
Ozon ExcelФайлЕженедельноОтчёт о реализации
Яндекс.Маркет APIAPIЕжедневноФинансовый отчёт
Яндекс.Маркет ExcelФайлЕженедельноДетализация из ЛК
1С:КА 2 → PostgreSQL1C_* таблицыЕженедельно (авто)Выручка, себестоимость, расходы
Бухгалтерская первичкаФайлПо мере поступленияНакладные, акты

2.2 Архитектура Data Ingestion

2.2.1 Общая схема

2.2.2 Структура папок для файлов

/data/inbox/cfo/
├── marketplace/
│   ├── wb/                    # Excel-отчёты Wildberries
│   │   └── WB_Finance_2026-01-15.xlsx
│   ├── ozon/                  # Excel-отчёты Ozon
│   │   └── Ozon_Realization_2026-01-15.xlsx
│   └── ym/                    # Excel-отчёты Яндекс.Маркет
│       └── YM_Finance_2026-01-15.xlsx
└── primary/                   # Бухгалтерская первичка
    ├── invoices/              # Накладные
    ├── acts/                  # Акты
    └── contracts/             # Договоры

2.3 Wildberries Finance Adapter

2.3.1 API Endpoint

ПараметрЗначение
Base URLhttps://statistics-api.wildberries.ru
Endpoint/api/v1/supplier/reportDetailByPeriod
MethodGET
AuthHeader Authorization: {API_KEY}

2.3.2 Параметры запроса

ПараметрТипОбязательныйОписание
dateFromstringДаНачало периода (RFC3339)
dateTostringДаКонец периода (RFC3339)
limitintНетЛимит записей (макс. 100000)
rrdidintНетID для пагинации

2.3.3 Структура ответа

json
{
  "realizationreport_id": 123456789,
  "date_from": "2026-01-01T00:00:00Z",
  "date_to": "2026-01-07T23:59:59Z",
  "create_dt": "2026-01-08T10:00:00Z",
  "suppliercontract_code": null,
  "rrd_id": 987654321,
  "gi_id": 111222333,
  "subject_name": "Платья",
  "nm_id": 12345678,
  "brand_name": "Охана Маркет",
  "sa_name": "OM-12345",
  "ts_name": "44",
  "barcode": "2000000000001",
  "doc_type_name": "Продажа",
  "quantity": 1,
  "retail_price": 3500.00,
  "retail_amount": 3500.00,
  "sale_percent": 15,
  "commission_percent": 15.5,
  "office_name": "Коледино",
  "supplier_oper_name": "Продажа",
  "order_dt": "2026-01-05T14:30:00Z",
  "sale_dt": "2026-01-06T10:00:00Z",
  "rr_dt": "2026-01-07T00:00:00Z",
  "shk_id": 999888777,
  "retail_price_withdisc_rub": 2975.00,
  "delivery_amount": 50,
  "return_amount": 0,
  "delivery_rub": 85.00,
  "gi_box_type_name": "Короб",
  "product_discount_for_report": 525.00,
  "supplier_promo": 0,
  "rid": 0,
  "ppvz_spp_prc": 10.5,
  "ppvz_kvw_prc_base": 15.5,
  "ppvz_kvw_prc": 15.5,
  "ppvz_sales_commission": 461.13,
  "ppvz_for_pay": 2428.87,
  "ppvz_reward": 0,
  "acquiring_fee": 0,
  "acquiring_bank": "",
  "ppvz_vw": 297.50,
  "ppvz_vw_nds": 59.50,
  "ppvz_office_id": 123,
  "ppvz_office_name": "Коледино",
  "ppvz_supplier_id": 456,
  "ppvz_supplier_name": "ООО Охана",
  "ppvz_inn": "1234567890",
  "declaration_number": "",
  "bonus_type_name": "",
  "sticker_id": "WB-123456789",
  "site_country": "RU",
  "penalty": 0,
  "additional_payment": 0,
  "rebill_logistic_cost": 0,
  "rebill_logistic_org": "",
  "kiz": "",
  "storage_fee": 5.50,
  "deduction": 0,
  "acceptance": 0,
  "srid": "abc123def456"
}

2.3.4 Маппинг полей WB → CFO

Поле WB APIПоле CFOОписание
sa_nameskuАртикул продавца (основной идентификатор)
nm_idnm_idНоменклатура WB
barcodebarcodeШтрихкод (опционально, из API МП)
subject_namecategoryКатегория
brand_namebrand_nameБренд
ts_namesizeРазмер
retail_price_withdisc_rubrevenueВыручка (с учётом скидок)
ppvz_sales_commissioncommissionКомиссия МП
delivery_rublogisticsЛогистика
return_amountreturn_logisticsОбратная логистика
storage_feestorageХранение
ppvz_for_paypayoutК выплате
sale_dtsale_dateДата продажи
doc_type_nameoperation_typeТип операции
sridexternal_idУникальный ID операции

2.3.5 Реализация адаптера

Код реализации WBFinanceAdapter — см. CFO Раздел 7: Celery, задача cfo_import_wb. Ниже — интерфейс и маппинг.

python
@dataclass
class WBTransaction:
    """Транзакция Wildberries."""
    external_id: str
    sku: str
    nm_id: int
    barcode: Optional[str]  # Опционально — приходит из API МП
    category: str
    brand_name: str
    size: str
    revenue: float
    commission: float
    logistics: float
    return_logistics: float
    storage: float
    payout: float
    sale_date: datetime
    operation_type: str
    quantity: int


class WBFinanceAdapter:
    """Адаптер для финансового API Wildberries."""
    
    BASE_URL = "https://statistics-api.wildberries.ru"
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.headers = {"Authorization": api_key}
    
    async def get_transactions(
        self,
        date_from: date,
        date_to: date
    ) -> List[WBTransaction]:
        """Получение транзакций за период с пагинацией."""
        ...
    
    def _map_transaction(self, raw: dict) -> WBTransaction:
        """Маппинг сырых данных в транзакцию."""
        
        return WBTransaction(
            external_id=raw.get("srid", ""),
            sku=raw.get("sa_name", ""),
            nm_id=raw.get("nm_id", 0),
            barcode=raw.get("barcode") or None,
            category=raw.get("subject_name", ""),
            brand_name=raw.get("brand_name", ""),
            size=raw.get("ts_name", ""),
            revenue=float(raw.get("retail_price_withdisc_rub", 0)),
            commission=float(raw.get("ppvz_sales_commission", 0)),
            logistics=float(raw.get("delivery_rub", 0)),
            return_logistics=float(raw.get("return_amount", 0)),
            storage=float(raw.get("storage_fee", 0)),
            payout=float(raw.get("ppvz_for_pay", 0)),
            sale_date=datetime.fromisoformat(
                raw.get("sale_dt", "").replace("Z", "+00:00")
            ),
            operation_type=raw.get("doc_type_name", ""),
            quantity=int(raw.get("quantity", 0))
        )

2.4 Ozon Finance Adapter

2.4.1 API Endpoint

ПараметрЗначение
Base URLhttps://api-seller.ozon.ru
Endpoint/v3/finance/transaction/list
MethodPOST
AuthHeaders Client-Id + Api-Key

2.4.2 Параметры запроса

json
{
  "filter": {
    "date": {
      "from": "2026-01-01T00:00:00Z",
      "to": "2026-01-07T23:59:59Z"
    },
    "operation_type": [],
    "posting_number": "",
    "transaction_type": "all"
  },
  "page": 1,
  "page_size": 1000
}

2.4.3 Структура ответа

json
{
  "result": {
    "operations": [
      {
        "operation_id": 123456789,
        "operation_type": "OperationAgentDeliveredToCustomer",
        "operation_date": "2026-01-06T10:00:00Z",
        "operation_type_name": "Доставка покупателю",
        "delivery_charge": 50.0,
        "return_delivery_charge": 0,
        "accruals_for_sale": 2500.0,
        "sale_commission": 375.0,
        "amount": 2075.0,
        "type": "compensation",
        "posting": {
          "delivery_schema": "FBO",
          "order_date": "2026-01-05T14:30:00Z",
          "posting_number": "12345678-0001-1"
        },
        "items": [
          {
            "name": "Платье летнее синее",
            "sku": 123456789,
            "offer_id": "OM-12345"
          }
        ],
        "services": [
          {
            "name": "MarketplaceServiceItemFulfillment",
            "price": -50.0
          }
        ]
      }
    ],
    "page_count": 5,
    "row_count": 4500
  }
}

2.4.4 Маппинг полей Ozon → CFO

Поле Ozon APIПоле CFOОписание
items[0].offer_idskuАртикул продавца (основной идентификатор)
items[0].skunm_idSKU Ozon
items[0].nameproduct_nameНазвание товара
accruals_for_salerevenueВыручка
sale_commissioncommissionКомиссия
delivery_chargelogisticsЛогистика
return_delivery_chargereturn_logisticsОбратная логистика
amountpayoutК выплате
operation_datesale_dateДата операции
operation_type_nameoperation_typeТип операции
operation_idexternal_idУникальный ID

2.4.5 Реализация адаптера

Код реализации OzonFinanceAdapter — см. CFO Раздел 7: Celery, задача cfo_import_ozon. Ниже — интерфейс и маппинг.

python
@dataclass
class OzonTransaction:
    """Транзакция Ozon."""
    external_id: str
    sku: str
    ozon_sku: int
    product_name: str
    revenue: float
    commission: float
    logistics: float
    return_logistics: float
    storage: float
    payout: float
    sale_date: datetime
    operation_type: str
    quantity: int


class OzonFinanceAdapter:
    """Адаптер для финансового API Ozon."""
    
    BASE_URL = "https://api-seller.ozon.ru"
    
    def __init__(self, client_id: str, api_key: str):
        self.client_id = client_id
        self.api_key = api_key
        self.headers = {
            "Client-Id": client_id,
            "Api-Key": api_key,
            "Content-Type": "application/json"
        }
    
    async def get_transactions(
        self,
        date_from: date,
        date_to: date
    ) -> List[OzonTransaction]:
        """Получение транзакций за период с пагинацией."""
        ...
    
    def _map_transaction(self, raw: dict) -> OzonTransaction:
        """Маппинг сырых данных в транзакцию."""
        
        items = raw.get("items", [{}])
        first_item = items[0] if items else {}
        
        storage = sum(
            abs(float(s.get("price", 0)))
            for s in raw.get("services", [])
            if "Storage" in s.get("name", "")
        )
        
        return OzonTransaction(
            external_id=str(raw.get("operation_id", "")),
            sku=first_item.get("offer_id", ""),
            ozon_sku=first_item.get("sku", 0),
            product_name=first_item.get("name", ""),
            revenue=float(raw.get("accruals_for_sale", 0)),
            commission=float(raw.get("sale_commission", 0)),
            logistics=float(raw.get("delivery_charge", 0)),
            return_logistics=float(raw.get("return_delivery_charge", 0)),
            storage=storage,
            payout=float(raw.get("amount", 0)),
            sale_date=datetime.fromisoformat(
                raw.get("operation_date", "").replace("Z", "+00:00")
            ),
            operation_type=raw.get("operation_type_name", ""),
            quantity=1
        )

2.5 Яндекс.Маркет Finance Adapter

2.5.1 API Endpoint

ПараметрЗначение
Base URLhttps://api.partner.market.yandex.ru
Endpoint/v2/campaigns/{campaignId}/stats/orders
MethodGET
AuthHeader Authorization: OAuth {TOKEN}

2.5.2 Маппинг полей YM → CFO

Поле YM APIПоле CFOОписание
shopSkuskuАртикул продавца
marketSkunm_idSKU Яндекса
offerNameproduct_nameНазвание
prices.valuerevenueВыручка
commissions.amountcommissionКомиссия
delivery.pricelogisticsЛогистика

2.6 Excel Parser

2.6.1 Wildberries Excel

Структура файла:

КолонкаОписание
Артикул поставщикаSKU
БаркодШтрихкод (из файла МП, не 1С)
БрендНазвание бренда
ПредметКатегория
РазмерРазмер товара
К перечислению продавцуСумма выплаты
Вознаграждение ВБКомиссия
ЛогистикаСтоимость доставки
ХранениеСтоимость хранения
Дата продажиДата операции

2.6.2 Маппинг колонок Excel

python
class WBExcelParser:
    """Парсер Excel-отчётов Wildberries."""
    
    COLUMN_MAPPING = {
        "Артикул поставщика": "sku",
        "Артикул": "sku",
        "Баркод": "barcode",  # Приходит из файла МП
        "Бренд": "brand_name",
        "Предмет": "category",
        "Размер": "size",
        "К перечислению продавцу": "payout",
        "К перечислению": "payout",
        "Вознаграждение ВБ": "commission",
        "Вознаграждение Вайлдберриз": "commission",
        "Логистика": "logistics",
        "Услуги логистики": "logistics",
        "Хранение": "storage",
        "Услуги хранения": "storage",
        "Дата продажи": "sale_date",
        "Дата заказа покупателем": "sale_date",
        "Тип документа": "operation_type",
        "Обоснование": "operation_type",
        "Кол-во": "quantity",
        "Количество": "quantity"
    }

2.7 Данные 1С через 1C_* VIEW

2.7.1 Источники данных 1С для CFO

Таблица 1C_*ЗапросДанныеИспользование в CFO
1C_account_turns_90Q-01Выручка и себестоимость по номенклатуреP&L, Cost Mapping, ABC-анализ
1C_account_turns_90_expensesQ-02Коммерческие и управленческие расходыP&L (статьи расходов)
1C_nomenclatureQ-10Справочник номенклатурыМаппинг SKU → название, бренд
1C_nomenclature_pricesQ-11Цены номенклатурыСправочные цены

2.7.2 VIEW: cfo_v_cost_prices

VIEW для получения себестоимости по SKU из данных бухучёта (счёт 90.02.1).

sql
CREATE OR REPLACE VIEW cfo_v_cost_prices AS
SELECT
    t.article                           AS sku,
    t.nomenclature                      AS product_name,
    t.organization                      AS organization,
    -- Себестоимость = оборот по Дт 90.02.1 / количество
    CASE 
        WHEN SUM(t.quantity_dt) > 0 
        THEN ROUND(SUM(t.amount_dt) / SUM(t.quantity_dt), 2)
        ELSE NULL
    END                                 AS cost_price,
    SUM(t.amount_dt)                    AS total_cost,
    SUM(t.quantity_dt)                  AS total_quantity,
    t.period                            AS period,
    MAX(t.loaded_at)                    AS data_freshness
FROM 1C_account_turns_90 t
WHERE t.account_dt = '90.02.1'
  AND t.article IS NOT NULL
  AND t.article != ''
GROUP BY t.article, t.nomenclature, t.organization, t.period;

COMMENT ON VIEW cfo_v_cost_prices IS 
    'Себестоимость по SKU из 1C_account_turns_90 (Q-01, счёт 90.02.1)';

2.7.3 VIEW: cfo_v_revenue_1c

VIEW для получения выручки по данным бухучёта (счёт 90.01.1) — используется для сверки с данными маркетплейсов.

sql
CREATE OR REPLACE VIEW cfo_v_revenue_1c AS
SELECT
    t.article                           AS sku,
    t.nomenclature                      AS product_name,
    t.counterparty                      AS counterparty,
    t.organization                      AS organization,
    SUM(t.amount_ct)                    AS revenue,
    SUM(t.quantity_dt)                  AS quantity,
    t.period                            AS period,
    MAX(t.loaded_at)                    AS data_freshness
FROM 1C_account_turns_90 t
WHERE t.account_ct = '90.01.1'
  AND t.article IS NOT NULL
  AND t.article != ''
GROUP BY t.article, t.nomenclature, t.counterparty, t.organization, t.period;

COMMENT ON VIEW cfo_v_revenue_1c IS 
    'Выручка по SKU из 1C_account_turns_90 (Q-01, счёт 90.01.1)';

2.7.4 Сервис работы с себестоимостью

python
class CostPriceService:
    """Сервис работы с себестоимостью.
    
    v2.0: Читает из PostgreSQL VIEW cfo_v_cost_prices
    (источник — 1C_account_turns_90, заполняется Экстрактором 1С).
    """
    
    def __init__(self, db_session):
        self.db = db_session
        self._cost_cache: dict = {}
    
    async def get_cost_price(self, sku: str) -> Optional[float]:
        """Получение себестоимости по SKU.
        
        Читает из VIEW cfo_v_cost_prices (1C_account_turns_90).
        
        Args:
            sku: Артикул товара
            
        Returns:
            Себестоимость или None если не найдена
        """
        
        if sku in self._cost_cache:
            return self._cost_cache[sku]
        
        query = """
            SELECT cost_price 
            FROM cfo_v_cost_prices
            WHERE sku = $1
            ORDER BY period DESC
            LIMIT 1
        """
        
        row = await self.db.fetchrow(query, sku)
        
        if row and row["cost_price"]:
            cost = float(row["cost_price"])
            self._cost_cache[sku] = cost
            return cost
        
        return None
    
    async def get_cost_map(self) -> dict:
        """Получение полного словаря SKU → себестоимость.
        
        Берёт последний актуальный период для каждого SKU.
        
        Returns:
            Словарь {sku: cost_price}
        """
        
        query = """
            SELECT DISTINCT ON (sku) sku, cost_price
            FROM cfo_v_cost_prices
            WHERE cost_price IS NOT NULL
            ORDER BY sku, period DESC
        """
        
        rows = await self.db.fetch(query)
        self._cost_cache = {
            row["sku"]: float(row["cost_price"])
            for row in rows
        }
        
        return self._cost_cache
    
    async def check_data_freshness(self) -> dict:
        """Проверка свежести данных 1C_account_turns_90.
        
        Returns:
            Словарь с информацией о свежести данных
        """
        
        query = """
            SELECT 
                MAX(data_freshness) AS last_loaded,
                MAX(period) AS last_period,
                COUNT(DISTINCT sku) AS sku_count
            FROM cfo_v_cost_prices
        """
        
        row = await self.db.fetchrow(query)
        
        return {
            "last_loaded": row["last_loaded"],
            "last_period": row["last_period"],
            "sku_count": row["sku_count"],
            "is_stale": (
                row["last_loaded"] is not None 
                and (datetime.now() - row["last_loaded"]).days > 10
            )
        }
    
    async def enrich_transactions(
        self, 
        transactions: List["NormalizedTransaction"]
    ) -> List["NormalizedTransaction"]:
        """Обогащение транзакций себестоимостью.
        
        Связывание по полю SKU.
        
        Args:
            transactions: Список транзакций
            
        Returns:
            Транзакции с заполненным cost_price
        """
        
        # Предзагрузка кэша для batch-обработки
        if not self._cost_cache:
            await self.get_cost_map()
        
        for tx in transactions:
            if tx.sku:
                tx.cost_price = self._cost_cache.get(tx.sku)
        
        return transactions

2.8 Normalizer

2.8.1 Унифицированная структура транзакции

python
@dataclass
class NormalizedTransaction:
    """Нормализованная транзакция для всех маркетплейсов."""
    
    # Идентификация
    external_id: str              # Уникальный ID из источника
    marketplace: str              # wb, ozon, ym
    source: str                   # api, excel
    
    # Товар
    sku: str                      # Артикул продавца (основной идентификатор)
    barcode: Optional[str]        # Штрихкод (только из API/Excel МП, не из 1С)
    nm_id: Optional[int]          # ID номенклатуры МП
    product_name: Optional[str]   # Название товара
    category: Optional[str]       # Категория
    brand_name: Optional[str]     # Бренд
    size: Optional[str]           # Размер
    
    # Финансы
    revenue: float                # Выручка (цена продажи)
    cost_price: Optional[float]   # Себестоимость (из 1C_*, по SKU)
    commission: float             # Комиссия МП
    logistics: float              # Логистика до покупателя
    return_logistics: float       # Обратная логистика
    storage: float                # Хранение
    advertising: float            # Реклама (если есть)
    payout: float                 # К выплате
    
    # Расчётные поля
    gross_profit: Optional[float] # Валовая прибыль (revenue - cost_price)
    net_profit: Optional[float]   # Чистая прибыль (payout - cost_price)
    
    # Мета
    sale_date: date               # Дата продажи
    operation_type: str           # Тип операции
    quantity: int                 # Количество
    
    # Служебные
    created_at: datetime          # Дата создания записи
    source_file: Optional[str]    # Имя исходного файла
    
    def calculate_profits(self):
        """Расчёт прибыли при наличии себестоимости."""
        if self.cost_price is not None:
            self.gross_profit = self.revenue - (self.cost_price * self.quantity)
            self.net_profit = self.payout - (self.cost_price * self.quantity)

2.8.2 TransactionNormalizer

python
class TransactionNormalizer:
    """Нормализация транзакций из разных источников."""
    
    def __init__(self, cost_service: CostPriceService):
        self.cost_service = cost_service
    
    async def normalize_wb(self, tx: WBTransaction) -> NormalizedTransaction:
        """Нормализация транзакции WB."""
        
        cost_price = await self.cost_service.get_cost_price(tx.sku)
        
        normalized = NormalizedTransaction(
            external_id=tx.external_id,
            marketplace="wb",
            source="api",
            sku=tx.sku,
            barcode=tx.barcode,
            nm_id=tx.nm_id,
            product_name=None,
            category=tx.category,
            brand_name=tx.brand_name,
            size=tx.size,
            revenue=tx.revenue,
            cost_price=cost_price,
            commission=tx.commission,
            logistics=tx.logistics,
            return_logistics=tx.return_logistics,
            storage=tx.storage,
            advertising=0.0,
            payout=tx.payout,
            gross_profit=None,
            net_profit=None,
            sale_date=tx.sale_date.date(),
            operation_type=tx.operation_type,
            quantity=tx.quantity,
            created_at=datetime.now(),
            source_file=None
        )
        
        normalized.calculate_profits()
        return normalized
    
    async def normalize_ozon(self, tx: OzonTransaction) -> NormalizedTransaction:
        """Нормализация транзакции Ozon."""
        
        cost_price = await self.cost_service.get_cost_price(tx.sku)
        
        normalized = NormalizedTransaction(
            external_id=tx.external_id,
            marketplace="ozon",
            source="api",
            sku=tx.sku,
            barcode=None,
            nm_id=tx.ozon_sku,
            product_name=tx.product_name,
            category=None,
            brand_name=None,
            size=None,
            revenue=tx.revenue,
            cost_price=cost_price,
            commission=tx.commission,
            logistics=tx.logistics,
            return_logistics=tx.return_logistics,
            storage=tx.storage,
            advertising=0.0,
            payout=tx.payout,
            gross_profit=None,
            net_profit=None,
            sale_date=tx.sale_date.date(),
            operation_type=tx.operation_type,
            quantity=tx.quantity,
            created_at=datetime.now(),
            source_file=None
        )
        
        normalized.calculate_profits()
        return normalized

2.9 Validator

2.9.1 Правила валидации

ПравилоОписаниеДействие
Required fieldssku, marketplace, sale_date обязательныReject
Revenue >= 0Выручка не может быть отрицательнойReject
Valid marketplacemarketplace IN (wb, ozon, ym)Reject
Valid dateДата не в будущемReject
Duplicate checkПроверка по external_id + marketplaceSkip
Cost price warningСебестоимость не найдена в 1C_*Warning (не reject)

2.9.2 Реализация

python
@dataclass
class ValidationResult:
    """Результат валидации."""
    is_valid: bool
    errors: List[str]
    warnings: List[str]


class TransactionValidator:
    """Валидация транзакций."""
    
    VALID_MARKETPLACES = {"wb", "ozon", "ym"}
    
    def validate(self, tx: NormalizedTransaction) -> ValidationResult:
        """Валидация транзакции."""
        
        errors = []
        warnings = []
        
        if not tx.sku:
            errors.append("SKU is required")
        if not tx.marketplace:
            errors.append("Marketplace is required")
        if not tx.sale_date:
            errors.append("Sale date is required")
        if tx.marketplace not in self.VALID_MARKETPLACES:
            errors.append(f"Invalid marketplace: {tx.marketplace}")
        if tx.revenue < 0:
            errors.append(f"Revenue cannot be negative: {tx.revenue}")
        if tx.sale_date and tx.sale_date > date.today():
            errors.append(f"Sale date cannot be in future: {tx.sale_date}")
        if tx.cost_price is None:
            warnings.append(f"Cost price not found in 1C_* for SKU: {tx.sku}")
        
        return ValidationResult(
            is_valid=len(errors) == 0,
            errors=errors,
            warnings=warnings
        )
    
    def validate_batch(
        self,
        transactions: List[NormalizedTransaction]
    ) -> Tuple[List[NormalizedTransaction], List[Tuple[NormalizedTransaction, List[str]]]]:
        """Валидация пакета транзакций."""
        
        valid = []
        invalid = []
        warnings_count = 0
        
        for tx in transactions:
            result = self.validate(tx)
            
            if result.is_valid:
                valid.append(tx)
                if result.warnings:
                    warnings_count += 1
                    for w in result.warnings:
                        logger.warning(f"Transaction {tx.external_id}: {w}")
            else:
                invalid.append((tx, result.errors))
        
        logger.info(
            f"Validated: {len(valid)} valid, {len(invalid)} invalid, "
            f"{warnings_count} with warnings"
        )
        return valid, invalid

2.10 Deduplicator

2.10.1 Стратегия дедупликации

ИсточникКлюч дедупликацииОписание
APIexternal_id + marketplaceУникальный ID операции
Excelsku + sale_date + revenue + marketplaceКомбинация полей

2.10.2 Реализация

python
class TransactionDeduplicator:
    """Дедупликация транзакций."""
    
    def __init__(self, db_session):
        self.db = db_session
    
    async def deduplicate(
        self,
        transactions: List[NormalizedTransaction]
    ) -> List[NormalizedTransaction]:
        """Удаление дубликатов."""
        
        existing_ids = await self._get_existing_ids(
            [t.external_id for t in transactions if t.external_id]
        )
        
        new_transactions = [
            t for t in transactions
            if t.external_id not in existing_ids
        ]
        
        excel_transactions = [t for t in new_transactions if t.source == "excel"]
        api_transactions = [t for t in new_transactions if t.source == "api"]
        
        unique_excel = self._deduplicate_excel(excel_transactions)
        
        return api_transactions + unique_excel
    
    async def _get_existing_ids(self, external_ids: List[str]) -> set:
        """Получение существующих ID из БД."""
        
        if not external_ids:
            return set()
        
        query = """
            SELECT external_id 
            FROM cfo_transactions 
            WHERE external_id = ANY($1)
        """
        
        rows = await self.db.fetch(query, external_ids)
        return {row["external_id"] for row in rows}
    
    def _deduplicate_excel(
        self,
        transactions: List[NormalizedTransaction]
    ) -> List[NormalizedTransaction]:
        """Дедупликация Excel-транзакций по комбинации полей."""
        
        seen = set()
        unique = []
        
        for tx in transactions:
            key = (tx.sku, tx.sale_date, tx.revenue, tx.marketplace)
            if key not in seen:
                seen.add(key)
                unique.append(tx)
        
        return unique

2.11 Ingestion Service

2.11.1 Оркестрация импорта

python
class DataIngestionService:
    """Сервис импорта финансовых данных.
    
    v2.0: Себестоимость из 1С читается из 1C_* через VIEW.
    Файловый импорт себестоимости удалён.
    """
    
    def __init__(
        self,
        wb_adapter: WBFinanceAdapter,
        ozon_adapter: OzonFinanceAdapter,
        ym_adapter: YMFinanceAdapter,
        excel_parser: "ExcelParser",
        cost_service: CostPriceService,
        normalizer: TransactionNormalizer,
        validator: TransactionValidator,
        deduplicator: TransactionDeduplicator,
        db_session
    ):
        self.wb = wb_adapter
        self.ozon = ozon_adapter
        self.ym = ym_adapter
        self.excel_parser = excel_parser
        self.cost_service = cost_service
        self.normalizer = normalizer
        self.validator = validator
        self.deduplicator = deduplicator
        self.db = db_session
    
    async def import_all(self, date_from: date, date_to: date) -> "ImportResult":
        """Полный импорт из всех источников.
        
        Себестоимость из 1С уже доступна через VIEW 
        (1C_account_turns_90 → cfo_v_cost_prices).
        """
        
        result = ImportResult()
        
        # Предзагрузка кэша себестоимости из 1C_*
        await self.cost_service.get_cost_map()
        
        # Проверка свежести данных 1C_*
        freshness = await self.cost_service.check_data_freshness()
        if freshness["is_stale"]:
            logger.warning(
                f"1C_account_turns_90 data is stale! "
                f"Last loaded: {freshness['last_loaded']}"
            )
        
        # API импорт
        result.merge(await self.import_wb_api(date_from, date_to))
        result.merge(await self.import_ozon_api(date_from, date_to))
        result.merge(await self.import_ym_api(date_from, date_to))
        
        # Excel импорт (файлы из ЛК маркетплейсов)
        result.merge(await self.import_excel_files())
        
        return result
    
    async def import_wb_api(
        self,
        date_from: date,
        date_to: date
    ) -> "ImportResult":
        """Импорт из WB API."""
        
        try:
            raw = await self.wb.get_transactions(date_from, date_to)
            normalized = [await self.normalizer.normalize_wb(t) for t in raw]
            valid, invalid = self.validator.validate_batch(normalized)
            unique = await self.deduplicator.deduplicate(valid)
            
            await self._save_transactions(unique)
            
            return ImportResult(
                source="wb_api",
                total=len(raw),
                valid=len(valid),
                invalid=len(invalid),
                duplicates=len(valid) - len(unique),
                saved=len(unique)
            )
        
        except Exception as e:
            logger.error(f"WB API import failed: {e}")
            return ImportResult(source="wb_api", error=str(e))
    
    async def _save_transactions(
        self,
        transactions: List[NormalizedTransaction]
    ):
        """Сохранение транзакций в БД."""
        
        if not transactions:
            return
        
        query = """
            INSERT INTO cfo_transactions (
                external_id, marketplace, source, sku, barcode, nm_id,
                product_name, category, brand_name, size,
                revenue, cost_price, commission, logistics, return_logistics,
                storage, advertising, payout, gross_profit, net_profit,
                sale_date, operation_type, quantity, source_file, created_at
            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
                      $11, $12, $13, $14, $15, $16, $17, $18, $19, $20,
                      $21, $22, $23, $24, $25)
        """
        
        for tx in transactions:
            await self.db.execute(
                query,
                tx.external_id, tx.marketplace, tx.source,
                tx.sku, tx.barcode, tx.nm_id,
                tx.product_name, tx.category, tx.brand_name, tx.size,
                tx.revenue, tx.cost_price, tx.commission,
                tx.logistics, tx.return_logistics,
                tx.storage, tx.advertising, tx.payout,
                tx.gross_profit, tx.net_profit,
                tx.sale_date, tx.operation_type, tx.quantity,
                tx.source_file, tx.created_at
            )


@dataclass
class ImportResult:
    """Результат импорта."""
    source: str = ""
    total: int = 0
    valid: int = 0
    invalid: int = 0
    duplicates: int = 0
    saved: int = 0
    error: Optional[str] = None
    
    def merge(self, other: "ImportResult"):
        """Объединение результатов."""
        self.total += other.total
        self.valid += other.valid
        self.invalid += other.invalid
        self.duplicates += other.duplicates
        self.saved += other.saved

2.12 История изменений

v2.0 (Февраль 2026)

КомпонентБыло (v1.1)Стало (v2.0)
Источник себестоимостиCSV/XLSX файлы из 1С → /data/inbox/cfo/costs/PostgreSQL 1C_account_turns_90 (через Экстрактор данных 1С)
CostPriceParserПарсер CSV/XLS файловУдалён — заменён на VIEW cfo_v_cost_prices
CSVParserПарсер CSV из 1СУдалён
Директория /data/inbox/cfo/costs/Каталог для файлов себестоимостиУдалён
CostPriceServiceЧитал из файлов → cfo_cost_pricesЧитает из VIEW cfo_v_cost_prices (поверх 1C_*)
IngestionService.import_cost_prices()Файловый импорт с архивированиемУдалён — данные уже в PostgreSQL
Проверка свежестиОтсутствовалаcheck_data_freshness() — алерт при устаревших данных 1C_*
Мониторинг 1C_*ОтсутствовалCelery-задача (см. Раздел 7)

v1.1 (Январь 2026)

КомпонентБыло (v1.0)Стало (v1.1)
Идентификатор товара из 1СШтрихкод (barcode)Артикул (sku)
Поле barcode в CostPriceRecordОбязательноеУдалено
Номенклатурная группаПрисутствовалаУдалена (не ведётся в 1С)
Связывание с транзакциямиПо barcodeПо sku
Конфигурация 1СНе указана1С:Комплексная автоматизация 2

Документ подготовлен: Февраль 2026
Версия: 2.0
Статус: Черновик

Документация ADOLF Platform