Skip to main content

ADOLF CFO — Раздел 3: AI Pipeline

Проект: Финансовый учёт и управленческая аналитика
Модуль: CFO
Версия: 1.0
Дата: Январь 2026

3.1 Назначение

Раздел описывает алгоритмы расчёта P&L, ABC-анализа, детекции аномалий и генерации AI-инсайтов.

Компоненты AI Pipeline

КомпонентОписаниеAI-модель
Cost MapperСопоставление SKU с себестоимостью
P&L CalculatorРасчёт маржинальности
ABC AnalyzerКлассификация SKU
Anomaly DetectorВыявление аномалий
Insight GeneratorТекстовые выводыClaude Opus 4.5
Custom Report GeneratorКастомные отчётыClaude Opus 4.5

3.2 Cost Mapper

3.2.1 Алгоритм маппинга

3.2.2 Приоритет маппинга

ПриоритетКлючТочностьОписание
1barcodeВысокаяТочное соответствие штрихкода
2skuВысокаяАртикул продавца
3nm_id + sizeСредняяНоменклатура МП + размер
4nm_idНизкаяТолько номенклатура (усреднённая COGS)

3.2.3 Реализация

from typing import Optional, Dict
from dataclasses import dataclass

@dataclass
class CostMapping:
    """Результат маппинга себестоимости."""
    cost_price: float
    mapping_key: str
    confidence: str
    source_date: date


class CostMapper:
    """Сопоставление транзакций с себестоимостью."""
    
    def __init__(self, db_session):
        self.db = db_session
        self._cache: Dict[str, CostMapping] = {}
    
    async def load_cost_prices(self):
        """Загрузка справочника себестоимости в кэш."""
        
        query = """
            SELECT barcode, sku, cost_price, update_date
            FROM cfo_cost_prices
            WHERE is_active = TRUE
        """
        
        rows = await self.db.fetch(query)
        
        for row in rows:
            if row["barcode"]:
                self._cache[f"barcode:{row['barcode']}"] = CostMapping(
                    cost_price=row["cost_price"],
                    mapping_key="barcode",
                    confidence="high",
                    source_date=row["update_date"]
                )
            
            if row["sku"]:
                self._cache[f"sku:{row['sku']}"] = CostMapping(
                    cost_price=row["cost_price"],
                    mapping_key="sku",
                    confidence="high",
                    source_date=row["update_date"]
                )
    
    def map_cost(self, transaction) -> Optional[CostMapping]:
        """Получение себестоимости для транзакции."""
        
        if transaction.barcode:
            key = f"barcode:{transaction.barcode}"
            if key in self._cache:
                return self._cache[key]
        
        if transaction.sku:
            key = f"sku:{transaction.sku}"
            if key in self._cache:
                return self._cache[key]
        
        if transaction.nm_id and transaction.size:
            key = f"nm_id_size:{transaction.nm_id}:{transaction.size}"
            if key in self._cache:
                return self._cache[key]
        
        return None

3.3 P&L Calculator

3.3.1 Формула расчёта

Revenue (Выручка)           = sale_price × quantity

COGS (Себестоимость)        = cost_price × quantity

Gross Profit                = Revenue - COGS
Gross Margin %              = Gross Profit / Revenue × 100

Operating Expenses:
  - Commission              = комиссия МП
  - Logistics               = логистика до покупателя
  - Return Logistics        = обратная логистика
  - Storage                 = хранение на складе МП
  - Advertising             = реклама на МП

Total Expenses              = COGS + Commission + Logistics + 
                              Return Logistics + Storage + Advertising

Net Profit                  = Revenue - Total Expenses
Net Margin %                = Net Profit / Revenue × 100

3.3.2 Структура результата

@dataclass
class PnLResult:
    """Результат расчёта P&L."""
    
    # Идентификация
    sku: str
    marketplace: str
    period_start: date
    period_end: date
    
    # Количественные показатели
    quantity_sold: int
    quantity_returned: int
    net_quantity: int
    
    # Выручка
    gross_revenue: float
    discounts: float
    returns_amount: float
    net_revenue: float
    
    # Себестоимость
    cogs: float
    cogs_mapped: bool
    
    # Расходы
    commission: float
    logistics: float
    return_logistics: float
    storage: float
    advertising: float
    total_expenses: float
    
    # Прибыль
    gross_profit: float
    gross_margin_pct: float
    net_profit: float
    net_margin_pct: float
    
    # Мета
    calculated_at: datetime

3.3.3 Реализация калькулятора

class PnLCalculator:
    """Расчёт P&L по транзакциям."""
    
    def __init__(self, cost_mapper: CostMapper):
        self.cost_mapper = cost_mapper
    
    def calculate_sku_pnl(
        self,
        transactions: List[NormalizedTransaction],
        sku: str,
        period_start: date,
        period_end: date
    ) -> PnLResult:
        """Расчёт P&L для одного SKU."""
        
        sku_transactions = [
            t for t in transactions
            if t.sku == sku and period_start <= t.sale_date <= period_end
        ]
        
        if not sku_transactions:
            return None
        
        # Агрегация
        quantity_sold = sum(
            t.quantity for t in sku_transactions
            if t.operation_type in ["Продажа", "DELIVERED"]
        )
        quantity_returned = sum(
            t.quantity for t in sku_transactions
            if t.operation_type in ["Возврат", "RETURNED"]
        )
        
        gross_revenue = sum(t.revenue for t in sku_transactions if t.revenue > 0)
        returns_amount = abs(sum(t.revenue for t in sku_transactions if t.revenue < 0))
        net_revenue = gross_revenue - returns_amount
        
        commission = sum(t.commission for t in sku_transactions)
        logistics = sum(t.logistics for t in sku_transactions)
        return_logistics = sum(t.return_logistics for t in sku_transactions)
        storage = sum(t.storage for t in sku_transactions)
        advertising = sum(t.advertising for t in sku_transactions)
        
        # Себестоимость
        cost_mapping = self.cost_mapper.map_cost(sku_transactions[0])
        if cost_mapping:
            cogs = cost_mapping.cost_price * (quantity_sold - quantity_returned)
            cogs_mapped = True
        else:
            cogs = 0
            cogs_mapped = False
        
        # Расчёт прибыли
        total_expenses = cogs + commission + logistics + return_logistics + storage + advertising
        gross_profit = net_revenue - cogs
        net_profit = net_revenue - total_expenses
        
        gross_margin_pct = (gross_profit / net_revenue * 100) if net_revenue > 0 else 0
        net_margin_pct = (net_profit / net_revenue * 100) if net_revenue > 0 else 0
        
        return PnLResult(
            sku=sku,
            marketplace=sku_transactions[0].marketplace,
            period_start=period_start,
            period_end=period_end,
            quantity_sold=quantity_sold,
            quantity_returned=quantity_returned,
            net_quantity=quantity_sold - quantity_returned,
            gross_revenue=gross_revenue,
            discounts=0,
            returns_amount=returns_amount,
            net_revenue=net_revenue,
            cogs=cogs,
            cogs_mapped=cogs_mapped,
            commission=commission,
            logistics=logistics,
            return_logistics=return_logistics,
            storage=storage,
            advertising=advertising,
            total_expenses=total_expenses,
            gross_profit=gross_profit,
            gross_margin_pct=round(gross_margin_pct, 2),
            net_profit=net_profit,
            net_margin_pct=round(net_margin_pct, 2),
            calculated_at=datetime.utcnow()
        )
    
    def calculate_aggregated_pnl(
        self,
        pnl_results: List[PnLResult],
        group_by: str
    ) -> Dict[str, PnLResult]:
        """Агрегация P&L по группам."""
        
        groups = {}
        
        for pnl in pnl_results:
            if group_by == "category":
                key = pnl.category or "Unknown"
            elif group_by == "brand":
                key = pnl.brand_id or "Unknown"
            elif group_by == "marketplace":
                key = pnl.marketplace
            else:
                key = "total"
            
            if key not in groups:
                groups[key] = []
            groups[key].append(pnl)
        
        aggregated = {}
        for key, items in groups.items():
            aggregated[key] = self._aggregate_pnl_list(items, key)
        
        return aggregated
    
    def _aggregate_pnl_list(self, items: List[PnLResult], group_name: str) -> PnLResult:
        """Агрегация списка P&L в один результат."""
        
        net_revenue = sum(p.net_revenue for p in items)
        cogs = sum(p.cogs for p in items)
        total_expenses = sum(p.total_expenses for p in items)
        net_profit = sum(p.net_profit for p in items)
        
        return PnLResult(
            sku=group_name,
            marketplace="all",
            period_start=min(p.period_start for p in items),
            period_end=max(p.period_end for p in items),
            quantity_sold=sum(p.quantity_sold for p in items),
            quantity_returned=sum(p.quantity_returned for p in items),
            net_quantity=sum(p.net_quantity for p in items),
            gross_revenue=sum(p.gross_revenue for p in items),
            discounts=sum(p.discounts for p in items),
            returns_amount=sum(p.returns_amount for p in items),
            net_revenue=net_revenue,
            cogs=cogs,
            cogs_mapped=all(p.cogs_mapped for p in items),
            commission=sum(p.commission for p in items),
            logistics=sum(p.logistics for p in items),
            return_logistics=sum(p.return_logistics for p in items),
            storage=sum(p.storage for p in items),
            advertising=sum(p.advertising for p in items),
            total_expenses=total_expenses,
            gross_profit=net_revenue - cogs,
            gross_margin_pct=round((net_revenue - cogs) / net_revenue * 100, 2) if net_revenue > 0 else 0,
            net_profit=net_profit,
            net_margin_pct=round(net_profit / net_revenue * 100, 2) if net_revenue > 0 else 0,
            calculated_at=datetime.utcnow()
        )

3.4 ABC Analyzer

3.4.1 Алгоритм классификации

3.4.2 Пороги классификации

КлассУсловиеНакопительная доляОписание
ATop performers0% — 80%Ключевые SKU, генерирующие 80% прибыли
BMiddle performers80% — 95%Стабильные SKU со средним вкладом
CLow performers95% — 100%Аутсайдеры с минимальным вкладом
DLoss makersNet Profit < 0Убыточные SKU

3.4.3 Структура результата

@dataclass
class ABCResult:
    """Результат ABC-анализа для SKU."""
    sku: str
    marketplace: str
    abc_class: str
    net_profit: float
    net_margin_pct: float
    cumulative_profit: float
    cumulative_pct: float
    rank: int
    total_skus_in_class: int


@dataclass
class ABCSummary:
    """Сводка ABC-анализа."""
    period_start: date
    period_end: date
    total_skus: int
    total_profit: float
    
    class_a_count: int
    class_a_profit: float
    class_a_pct: float
    
    class_b_count: int
    class_b_profit: float
    class_b_pct: float
    
    class_c_count: int
    class_c_profit: float
    class_c_pct: float
    
    class_d_count: int
    class_d_loss: float
    
    calculated_at: datetime

3.4.4 Реализация

class ABCAnalyzer:
    """ABC-анализ SKU."""
    
    def __init__(self, thresholds: dict = None):
        self.thresholds = thresholds or {
            "a": 80,
            "b": 95
        }
    
    def analyze(self, pnl_results: List[PnLResult]) -> Tuple[List[ABCResult], ABCSummary]:
        """Выполнение ABC-анализа."""
        
        # Разделение на прибыльные и убыточные
        profitable = [p for p in pnl_results if p.net_profit > 0]
        loss_makers = [p for p in pnl_results if p.net_profit <= 0]
        
        # Сортировка прибыльных по убыванию прибыли
        profitable.sort(key=lambda x: x.net_profit, reverse=True)
        
        total_profit = sum(p.net_profit for p in profitable)
        
        results = []
        cumulative_profit = 0
        
        # Классификация прибыльных SKU
        for i, pnl in enumerate(profitable):
            cumulative_profit += pnl.net_profit
            cumulative_pct = (cumulative_profit / total_profit * 100) if total_profit > 0 else 0
            
            if cumulative_pct <= self.thresholds["a"]:
                abc_class = "A"
            elif cumulative_pct <= self.thresholds["b"]:
                abc_class = "B"
            else:
                abc_class = "C"
            
            results.append(ABCResult(
                sku=pnl.sku,
                marketplace=pnl.marketplace,
                abc_class=abc_class,
                net_profit=pnl.net_profit,
                net_margin_pct=pnl.net_margin_pct,
                cumulative_profit=cumulative_profit,
                cumulative_pct=round(cumulative_pct, 2),
                rank=i + 1,
                total_skus_in_class=0
            ))
        
        # Добавление убыточных (класс D)
        for pnl in loss_makers:
            results.append(ABCResult(
                sku=pnl.sku,
                marketplace=pnl.marketplace,
                abc_class="D",
                net_profit=pnl.net_profit,
                net_margin_pct=pnl.net_margin_pct,
                cumulative_profit=0,
                cumulative_pct=0,
                rank=0,
                total_skus_in_class=0
            ))
        
        # Подсчёт SKU в каждом классе
        class_counts = {"A": 0, "B": 0, "C": 0, "D": 0}
        for r in results:
            class_counts[r.abc_class] += 1
        
        for r in results:
            r.total_skus_in_class = class_counts[r.abc_class]
        
        # Формирование сводки
        summary = self._create_summary(results, pnl_results)
        
        return results, summary
    
    def _create_summary(
        self,
        abc_results: List[ABCResult],
        pnl_results: List[PnLResult]
    ) -> ABCSummary:
        """Создание сводки ABC-анализа."""
        
        class_a = [r for r in abc_results if r.abc_class == "A"]
        class_b = [r for r in abc_results if r.abc_class == "B"]
        class_c = [r for r in abc_results if r.abc_class == "C"]
        class_d = [r for r in abc_results if r.abc_class == "D"]
        
        total_profit = sum(r.net_profit for r in abc_results if r.net_profit > 0)
        
        return ABCSummary(
            period_start=min(p.period_start for p in pnl_results),
            period_end=max(p.period_end for p in pnl_results),
            total_skus=len(abc_results),
            total_profit=total_profit,
            
            class_a_count=len(class_a),
            class_a_profit=sum(r.net_profit for r in class_a),
            class_a_pct=round(sum(r.net_profit for r in class_a) / total_profit * 100, 2) if total_profit > 0 else 0,
            
            class_b_count=len(class_b),
            class_b_profit=sum(r.net_profit for r in class_b),
            class_b_pct=round(sum(r.net_profit for r in class_b) / total_profit * 100, 2) if total_profit > 0 else 0,
            
            class_c_count=len(class_c),
            class_c_profit=sum(r.net_profit for r in class_c),
            class_c_pct=round(sum(r.net_profit for r in class_c) / total_profit * 100, 2) if total_profit > 0 else 0,
            
            class_d_count=len(class_d),
            class_d_loss=abs(sum(r.net_profit for r in class_d)),
            
            calculated_at=datetime.utcnow()
        )

3.5 Anomaly Detector

3.5.1 Типы аномалий

ТипОписаниеПорог по умолчанию
revenue_spikeРезкий рост выручки> +20% от среднего
revenue_dropРезкое падение выручки> -20% от среднего
expense_spikeРезкий рост расходов> +20% от среднего
margin_dropПадение маржи> -10 п.п.
new_loss_makerНовый убыточный SKUNet Profit < 0 (был > 0)

3.5.2 Алгоритм детекции

3.5.3 Реализация

@dataclass
class Anomaly:
    """Обнаруженная аномалия."""
    anomaly_type: str
    sku: str
    marketplace: str
    current_value: float
    expected_value: float
    deviation_pct: float
    severity: str
    description: str
    detected_at: datetime


class AnomalyDetector:
    """Детектор аномалий в финансовых данных."""
    
    def __init__(self, db_session, thresholds: dict = None):
        self.db = db_session
        self.thresholds = thresholds or {
            "revenue_change": 20,
            "expense_change": 20,
            "margin_drop": 10
        }
    
    async def detect(
        self,
        current_pnl: List[PnLResult],
        lookback_days: int = 30
    ) -> List[Anomaly]:
        """Обнаружение аномалий."""
        
        anomalies = []
        
        # Загрузка исторических данных
        historical = await self._load_historical(lookback_days)
        
        for pnl in current_pnl:
            sku_history = [h for h in historical if h.sku == pnl.sku]
            
            if not sku_history:
                continue
            
            # Расчёт средних
            avg_revenue = sum(h.net_revenue for h in sku_history) / len(sku_history)
            avg_expenses = sum(h.total_expenses for h in sku_history) / len(sku_history)
            avg_margin = sum(h.net_margin_pct for h in sku_history) / len(sku_history)
            
            # Проверка выручки
            if avg_revenue > 0:
                revenue_change = (pnl.net_revenue - avg_revenue) / avg_revenue * 100
                
                if abs(revenue_change) > self.thresholds["revenue_change"]:
                    anomalies.append(Anomaly(
                        anomaly_type="revenue_spike" if revenue_change > 0 else "revenue_drop",
                        sku=pnl.sku,
                        marketplace=pnl.marketplace,
                        current_value=pnl.net_revenue,
                        expected_value=avg_revenue,
                        deviation_pct=round(revenue_change, 2),
                        severity="warning" if abs(revenue_change) < 50 else "critical",
                        description=f"Выручка изменилась на {revenue_change:+.1f}%",
                        detected_at=datetime.utcnow()
                    ))
            
            # Проверка расходов
            if avg_expenses > 0:
                expense_change = (pnl.total_expenses - avg_expenses) / avg_expenses * 100
                
                if expense_change > self.thresholds["expense_change"]:
                    anomalies.append(Anomaly(
                        anomaly_type="expense_spike",
                        sku=pnl.sku,
                        marketplace=pnl.marketplace,
                        current_value=pnl.total_expenses,
                        expected_value=avg_expenses,
                        deviation_pct=round(expense_change, 2),
                        severity="warning",
                        description=f"Расходы выросли на {expense_change:.1f}%",
                        detected_at=datetime.utcnow()
                    ))
            
            # Проверка маржи
            margin_drop = avg_margin - pnl.net_margin_pct
            
            if margin_drop > self.thresholds["margin_drop"]:
                anomalies.append(Anomaly(
                    anomaly_type="margin_drop",
                    sku=pnl.sku,
                    marketplace=pnl.marketplace,
                    current_value=pnl.net_margin_pct,
                    expected_value=avg_margin,
                    deviation_pct=round(margin_drop, 2),
                    severity="warning",
                    description=f"Маржа упала на {margin_drop:.1f} п.п.",
                    detected_at=datetime.utcnow()
                ))
            
            # Проверка на новый убыток
            was_profitable = all(h.net_profit > 0 for h in sku_history)
            if was_profitable and pnl.net_profit < 0:
                anomalies.append(Anomaly(
                    anomaly_type="new_loss_maker",
                    sku=pnl.sku,
                    marketplace=pnl.marketplace,
                    current_value=pnl.net_profit,
                    expected_value=sum(h.net_profit for h in sku_history) / len(sku_history),
                    deviation_pct=0,
                    severity="critical",
                    description=f"SKU стал убыточным: {pnl.net_profit:.2f} ₽",
                    detected_at=datetime.utcnow()
                ))
        
        return anomalies
    
    async def _load_historical(self, days: int) -> List[PnLResult]:
        """Загрузка исторических P&L."""
        
        query = """
            SELECT * FROM cfo_pnl_daily
            WHERE sale_date >= CURRENT_DATE - INTERVAL '%s days'
        """
        
        rows = await self.db.fetch(query, days)
        return [PnLResult(**row) for row in rows]

3.6 AI Insight Generator

3.6.1 Назначение

Генерация текстовых выводов и рекомендаций на основе финансовых данных с использованием Claude Opus 4.5.

3.6.2 Типы инсайтов

ТипОписаниеТриггер
summaryОбщая сводка по периодуПо запросу
problemsВыявленные проблемыНаличие класса D или аномалий
recommendationsРекомендации по улучшениюПо запросу
trendsАнализ трендовДанные за 30+ дней

3.6.3 Промпт для AI

INSIGHT_SYSTEM_PROMPT = """
Ты финансовый аналитик e-commerce компании, специализирующейся на продаже одежды на маркетплейсах (Wildberries, Ozon, Яндекс.Маркет).

Твоя задача — анализировать финансовые данные и формировать краткие, конкретные выводы и рекомендации.

Правила:
1. Используй конкретные цифры и проценты
2. Указывай артикулы (SKU) при упоминании товаров
3. Давай actionable рекомендации
4. Приоритизируй проблемы по влиянию на прибыль
5. Пиши на русском языке
6. Будь краток — максимум 500 слов
"""

INSIGHT_USER_PROMPT = """
Проанализируй финансовые данные за период {period_start}{period_end}:

## Сводка P&L
- Выручка: {total_revenue:,.0f}
- Себестоимость: {total_cogs:,.0f}
- Расходы МП: {total_mp_expenses:,.0f}
- Чистая прибыль: {total_net_profit:,.0f}
- Маржинальность: {avg_margin:.1f}%

## ABC-анализ
- Класс A: {class_a_count} SKU ({class_a_profit:,.0f} ₽, {class_a_pct:.1f}% прибыли)
- Класс B: {class_b_count} SKU ({class_b_profit:,.0f} ₽)
- Класс C: {class_c_count} SKU ({class_c_profit:,.0f} ₽)
- Класс D (убыточные): {class_d_count} SKU (убыток {class_d_loss:,.0f} ₽)

## Топ-5 прибыльных SKU
{top_5_profitable}

## Топ-5 убыточных SKU
{top_5_loss_makers}

## Выявленные аномалии
{anomalies}

Сформируй:
1. Краткое резюме (2-3 предложения)
2. Ключевые проблемы
3. Рекомендации по убыточным SKU
4. Возможности для роста маржи
"""

3.6.4 Реализация

import anthropic

class InsightGenerator:
    """Генератор AI-инсайтов."""
    
    def __init__(self, api_key: str):
        self.client = anthropic.Anthropic(api_key=api_key)
        self.model = "claude-opus-4-5-20250514"
    
    async def generate_insights(
        self,
        pnl_summary: dict,
        abc_summary: ABCSummary,
        top_profitable: List[PnLResult],
        top_loss_makers: List[PnLResult],
        anomalies: List[Anomaly]
    ) -> str:
        """Генерация инсайтов."""
        
        # Форматирование данных для промпта
        top_5_profitable = "\n".join([
            f"- {p.sku}: прибыль {p.net_profit:,.0f} ₽, маржа {p.net_margin_pct:.1f}%"
            for p in top_profitable[:5]
        ])
        
        top_5_loss_makers = "\n".join([
            f"- {p.sku}: убыток {abs(p.net_profit):,.0f} ₽, маржа {p.net_margin_pct:.1f}%"
            for p in top_loss_makers[:5]
        ])
        
        anomalies_text = "\n".join([
            f"- {a.sku}: {a.description}"
            for a in anomalies[:10]
        ]) or "Аномалий не обнаружено"
        
        user_prompt = INSIGHT_USER_PROMPT.format(
            period_start=abc_summary.period_start,
            period_end=abc_summary.period_end,
            total_revenue=pnl_summary["total_revenue"],
            total_cogs=pnl_summary["total_cogs"],
            total_mp_expenses=pnl_summary["total_mp_expenses"],
            total_net_profit=pnl_summary["total_net_profit"],
            avg_margin=pnl_summary["avg_margin"],
            class_a_count=abc_summary.class_a_count,
            class_a_profit=abc_summary.class_a_profit,
            class_a_pct=abc_summary.class_a_pct,
            class_b_count=abc_summary.class_b_count,
            class_b_profit=abc_summary.class_b_profit,
            class_c_count=abc_summary.class_c_count,
            class_c_profit=abc_summary.class_c_profit,
            class_d_count=abc_summary.class_d_count,
            class_d_loss=abc_summary.class_d_loss,
            top_5_profitable=top_5_profitable,
            top_5_loss_makers=top_5_loss_makers,
            anomalies=anomalies_text
        )
        
        response = self.client.messages.create(
            model=self.model,
            max_tokens=1500,
            system=INSIGHT_SYSTEM_PROMPT,
            messages=[
                {"role": "user", "content": user_prompt}
            ]
        )
        
        return response.content[0].text
    
    async def generate_custom_report(
        self,
        user_query: str,
        context_data: dict
    ) -> str:
        """Генерация кастомного отчёта по запросу пользователя."""
        
        system_prompt = INSIGHT_SYSTEM_PROMPT + """

Пользователь запрашивает кастомный отчёт. 
Используй предоставленные данные для ответа на его вопрос.
Если данных недостаточно, укажи это явно.
"""
        
        user_prompt = f"""
Запрос пользователя: {user_query}

Доступные данные:
{self._format_context(context_data)}

Сформируй ответ на запрос пользователя.
"""
        
        response = self.client.messages.create(
            model=self.model,
            max_tokens=2000,
            system=system_prompt,
            messages=[
                {"role": "user", "content": user_prompt}
            ]
        )
        
        return response.content[0].text
    
    def _format_context(self, data: dict) -> str:
        """Форматирование контекстных данных."""
        
        parts = []
        
        if "pnl_by_sku" in data:
            parts.append("## P&L по SKU\n" + self._format_pnl_table(data["pnl_by_sku"]))
        
        if "pnl_by_category" in data:
            parts.append("## P&L по категориям\n" + self._format_pnl_table(data["pnl_by_category"]))
        
        if "abc_results" in data:
            parts.append("## ABC-анализ\n" + self._format_abc_table(data["abc_results"]))
        
        return "\n\n".join(parts)
    
    def _format_pnl_table(self, pnl_list: List[PnLResult]) -> str:
        """Форматирование P&L в текстовую таблицу."""
        
        lines = ["| SKU | Выручка | Расходы | Прибыль | Маржа |"]
        lines.append("|-----|---------|---------|---------|-------|")
        
        for p in pnl_list[:20]:
            lines.append(
                f"| {p.sku} | {p.net_revenue:,.0f} | {p.total_expenses:,.0f} | "
                f"{p.net_profit:,.0f} | {p.net_margin_pct:.1f}% |"
            )
        
        return "\n".join(lines)
    
    def _format_abc_table(self, abc_list: List[ABCResult]) -> str:
        """Форматирование ABC в текстовую таблицу."""
        
        lines = ["| SKU | Класс | Прибыль | Маржа |"]
        lines.append("|-----|-------|---------|-------|")
        
        for a in abc_list[:20]:
            lines.append(
                f"| {a.sku} | {a.abc_class} | {a.net_profit:,.0f} | {a.net_margin_pct:.1f}% |"
            )
        
        return "\n".join(lines)

3.7 Pipeline Orchestrator

3.7.1 Общий workflow

3.7.2 Реализация оркестратора

class CFOPipelineOrchestrator:
    """Оркестратор CFO Pipeline."""
    
    def __init__(
        self,
        ingestion_service: DataIngestionService,
        cost_mapper: CostMapper,
        pnl_calculator: PnLCalculator,
        abc_analyzer: ABCAnalyzer,
        anomaly_detector: AnomalyDetector,
        insight_generator: InsightGenerator,
        notification_service: NotificationService,
        db_session
    ):
        self.ingestion = ingestion_service
        self.cost_mapper = cost_mapper
        self.pnl_calculator = pnl_calculator
        self.abc_analyzer = abc_analyzer
        self.anomaly_detector = anomaly_detector
        self.insight_generator = insight_generator
        self.notifications = notification_service
        self.db = db_session
    
    async def run_daily_pipeline(self, target_date: date = None):
        """Ежедневный pipeline расчёта P&L."""
        
        target_date = target_date or date.today() - timedelta(days=1)
        
        logger.info(f"Starting daily CFO pipeline for {target_date}")
        
        # 1. Импорт данных
        import_result = await self.ingestion.import_all(
            date_from=target_date,
            date_to=target_date
        )
        logger.info(f"Imported {import_result.saved} transactions")
        
        # 2. Загрузка себестоимости
        await self.cost_mapper.load_cost_prices()
        
        # 3. Загрузка транзакций
        transactions = await self._load_transactions(target_date, target_date)
        
        # 4. Расчёт P&L по SKU
        skus = set(t.sku for t in transactions)
        pnl_results = []
        
        for sku in skus:
            pnl = self.pnl_calculator.calculate_sku_pnl(
                transactions, sku, target_date, target_date
            )
            if pnl:
                pnl_results.append(pnl)
        
        # 5. Сохранение P&L
        await self._save_pnl_results(pnl_results)
        
        # 6. Детекция аномалий
        anomalies = await self.anomaly_detector.detect(pnl_results)
        
        # 7. Отправка алертов
        await self._send_alerts(pnl_results, anomalies)
        
        # 8. Уведомление об окончании
        await self.notifications.send(
            user_ids=await self._get_admin_ids(),
            event_type="cfo.data_imported",
            data={
                "date": target_date.isoformat(),
                "transactions": import_result.saved,
                "skus": len(pnl_results),
                "anomalies": len(anomalies)
            },
            level="info"
        )
        
        logger.info(f"Daily CFO pipeline completed for {target_date}")
    
    async def run_weekly_abc(self):
        """Еженедельный ABC-анализ."""
        
        period_end = date.today() - timedelta(days=1)
        period_start = period_end - timedelta(days=6)
        
        logger.info(f"Starting weekly ABC analysis for {period_start}{period_end}")
        
        # Загрузка P&L за неделю
        pnl_results = await self._load_pnl_results(period_start, period_end)
        
        # ABC-анализ
        abc_results, summary = self.abc_analyzer.analyze(pnl_results)
        
        # Сохранение результатов
        await self._save_abc_results(abc_results, summary)
        
        # Генерация инсайтов
        pnl_summary = self._calculate_pnl_summary(pnl_results)
        top_profitable = sorted(pnl_results, key=lambda x: x.net_profit, reverse=True)[:5]
        top_loss_makers = sorted(pnl_results, key=lambda x: x.net_profit)[:5]
        anomalies = await self.anomaly_detector.detect(pnl_results)
        
        insights = await self.insight_generator.generate_insights(
            pnl_summary, summary, top_profitable, top_loss_makers, anomalies
        )
        
        # Сохранение инсайтов
        await self._save_insights(insights, period_start, period_end)
        
        logger.info("Weekly ABC analysis completed")
    
    async def _send_alerts(
        self,
        pnl_results: List[PnLResult],
        anomalies: List[Anomaly]
    ):
        """Отправка алертов."""
        
        # Алерты об убыточных SKU
        loss_makers = [p for p in pnl_results if p.net_profit < 0]
        
        if loss_makers:
            await self.notifications.send(
                user_ids=await self._get_senior_director_ids(),
                event_type="cfo.sku_negative_margin",
                data={
                    "count": len(loss_makers),
                    "total_loss": sum(p.net_profit for p in loss_makers),
                    "top_5": [
                        {"sku": p.sku, "loss": p.net_profit}
                        for p in sorted(loss_makers, key=lambda x: x.net_profit)[:5]
                    ]
                },
                level="warning"
            )
        
        # Алерты об аномалиях
        critical_anomalies = [a for a in anomalies if a.severity == "critical"]
        
        if critical_anomalies:
            await self.notifications.send(
                user_ids=await self._get_admin_ids(),
                event_type="cfo.anomaly_detected",
                data={
                    "count": len(critical_anomalies),
                    "anomalies": [
                        {"sku": a.sku, "type": a.anomaly_type, "description": a.description}
                        for a in critical_anomalies[:10]
                    ]
                },
                level="warning"
            )

Документ подготовлен: Январь 2026
Версия: 1.0
Статус: Черновик