Раздел 3: AI Pipeline
Проект: ÐŸÑ€ÐµÐ´Ð¸ÐºÑ‚Ð¸Ð²Ð½Ð°Ñ Ð°Ð½Ð°Ð»Ð¸Ñ‚Ð¸ÐºÐ° товарных ниш
Модуль: Scout / AI Pipeline
ВерÑиÑ: 1.0
Дата: Январь 2026
3.1 Обзор AI Pipeline
Ðрхитектура pipeline
РаÑпределение AI-моделей
| Ðтап | Модель | Ðазначение | Tokens/Ð·Ð°Ð¿Ñ€Ð¾Ñ |
|---|---|---|---|
| Trend Mining | GPT-5 mini | ÐормализациÑ, Ð°Ð³Ñ€ÐµÐ³Ð°Ñ†Ð¸Ñ Ñ‚Ñ€ÐµÐ½Ð´Ð¾Ð² | ~500 |
| Competitor Analysis | GPT-5 mini | Ðнализ Ñтруктуры конкурентов | ~800 |
| AI Verdict | Claude Opus 4.5 | Финальный вердикт, рекомендации | ~2000 |
3.2 Input Parser
3.2.1 Ðазначение
Разбор и Ð½Ð¾Ñ€Ð¼Ð°Ð»Ð¸Ð·Ð°Ñ†Ð¸Ñ Ð²Ñ…Ð¾Ð´Ð½Ñ‹Ñ… данных от Ð¿Ð¾Ð»ÑŒÐ·Ð¾Ð²Ð°Ñ‚ÐµÐ»Ñ Ð² Ñтруктурированный формат.
3.2.2 Поддерживаемые форматы
3.2.3 РеализациÑ
# services/input_parser.py
import re
from typing import Optional, List, Tuple
from dataclasses import dataclass, field
from urllib.parse import urlparse, parse_qs
@dataclass
class ParsedInput:
"""Разобранный ввод пользователÑ."""
marketplaces: List[str]
query: str
category_url: Optional[str] = None
cogs: float = 0.0
cogs_min: Optional[float] = None
cogs_max: Optional[float] = None
raw_input: str = ""
parse_confidence: float = 1.0
class InputParser:
"""ПарÑер входных данных пользователÑ."""
# Паттерны URL
URL_PATTERNS = {
"wildberries": {
"category": r"wildberries\.ru/catalog/([^/\?]+(?:/[^/\?]+)*)",
"search": r"wildberries\.ru/catalog/0/search\.aspx\?.*search=([^&]+)",
"product": r"wildberries\.ru/catalog/(\d+)/detail"
},
"ozon": {
"category": r"ozon\.ru/category/([^/\?]+)-(\d+)",
"search": r"ozon\.ru/search/?\?.*text=([^&]+)",
"product": r"ozon\.ru/product/[^/]+-(\d+)"
},
"yandex_market": {
"category": r"market\.yandex\.ru/catalog--([^/]+)/(\d+)",
"search": r"market\.yandex\.ru/search\?.*text=([^&]+)",
"product": r"market\.yandex\.ru/product/(\d+)"
}
}
# Паттерны COGS
COGS_PATTERNS = [
r"(?:cogs|ÑебеÑтоимоÑть|закупк[аи]|закупочн\w+\s*цен\w*)[:\s]+(\d+(?:\.\d+)?)",
r"(\d+(?:\.\d+)?)\s*(?:руб|₽|рублей)",
r"(?:от\s+)?(\d+)\s*(?:до\s+(\d+))?(?:\s*руб|\s*₽)?",
]
# МаркетплейÑÑ‹ в текÑте
MP_KEYWORDS = {
"wildberries": ["wildberries", "вайлдберриз", "wb", "вб"],
"ozon": ["ozon", "озон"],
"yandex_market": ["ÑндекÑ.маркет", "ÑÐ½Ð´ÐµÐºÑ Ð¼Ð°Ñ€ÐºÐµÑ‚", "yandex market", "ym", "Ñм"]
}
def parse(self, user_input: str) -> ParsedInput:
"""
ПарÑинг входных данных.
Args:
user_input: Строка от пользователÑ
Returns:
ParsedInput Ñ Ñ€Ð°Ð·Ð¾Ð±Ñ€Ð°Ð½Ð½Ñ‹Ð¼Ð¸ данными
"""
raw_input = user_input.strip()
# Попытка найти URL
url_result = self._parse_url(raw_input)
# Извлечение COGS
cogs, cogs_min, cogs_max = self._extract_cogs(raw_input)
if url_result:
# URL найден
marketplace, query, category_url = url_result
return ParsedInput(
marketplaces=[marketplace],
query=query,
category_url=category_url,
cogs=cogs,
cogs_min=cogs_min,
cogs_max=cogs_max,
raw_input=raw_input,
parse_confidence=0.95
)
else:
# ТекÑтовый запроÑ
marketplaces = self._detect_marketplaces(raw_input)
query = self._extract_query(raw_input)
return ParsedInput(
marketplaces=marketplaces if marketplaces else ["wildberries", "ozon", "yandex_market"],
query=query,
category_url=None,
cogs=cogs,
cogs_min=cogs_min,
cogs_max=cogs_max,
raw_input=raw_input,
parse_confidence=0.8 if marketplaces else 0.6
)
def _parse_url(self, text: str) -> Optional[Tuple[str, str, str]]:
"""
ПарÑинг URL маркетплейÑа.
Returns:
(marketplace, query, full_url) или None
"""
# ПоиÑк URL в текÑте
url_match = re.search(r"https?://[^\s]+", text)
if not url_match:
return None
url = url_match.group(0)
for marketplace, patterns in self.URL_PATTERNS.items():
# Проверка категории
cat_match = re.search(patterns["category"], url)
if cat_match:
query = cat_match.group(1).replace("-", " ").replace("/", " > ")
return (marketplace, query, url)
# Проверка поиÑка
search_match = re.search(patterns["search"], url)
if search_match:
from urllib.parse import unquote
query = unquote(search_match.group(1))
return (marketplace, query, url)
return None
def _extract_cogs(self, text: str) -> Tuple[float, Optional[float], Optional[float]]:
"""
Извлечение закупочной цены.
Returns:
(cogs, cogs_min, cogs_max)
"""
text_lower = text.lower()
# Паттерн диапазона "от X до Y"
range_match = re.search(r"от\s+(\d+)\s*(?:до\s+(\d+))?", text_lower)
if range_match:
cogs_min = float(range_match.group(1))
cogs_max = float(range_match.group(2)) if range_match.group(2) else None
cogs = (cogs_min + cogs_max) / 2 if cogs_max else cogs_min
return (cogs, cogs_min, cogs_max)
# ПроÑтое чиÑло Ñ ÐºÐ¾Ð½Ñ‚ÐµÐºÑтом
for pattern in self.COGS_PATTERNS:
match = re.search(pattern, text_lower)
if match:
cogs = float(match.group(1))
return (cogs, None, None)
# ПроÑто чиÑло в конце
number_match = re.search(r"(\d+(?:\.\d+)?)\s*(?:руб|₽|$)", text_lower)
if number_match:
return (float(number_match.group(1)), None, None)
return (0.0, None, None)
def _detect_marketplaces(self, text: str) -> List[str]:
"""Определение маркетплейÑов в текÑте."""
text_lower = text.lower()
detected = []
for mp, keywords in self.MP_KEYWORDS.items():
for keyword in keywords:
if keyword in text_lower:
detected.append(mp)
break
return detected
def _extract_query(self, text: str) -> str:
"""Извлечение поиÑкового запроÑа из текÑта."""
# Удаление URL
text = re.sub(r"https?://[^\s]+", "", text)
# Удаление COGS
text = re.sub(r"(?:cogs|ÑебеÑтоимоÑть|закупк\w+)[:\s]+\d+[^\s]*", "", text, flags=re.I)
text = re.sub(r"\d+\s*(?:руб|₽|рублей)", "", text)
text = re.sub(r"от\s+\d+\s*(?:до\s+\d+)?", "", text)
# Удаление названий маркетплейÑов
for keywords in self.MP_KEYWORDS.values():
for kw in keywords:
text = re.sub(rf"\b{kw}\b", "", text, flags=re.I)
# Удаление Ñлужебных Ñлов
stop_words = [
"проанализируй", "анализ", "оцени", "оценка", "ниш[ауе]",
"категори[Ñию]", "товар\w*", "на", "длÑ", "по"
]
for word in stop_words:
text = re.sub(rf"\b{word}\b", "", text, flags=re.I)
# ОчиÑтка
text = re.sub(r"\s+", " ", text).strip()
text = re.sub(r"^[,\s]+|[,\s]+$", "", text)
return text3.2.4 Примеры парÑинга
| Ввод | Результат |
|---|---|
https://www.wildberries.ru/catalog/zhenshchinam/odezhda/platya, 500₽ | marketplace=wildberries, query="zhenshchinam odezhda platya", cogs=500 |
летние Ð¿Ð»Ð°Ñ‚ÑŒÑ Ð½Ð° вб, закупка 450 рублей | marketplace=wildberries, query="летние платьÑ", cogs=450 |
Оцени нишу детÑких комбинезонов, от 800 до 1200 | marketplaces=[all], query="детÑких комбинезонов", cogs=1000, cogs_min=800, cogs_max=1200 |
3.3 Trend Miner
3.3.1 Ðазначение
Сбор, Ð½Ð¾Ñ€Ð¼Ð°Ð»Ð¸Ð·Ð°Ñ†Ð¸Ñ Ð¸ анализ данных о динамике ÑпроÑа Ñ Ð¸Ñпользованием GPT-5 mini Ð´Ð»Ñ Ð¾Ð±Ñ€Ð°Ð±Ð¾Ñ‚ÐºÐ¸.
3.3.2 Ðлгоритм работы
3.3.3 Структуры данных
# services/trend_miner.py
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from datetime import datetime, date
from enum import Enum
class TrendStatus(Enum):
GREEN = "green" # > 0.15 — роÑÑ‚
YELLOW = "yellow" # 0 to 0.15 — Ñтабильно
RED = "red" # < 0 — падение
@dataclass
class MonthlyVolume:
"""МеÑÑчный объём запроÑов."""
month: date
volume: int
source: str
@dataclass
class TrendResult:
"""Результат анализа трендов."""
query: str
period_months: int
# ОÑновные метрики
trend_slope: float
trend_status: TrendStatus
confidence: float
# Объёмы
total_volume: int
avg_monthly_volume: int
peak_volume: int
min_volume: int
# Временной Ñ€Ñд
monthly_data: List[MonthlyVolume]
# СезонноÑть
seasonality_detected: bool
seasonality_peak_month: Optional[int]
# СвÑзанные запроÑÑ‹
related_queries: List[Dict[str, any]]
# Метаданные
sources_used: List[str]
sources_failed: List[str]
collected_at: datetime
@property
def is_growing(self) -> bool:
return self.trend_slope > 0.15
@property
def is_declining(self) -> bool:
return self.trend_slope < 03.3.4 Промпт Ð´Ð»Ñ Ð½Ð¾Ñ€Ð¼Ð°Ð»Ð¸Ð·Ð°Ñ†Ð¸Ð¸ (GPT-5 mini)
# prompts/trend_prompts.py
TREND_NORMALIZATION_SYSTEM = """
Ты — аналитик данных, ÑпециализирующийÑÑ Ð½Ð° e-commerce трендах.
Ð¢Ð²Ð¾Ñ Ð·Ð°Ð´Ð°Ñ‡Ð° — нормализовать и агрегировать данные о поиÑковых трендах из разных иÑточников.
Правила:
1. Приводи вÑе данные к единой шкале (0-100)
2. Учитывай разницу в абÑолютных значениÑÑ… между иÑточниками
3. Ð’Ñ‹ÑвлÑй аномалии и выброÑÑ‹
4. ОпределÑй Ñезонные паттерны
5. Отвечай Ñтрого в JSON формате
"""
TREND_NORMALIZATION_USER = """
Данные о трендах Ð´Ð»Ñ Ð·Ð°Ð¿Ñ€Ð¾Ñа "{query}":
Wordstat (ЯндекÑ):
{wordstat_data}
Ozon Analytics:
{ozon_data}
WB Analytics:
{wb_data}
Внешние ÑервиÑÑ‹:
{external_data}
Проанализируй данные и верни JSON:
{{
"normalized_trend_slope": <float от -1 до 1>,
"confidence": <float от 0 до 1>,
"monthly_trend": [
{{"month": "YYYY-MM", "normalized_value": <0-100>}},
...
],
"seasonality": {{
"detected": <bool>,
"peak_month": <1-12 или null>,
"pattern": "<опиÑание>"
}},
"data_quality": {{
"sources_agreement": <float 0-1>,
"outliers_detected": <int>,
"notes": "<заметки>"
}}
}}
"""3.3.5 Ð ÐµÐ°Ð»Ð¸Ð·Ð°Ñ†Ð¸Ñ Trend Miner
# services/trend_miner.py
import asyncio
import numpy as np
from typing import Dict, List, Optional
from datetime import datetime, timedelta
class TrendMiner:
"""Ðнализатор трендов ÑпроÑа."""
def __init__(
self,
data_aggregator,
ai_client,
cache
):
self.aggregator = data_aggregator
self.ai = ai_client
self.cache = cache
async def analyze(
self,
parsed_input: ParsedInput
) -> TrendResult:
"""
Полный анализ трендов Ð´Ð»Ñ Ð·Ð°Ð¿Ñ€Ð¾Ñа.
Args:
parsed_input: Разобранный ввод пользователÑ
Returns:
TrendResult Ñ Ð¼ÐµÑ‚Ñ€Ð¸ÐºÐ°Ð¼Ð¸
"""
query = parsed_input.query
# Проверка кÑша
cached = await self.cache.get_trend(query)
if cached:
return self._deserialize_trend(cached)
# Сбор данных из иÑточников
raw_data = await self.aggregator.collect_trend_data(
query=query,
marketplaces=parsed_input.marketplaces,
timeout=30
)
# ÐÐ¾Ñ€Ð¼Ð°Ð»Ð¸Ð·Ð°Ñ†Ð¸Ñ Ñ‡ÐµÑ€ÐµÐ· AI
normalized = await self._normalize_with_ai(query, raw_data)
# РаÑчёт метрик
result = self._calculate_metrics(query, raw_data, normalized)
# КÑширование
await self.cache.set_trend(query, self._serialize_trend(result))
return result
async def _normalize_with_ai(
self,
query: str,
raw_data: Dict
) -> Dict:
"""ÐÐ¾Ñ€Ð¼Ð°Ð»Ð¸Ð·Ð°Ñ†Ð¸Ñ Ð´Ð°Ð½Ð½Ñ‹Ñ… через GPT-5 mini."""
from prompts.trend_prompts import (
TREND_NORMALIZATION_SYSTEM,
TREND_NORMALIZATION_USER
)
prompt = TREND_NORMALIZATION_USER.format(
query=query,
wordstat_data=self._format_source_data(raw_data.get("wordstat", {})),
ozon_data=self._format_source_data(raw_data.get("ozon_analytics", {})),
wb_data=self._format_source_data(raw_data.get("wb_analytics", {})),
external_data=self._format_source_data(raw_data.get("external", {}))
)
response = await self.ai.complete(
model="gpt-5-mini",
system=TREND_NORMALIZATION_SYSTEM,
user=prompt,
response_format="json"
)
return response
def _calculate_metrics(
self,
query: str,
raw_data: Dict,
normalized: Dict
) -> TrendResult:
"""РаÑчёт финальных метрик."""
# Trend Slope из нормализованных данных
trend_slope = normalized.get("normalized_trend_slope", 0)
# Определение ÑтатуÑа
if trend_slope > 0.15:
trend_status = TrendStatus.GREEN
elif trend_slope >= 0:
trend_status = TrendStatus.YELLOW
else:
trend_status = TrendStatus.RED
# Сбор monthly_data
monthly_data = []
for item in normalized.get("monthly_trend", []):
monthly_data.append(MonthlyVolume(
month=datetime.strptime(item["month"], "%Y-%m").date(),
volume=item["normalized_value"],
source="aggregated"
))
# Объёмы из Wordstat
wordstat = raw_data.get("wordstat", {})
total_volume = wordstat.get("total_shows", 0)
volumes = [m.volume for m in monthly_data] if monthly_data else [0]
# СезонноÑть
seasonality = normalized.get("seasonality", {})
# ИÑточники
sources_used = [k for k, v in raw_data.items() if v and "error" not in v]
sources_failed = [k for k, v in raw_data.items() if v and "error" in v]
return TrendResult(
query=query,
period_months=3,
trend_slope=round(trend_slope, 4),
trend_status=trend_status,
confidence=normalized.get("confidence", 0.5),
total_volume=total_volume,
avg_monthly_volume=int(np.mean(volumes)) if volumes else 0,
peak_volume=max(volumes) if volumes else 0,
min_volume=min(volumes) if volumes else 0,
monthly_data=monthly_data,
seasonality_detected=seasonality.get("detected", False),
seasonality_peak_month=seasonality.get("peak_month"),
related_queries=wordstat.get("related_phrases", [])[:10],
sources_used=sources_used,
sources_failed=sources_failed,
collected_at=datetime.utcnow()
)
def _format_source_data(self, data: Dict) -> str:
"""Форматирование данных иÑточника Ð´Ð»Ñ Ð¿Ñ€Ð¾Ð¼Ð¿Ñ‚Ð°."""
if not data or "error" in data:
return "Данные недоÑтупны"
import json
return json.dumps(data, ensure_ascii=False, indent=2)
def _calculate_trend_slope_simple(
self,
monthly_values: List[int]
) -> float:
"""
ПроÑтой раÑчёт trend slope без AI.
ИÑпользуетÑÑ ÐºÐ°Ðº fallback.
"""
if len(monthly_values) < 2:
return 0.0
x = np.arange(len(monthly_values))
y = np.array(monthly_values)
# Ð›Ð¸Ð½ÐµÐ¹Ð½Ð°Ñ Ñ€ÐµÐ³Ñ€ÐµÑÑиÑ
slope, _ = np.polyfit(x, y, 1)
# ÐормализациÑ
avg_y = np.mean(y)
if avg_y == 0:
return 0.0
normalized = slope / avg_y
return max(-1.0, min(1.0, normalized))3.4 Competitor Analyzer
3.4.1 Ðазначение
Ðнализ конкурентной Ñреды в категории: раÑчёт Monopoly Rate, анализ ценовой Ñтруктуры, оценка барьеров входа.
3.4.2 Ðлгоритм работы
3.4.3 Структуры данных
# services/competitor_analyzer.py
from dataclasses import dataclass
from typing import List, Optional
from datetime import datetime
from enum import Enum
class MonopolyStatus(Enum):
GREEN = "green" # < 50%
YELLOW = "yellow" # 50-70%
RED = "red" # > 70%
class CompetitionLevel(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
class EntryBarrier(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
@dataclass
class SellerStats:
"""СтатиÑтика продавца."""
name: str
products_count: int
share: float
avg_position: float
avg_price: float
avg_rating: float
total_reviews: int
@dataclass
class PriceAnalysis:
"""Ðнализ цен в категории."""
avg: float
median: float
min: float
max: float
std: float
percentile_25: float
percentile_75: float
price_segments: Dict[str, int] # {"budget": 10, "medium": 25, "premium": 15}
@dataclass
class QualityAnalysis:
"""Ðнализ качеÑтва в категории."""
avg_rating: float
median_rating: float
avg_reviews_count: int
products_above_4_5: int
products_with_photos: int
products_with_video: int
avg_photos_count: float
@dataclass
class CompetitorResult:
"""Результат анализа конкурентов."""
marketplace: str
category: str
query: str
# Monopoly
monopoly_rate: float
monopoly_status: MonopolyStatus
top_sellers: List[SellerStats]
herfindahl_index: float
# Цены
price_analysis: PriceAnalysis
# КачеÑтво
quality_analysis: QualityAnalysis
# КонкуренциÑ
competition_level: CompetitionLevel
unique_sellers_count: int
# Барьеры входа
entry_barrier: EntryBarrier
entry_barrier_score: float # 0-1
barrier_factors: List[str]
# Метаданные
products_analyzed: int
analyzed_at: datetime3.4.4 РаÑчёт Monopoly Rate
def calculate_monopoly_metrics(
sellers: List[SellerStats]
) -> Dict:
"""
РаÑчёт метрик монополизации.
Monopoly Rate = Ð´Ð¾Ð»Ñ Ð¢ÐžÐŸ-3 продавцов
Herfindahl Index = Ñумма квадратов долей вÑех продавцов
"""
if not sellers:
return {
"monopoly_rate": 0,
"herfindahl_index": 0,
"status": MonopolyStatus.GREEN
}
# Сортировка по доле
sorted_sellers = sorted(sellers, key=lambda x: x.share, reverse=True)
# Monopoly Rate (ТОП-3)
top_3_share = sum(s.share for s in sorted_sellers[:3])
# Herfindahl Index
hhi = sum(s.share ** 2 for s in sorted_sellers)
# СтатуÑ
if top_3_share < 0.5:
status = MonopolyStatus.GREEN
elif top_3_share < 0.7:
status = MonopolyStatus.YELLOW
else:
status = MonopolyStatus.RED
return {
"monopoly_rate": round(top_3_share, 4),
"herfindahl_index": round(hhi, 4),
"status": status
}3.4.5 Оценка барьеров входа
def calculate_entry_barrier(
monopoly_rate: float,
avg_rating: float,
avg_reviews: int,
price_std: float,
avg_price: float
) -> Dict:
"""
Оценка барьеров входа в нишу.
Факторы:
- Ð’Ñ‹ÑÐ¾ÐºÐ°Ñ Ð¼Ð¾Ð½Ð¾Ð¿Ð¾Ð»Ð¸Ð·Ð°Ñ†Ð¸Ñ
- Ð’Ñ‹Ñокие Ñ‚Ñ€ÐµÐ±Ð¾Ð²Ð°Ð½Ð¸Ñ Ðº рейтингу
- Много отзывов у конкурентов
- Ðизкий Ñ€Ð°Ð·Ð±Ñ€Ð¾Ñ Ñ†ÐµÐ½ (Ñ†ÐµÐ½Ð¾Ð²Ð°Ñ Ð²Ð¾Ð¹Ð½Ð°)
"""
score = 0.0
factors = []
# ÐœÐ¾Ð½Ð¾Ð¿Ð¾Ð»Ð¸Ð·Ð°Ñ†Ð¸Ñ (Ð²ÐµÑ 0.3)
if monopoly_rate > 0.7:
score += 0.3
factors.append("Ð’Ñ‹ÑÐ¾ÐºÐ°Ñ Ð¼Ð¾Ð½Ð¾Ð¿Ð¾Ð»Ð¸Ð·Ð°Ñ†Ð¸Ñ Ñ€Ñ‹Ð½ÐºÐ° (ТОП-3 > 70%)")
elif monopoly_rate > 0.5:
score += 0.15
factors.append("Ð£Ð¼ÐµÑ€ÐµÐ½Ð½Ð°Ñ Ð¼Ð¾Ð½Ð¾Ð¿Ð¾Ð»Ð¸Ð·Ð°Ñ†Ð¸Ñ (ТОП-3 > 50%)")
# Ð¢Ñ€ÐµÐ±Ð¾Ð²Ð°Ð½Ð¸Ñ Ðº рейтингу (Ð²ÐµÑ 0.25)
if avg_rating > 4.7:
score += 0.25
factors.append("Ð’Ñ‹Ñокие Ñ‚Ñ€ÐµÐ±Ð¾Ð²Ð°Ð½Ð¸Ñ Ðº рейтингу (>4.7)")
elif avg_rating > 4.5:
score += 0.12
factors.append("Средние Ñ‚Ñ€ÐµÐ±Ð¾Ð²Ð°Ð½Ð¸Ñ Ðº рейтингу (>4.5)")
# КоличеÑтво отзывов (Ð²ÐµÑ 0.25)
if avg_reviews > 500:
score += 0.25
factors.append("Много отзывов у конкурентов (>500)")
elif avg_reviews > 200:
score += 0.12
factors.append("Среднее количеÑтво отзывов (>200)")
# Ð¦ÐµÐ½Ð¾Ð²Ð°Ñ ÐºÐ¾Ð½ÐºÑƒÑ€ÐµÐ½Ñ†Ð¸Ñ (Ð²ÐµÑ 0.2)
cv = price_std / avg_price if avg_price > 0 else 0 # КоÑффициент вариации
if cv < 0.2:
score += 0.2
factors.append("Ðизкий Ñ€Ð°Ð·Ð±Ñ€Ð¾Ñ Ñ†ÐµÐ½ — Ñ†ÐµÐ½Ð¾Ð²Ð°Ñ Ð²Ð¾Ð¹Ð½Ð°")
elif cv < 0.3:
score += 0.1
factors.append("Умеренный Ñ€Ð°Ð·Ð±Ñ€Ð¾Ñ Ñ†ÐµÐ½")
# Определение уровнÑ
if score > 0.6:
barrier = EntryBarrier.HIGH
elif score > 0.3:
barrier = EntryBarrier.MEDIUM
else:
barrier = EntryBarrier.LOW
return {
"barrier": barrier,
"score": round(score, 2),
"factors": factors
}3.4.6 Промпт Ð´Ð»Ñ Ð°Ð½Ð°Ð»Ð¸Ð·Ð° (GPT-5 mini)
# prompts/competitor_prompts.py
COMPETITOR_ANALYSIS_SYSTEM = """
Ты — аналитик e-commerce, ÑпециализирующийÑÑ Ð½Ð° конкурентном анализе.
Ð¢Ð²Ð¾Ñ Ð·Ð°Ð´Ð°Ñ‡Ð° — проанализировать данные о конкурентах в категории и выÑвить паттерны.
Правила:
1. Группируй продавцов и раÑÑчитывай их доли
2. Ðнализируй ценовые Ñегменты
3. Оценивай качеÑтво товаров по рейтингам
4. Ð’Ñ‹ÑвлÑй лидеров и их Ñтратегии
5. Отвечай Ñтрого в JSON формате
"""
COMPETITOR_ANALYSIS_USER = """
Данные ТОП-50 товаров в категории "{category}" на {marketplace}:
{products_data}
Проанализируй и верни JSON:
{{
"sellers_analysis": [
{{
"name": "<название продавца>",
"products_count": <int>,
"estimated_share": <float 0-1>,
"avg_position": <float>,
"price_strategy": "<budget/medium/premium>",
"strengths": ["<ÑÐ¸Ð»ÑŒÐ½Ð°Ñ Ñторона>", ...]
}},
...
],
"price_segments": {{
"budget": {{"count": <int>, "price_range": "<от-до>"}},
"medium": {{"count": <int>, "price_range": "<от-до>"}},
"premium": {{"count": <int>, "price_range": "<от-до>"}}
}},
"market_insights": {{
"dominant_strategy": "<опиÑание>",
"gap_opportunities": ["<возможноÑть>", ...],
"risks": ["<риÑк>", ...]
}}
}}
"""3.4.7 Ð ÐµÐ°Ð»Ð¸Ð·Ð°Ñ†Ð¸Ñ Competitor Analyzer
# services/competitor_analyzer.py
from typing import Dict, List
from datetime import datetime
class CompetitorAnalyzer:
"""Ðнализатор конкурентной Ñреды."""
def __init__(
self,
watcher_client,
ai_client
):
self.watcher = watcher_client
self.ai = ai_client
async def analyze(
self,
parsed_input: ParsedInput
) -> Dict[str, CompetitorResult]:
"""
Ðнализ конкурентов по вÑем маркетплейÑам.
Returns:
{marketplace: CompetitorResult}
"""
results = {}
for mp in parsed_input.marketplaces:
result = await self._analyze_marketplace(
marketplace=mp,
query=parsed_input.query,
category_url=parsed_input.category_url
)
results[mp] = result
return results
async def _analyze_marketplace(
self,
marketplace: str,
query: str,
category_url: str = None
) -> CompetitorResult:
"""Ðнализ одного маркетплейÑа."""
# Получение данных из Watcher
watcher_data = await self.watcher.get_category_analysis(
marketplace=marketplace,
category_url=category_url,
search_query=query if not category_url else None,
limit=50
)
data = watcher_data.get("data", {})
# ПарÑинг продавцов
sellers = [
SellerStats(
name=s["name"],
products_count=s["products_in_top"],
share=s["share"],
avg_position=s["avg_position"],
avg_price=s["avg_price"],
avg_rating=s["avg_rating"],
total_reviews=s.get("total_reviews", 0)
)
for s in data.get("sellers", [])
]
# РаÑчёт monopoly
monopoly = calculate_monopoly_metrics(sellers)
# Ðнализ цен
price_stats = data.get("price_stats", {})
price_analysis = PriceAnalysis(
avg=price_stats.get("avg", 0),
median=price_stats.get("median", 0),
min=price_stats.get("min", 0),
max=price_stats.get("max", 0),
std=price_stats.get("std", 0),
percentile_25=price_stats.get("percentile_25", 0),
percentile_75=price_stats.get("percentile_75", 0),
price_segments=await self._analyze_price_segments(data)
)
# Ðнализ качеÑтва
quality_stats = data.get("quality_stats", {})
quality_analysis = QualityAnalysis(
avg_rating=quality_stats.get("avg_rating", 0),
median_rating=quality_stats.get("median_rating", 0),
avg_reviews_count=quality_stats.get("avg_reviews_count", 0),
products_above_4_5=quality_stats.get("products_rating_above_4_5", 0),
products_with_photos=quality_stats.get("products_with_photos", 0),
products_with_video=quality_stats.get("products_with_video", 0),
avg_photos_count=quality_stats.get("avg_photos_count", 0)
)
# Барьеры входа
barrier = calculate_entry_barrier(
monopoly_rate=monopoly["monopoly_rate"],
avg_rating=quality_analysis.avg_rating,
avg_reviews=quality_analysis.avg_reviews_count,
price_std=price_analysis.std,
avg_price=price_analysis.avg
)
# Уровень конкуренции
competition = self._determine_competition_level(
unique_sellers=data.get("competition_metrics", {}).get("unique_sellers", 0),
monopoly_rate=monopoly["monopoly_rate"]
)
return CompetitorResult(
marketplace=marketplace,
category=data.get("category", query),
query=query,
monopoly_rate=monopoly["monopoly_rate"],
monopoly_status=monopoly["status"],
top_sellers=sellers[:10],
herfindahl_index=monopoly["herfindahl_index"],
price_analysis=price_analysis,
quality_analysis=quality_analysis,
competition_level=competition,
unique_sellers_count=data.get("competition_metrics", {}).get("unique_sellers", 0),
entry_barrier=barrier["barrier"],
entry_barrier_score=barrier["score"],
barrier_factors=barrier["factors"],
products_analyzed=data.get("products_count", 0),
analyzed_at=datetime.utcnow()
)
async def _analyze_price_segments(self, data: Dict) -> Dict[str, int]:
"""Ðнализ ценовых Ñегментов через AI."""
# Ð£Ð¿Ñ€Ð¾Ñ‰Ñ‘Ð½Ð½Ð°Ñ Ð²ÐµÑ€ÑÐ¸Ñ Ð±ÐµÐ· AI
products = data.get("products", [])
if not products:
return {"budget": 0, "medium": 0, "premium": 0}
prices = [p.get("price", 0) for p in products if p.get("price")]
if not prices:
return {"budget": 0, "medium": 0, "premium": 0}
import numpy as np
p33 = np.percentile(prices, 33)
p66 = np.percentile(prices, 66)
return {
"budget": len([p for p in prices if p <= p33]),
"medium": len([p for p in prices if p33 < p <= p66]),
"premium": len([p for p in prices if p > p66])
}
def _determine_competition_level(
self,
unique_sellers: int,
monopoly_rate: float
) -> CompetitionLevel:
"""Определение ÑƒÑ€Ð¾Ð²Ð½Ñ ÐºÐ¾Ð½ÐºÑƒÑ€ÐµÐ½Ñ†Ð¸Ð¸."""
if unique_sellers > 20 and monopoly_rate < 0.5:
return CompetitionLevel.HIGH
elif unique_sellers > 10 or monopoly_rate < 0.7:
return CompetitionLevel.MEDIUM
else:
return CompetitionLevel.LOW3.5 Unit Calculator
3.5.1 Ðазначение
РаÑчёт unit-Ñкономики: прибыльноÑть, маржинальноÑть, точка безубыточноÑти.
3.5.2 Формулы раÑчёта
Ð’Ð°Ð»Ð¾Ð²Ð°Ñ Ð¿Ñ€Ð¸Ð±Ñ‹Ð»ÑŒ = Цена продажи - СебеÑтоимоÑть
РаÑходы МП = КомиÑÑÐ¸Ñ + ЛогиÑтика + Возвраты + Хранение + Ðквайринг
ЧиÑÑ‚Ð°Ñ Ð¿Ñ€Ð¸Ð±Ñ‹Ð»ÑŒ = Цена продажи - СебеÑтоимоÑть - РаÑходы МП
Маржа % = ЧиÑÑ‚Ð°Ñ Ð¿Ñ€Ð¸Ð±Ñ‹Ð»ÑŒ / Цена продажи × 100
Точка безубыточноÑти = СебеÑтоимоÑть / (1 - Overhead%)
Цена Ð´Ð»Ñ Ð¼Ð°Ñ€Ð¶Ð¸ X% = СебеÑтоимоÑть / (1 - Overhead% - X%)3.5.3 Структуры данных
# services/unit_calculator.py
from dataclasses import dataclass
from typing import Optional
from enum import Enum
class MarginStatus(Enum):
GREEN = "green" # > 25%
YELLOW = "yellow" # 15-25%
RED = "red" # < 15%
@dataclass
class MarketplaceRates:
"""Ставки раÑходов маркетплейÑа."""
marketplace: str
category: str
commission_pct: float
logistics_pct: float
return_logistics_pct: float
storage_pct: float
acquiring_pct: float
@property
def total_overhead_pct(self) -> float:
return (
self.commission_pct +
self.logistics_pct +
self.return_logistics_pct +
self.storage_pct +
self.acquiring_pct
)
@dataclass
class UnitEconomics:
"""Результат раÑчёта unit-Ñкономики."""
marketplace: str
# Входные данные
selling_price: float
cogs: float
# РаÑходы в рублÑÑ…
commission: float
logistics: float
return_logistics: float
storage: float
acquiring: float
total_expenses: float
# РаÑходы в процентах
commission_pct: float
logistics_pct: float
return_logistics_pct: float
storage_pct: float
acquiring_pct: float
total_overhead_pct: float
# Результаты
gross_profit: float
net_profit: float
gross_margin_pct: float
net_margin_pct: float
margin_status: MarginStatus
# Дополнительные раÑчёты
break_even_price: float
target_price_25: float
target_price_30: float
cogs_for_25_margin: float # При текущей цене
# Ставки по умолчанию
DEFAULT_RATES = {
"wildberries": MarketplaceRates(
marketplace="wildberries",
category="default",
commission_pct=15.0,
logistics_pct=5.0,
return_logistics_pct=3.0,
storage_pct=1.0,
acquiring_pct=0.0
),
"ozon": MarketplaceRates(
marketplace="ozon",
category="default",
commission_pct=18.0,
logistics_pct=6.0,
return_logistics_pct=4.0,
storage_pct=1.5,
acquiring_pct=0.0
),
"yandex_market": MarketplaceRates(
marketplace="yandex_market",
category="default",
commission_pct=15.0,
logistics_pct=7.0,
return_logistics_pct=4.0,
storage_pct=1.0,
acquiring_pct=1.5
)
}3.5.4 Ð ÐµÐ°Ð»Ð¸Ð·Ð°Ñ†Ð¸Ñ Unit Calculator
# services/unit_calculator.py
from typing import Dict, Optional
from dataclasses import asdict
class UnitCalculator:
"""КалькулÑтор unit-Ñкономики."""
def __init__(self, db_session):
self.db = db_session
async def calculate(
self,
marketplace: str,
selling_price: float,
cogs: float,
category: str = "default"
) -> UnitEconomics:
"""
РаÑчёт unit-Ñкономики.
Args:
marketplace: Код маркетплейÑа
selling_price: Цена продажи
cogs: СебеÑтоимоÑть
category: ÐšÐ°Ñ‚ÐµÐ³Ð¾Ñ€Ð¸Ñ Ñ‚Ð¾Ð²Ð°Ñ€Ð° (Ð´Ð»Ñ Ñпецифичных Ñтавок)
"""
# Получение Ñтавок
rates = await self._get_rates(marketplace, category)
# РаÑчёт раÑходов в рублÑÑ…
commission = selling_price * rates.commission_pct / 100
logistics = selling_price * rates.logistics_pct / 100
return_logistics = selling_price * rates.return_logistics_pct / 100
storage = selling_price * rates.storage_pct / 100
acquiring = selling_price * rates.acquiring_pct / 100
total_expenses = commission + logistics + return_logistics + storage + acquiring
# Прибыль
gross_profit = selling_price - cogs
net_profit = selling_price - cogs - total_expenses
# Маржа
if selling_price > 0:
gross_margin_pct = (gross_profit / selling_price) * 100
net_margin_pct = (net_profit / selling_price) * 100
else:
gross_margin_pct = 0
net_margin_pct = 0
# Ð¡Ñ‚Ð°Ñ‚ÑƒÑ Ð¼Ð°Ñ€Ð¶Ð¸
if net_margin_pct > 25:
margin_status = MarginStatus.GREEN
elif net_margin_pct >= 15:
margin_status = MarginStatus.YELLOW
else:
margin_status = MarginStatus.RED
# Дополнительные раÑчёты
overhead_rate = rates.total_overhead_pct / 100
# Точка безубыточноÑти
if overhead_rate < 1:
break_even_price = cogs / (1 - overhead_rate)
else:
break_even_price = float('inf')
# Цена Ð´Ð»Ñ Ð¼Ð°Ñ€Ð¶Ð¸ 25%
target_25_rate = 1 - overhead_rate - 0.25
if target_25_rate > 0:
target_price_25 = cogs / target_25_rate
else:
target_price_25 = float('inf')
# Цена Ð´Ð»Ñ Ð¼Ð°Ñ€Ð¶Ð¸ 30%
target_30_rate = 1 - overhead_rate - 0.30
if target_30_rate > 0:
target_price_30 = cogs / target_30_rate
else:
target_price_30 = float('inf')
# COGS Ð´Ð»Ñ Ð¼Ð°Ñ€Ð¶Ð¸ 25% при текущей цене
# net_margin = (price - cogs - overhead) / price = 0.25
# price - cogs - price * overhead_rate = 0.25 * price
# cogs = price * (1 - overhead_rate - 0.25)
cogs_for_25_margin = selling_price * (1 - overhead_rate - 0.25)
return UnitEconomics(
marketplace=marketplace,
selling_price=round(selling_price, 2),
cogs=round(cogs, 2),
commission=round(commission, 2),
logistics=round(logistics, 2),
return_logistics=round(return_logistics, 2),
storage=round(storage, 2),
acquiring=round(acquiring, 2),
total_expenses=round(total_expenses, 2),
commission_pct=rates.commission_pct,
logistics_pct=rates.logistics_pct,
return_logistics_pct=rates.return_logistics_pct,
storage_pct=rates.storage_pct,
acquiring_pct=rates.acquiring_pct,
total_overhead_pct=rates.total_overhead_pct,
gross_profit=round(gross_profit, 2),
net_profit=round(net_profit, 2),
gross_margin_pct=round(gross_margin_pct, 2),
net_margin_pct=round(net_margin_pct, 2),
margin_status=margin_status,
break_even_price=round(break_even_price, 2),
target_price_25=round(target_price_25, 2),
target_price_30=round(target_price_30, 2),
cogs_for_25_margin=round(max(0, cogs_for_25_margin), 2)
)
async def calculate_range(
self,
marketplace: str,
selling_price: float,
cogs_min: float,
cogs_max: float,
category: str = "default"
) -> Dict[str, UnitEconomics]:
"""
РаÑчёт Ð´Ð»Ñ Ð´Ð¸Ð°Ð¿Ð°Ð·Ð¾Ð½Ð° COGS.
Returns:
{"min": UnitEconomics, "avg": UnitEconomics, "max": UnitEconomics}
"""
cogs_avg = (cogs_min + cogs_max) / 2
return {
"min": await self.calculate(marketplace, selling_price, cogs_min, category),
"avg": await self.calculate(marketplace, selling_price, cogs_avg, category),
"max": await self.calculate(marketplace, selling_price, cogs_max, category)
}
async def _get_rates(
self,
marketplace: str,
category: str
) -> MarketplaceRates:
"""Получение Ñтавок из БД или default."""
# Попытка получить из БД
query = """
SELECT * FROM scout_marketplace_rates
WHERE marketplace = $1 AND (category = $2 OR category = 'default')
ORDER BY CASE WHEN category = $2 THEN 0 ELSE 1 END
LIMIT 1
"""
row = await self.db.fetchrow(query, marketplace, category)
if row:
return MarketplaceRates(
marketplace=row["marketplace"],
category=row["category"],
commission_pct=row["commission_pct"],
logistics_pct=row["logistics_pct"],
return_logistics_pct=row["return_logistics_pct"],
storage_pct=row["storage_pct"],
acquiring_pct=row["acquiring_pct"]
)
# Fallback на default
return DEFAULT_RATES.get(marketplace, DEFAULT_RATES["wildberries"])
async def update_rates(
self,
marketplace: str,
rates: MarketplaceRates,
user_id: int
):
"""Обновление Ñтавок (Senior+)."""
query = """
INSERT INTO scout_marketplace_rates
(marketplace, category, commission_pct, logistics_pct,
return_logistics_pct, storage_pct, acquiring_pct, updated_by)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (marketplace, category)
DO UPDATE SET
commission_pct = EXCLUDED.commission_pct,
logistics_pct = EXCLUDED.logistics_pct,
return_logistics_pct = EXCLUDED.return_logistics_pct,
storage_pct = EXCLUDED.storage_pct,
acquiring_pct = EXCLUDED.acquiring_pct,
updated_by = EXCLUDED.updated_by,
updated_at = NOW()
"""
await self.db.execute(
query,
rates.marketplace,
rates.category,
rates.commission_pct,
rates.logistics_pct,
rates.return_logistics_pct,
rates.storage_pct,
rates.acquiring_pct,
user_id
)3.6 AI Verdict Engine
3.6.1 Ðазначение
Формирование финального вердикта Ñ Ð¸Ñпользованием Claude Opus 4.5 Ð´Ð»Ñ Ð³Ð»ÑƒÐ±Ð¾ÐºÐ¾Ð³Ð¾ анализа и рекомендаций.
3.6.2 Ðлгоритм Ñ„Ð¾Ñ€Ð¼Ð¸Ñ€Ð¾Ð²Ð°Ð½Ð¸Ñ Ð²ÐµÑ€Ð´Ð¸ÐºÑ‚Ð°
3.6.3 Правила Ð¾Ð¿Ñ€ÐµÐ´ÐµÐ»ÐµÐ½Ð¸Ñ Ð²ÐµÑ€Ð´Ð¸ÐºÑ‚Ð°
# services/verdict_engine.py
from enum import Enum
from typing import Tuple
class Verdict(Enum):
GO = "GO"
CONSIDER = "CONSIDER"
RISKY = "RISKY"
def determine_verdict(
trend_status: str,
monopoly_status: str,
margin_status: str
) -> Tuple[Verdict, str]:
"""
Определение вердикта на оÑнове ÑтатуÑов метрик.
Returns:
(Verdict, color)
"""
statuses = [trend_status, monopoly_status, margin_status]
green_count = statuses.count("green")
yellow_count = statuses.count("yellow")
red_count = statuses.count("red")
# RISKY: еÑть краÑный или вÑе жёлтые
if red_count > 0:
return (Verdict.RISKY, "red")
if yellow_count == 3:
return (Verdict.RISKY, "red")
# GO: вÑе зелёные или 2 зелёные + 1 жёлтаÑ
if green_count == 3:
return (Verdict.GO, "green")
if green_count == 2 and yellow_count == 1:
return (Verdict.GO, "green")
# CONSIDER: оÑтальное
return (Verdict.CONSIDER, "yellow")3.6.4 Промпт Ð´Ð»Ñ Claude Opus 4.5
# prompts/verdict_prompts.py
VERDICT_SYSTEM_PROMPT = """
Ты — Ñтарший аналитик e-commerce, ÑпециализирующийÑÑ Ð½Ð° оценке товарных ниш Ð´Ð»Ñ Ð²Ñ‹Ñ…Ð¾Ð´Ð° на маркетплейÑÑ‹.
Ð¢Ð²Ð¾Ñ Ð·Ð°Ð´Ð°Ñ‡Ð° — проанализировать данные о нише и дать обоÑнованную рекомендацию.
Принципы анализа:
1. ОбъективноÑть — опирайÑÑ Ñ‚Ð¾Ð»ÑŒÐºÐ¾ на данные
2. КонкретноÑть — указывай чиÑла и проценты
3. Actionable — рекомендации должны быть выполнимы
4. РиÑк-ориентированноÑть — вÑегда указывай риÑки
Формат ответа — Ñтрого JSON.
"""
VERDICT_USER_PROMPT = """
## Ðнализ ниши: {query}
### ИÑходные данные
- Ð—Ð°ÐºÑƒÐ¿Ð¾Ñ‡Ð½Ð°Ñ Ñ†ÐµÐ½Ð° (COGS): {cogs} ₽
- МаркетплейÑÑ‹: {marketplaces}
### Тренды ÑпроÑа
- Trend Slope: {trend_slope} ({trend_status})
- Средний объём запроÑов: {avg_volume}/меÑ
- Динамика: {trend_direction}
- СезонноÑть: {seasonality}
- УверенноÑть в данных: {trend_confidence}%
### Конкурентный анализ
{competitor_analysis}
### Unit-Ñкономика
{unit_economics}
### Предварительный вердикт
Ðа оÑнове правил: **{preliminary_verdict}**
---
Проанализируй данные и верни JSON:
{{
"verdict": "{preliminary_verdict}",
"confidence": <float 0-1, Ñ‚Ð²Ð¾Ñ ÑƒÐ²ÐµÑ€ÐµÐ½Ð½Ð¾Ñть в вердикте>,
"summary": "<2-3 предложениÑ: главный вывод>",
"detailed_analysis": {{
"trend_assessment": "<оценка трендов>",
"competition_assessment": "<оценка конкуренции>",
"economics_assessment": "<оценка Ñкономики>"
}},
"key_metrics": {{
"trend_slope": {trend_slope},
"monopoly_rate": <float>,
"expected_margin": <float %>
}},
"recommendations": [
"<ÐºÐ¾Ð½ÐºÑ€ÐµÑ‚Ð½Ð°Ñ Ñ€ÐµÐºÐ¾Ð¼ÐµÐ½Ð´Ð°Ñ†Ð¸Ñ 1>",
"<ÐºÐ¾Ð½ÐºÑ€ÐµÑ‚Ð½Ð°Ñ Ñ€ÐµÐºÐ¾Ð¼ÐµÐ½Ð´Ð°Ñ†Ð¸Ñ 2>",
...
],
"risks": [
{{
"risk": "<опиÑание риÑка>",
"probability": "<low/medium/high>",
"mitigation": "<как Ñнизить>"
}},
...
],
"opportunities": [
"<возможноÑть 1>",
"<возможноÑть 2>",
...
],
"action_plan": {{
"if_go": ["<шаг 1>", "<шаг 2>", ...],
"if_not": ["<альтернатива 1>", ...]
}},
"price_recommendations": {{
"optimal_price": <float>,
"min_viable_price": <float>,
"premium_price": <float>,
"reasoning": "<обоÑнование>"
}}
}}
"""
COMPETITOR_SECTION_TEMPLATE = """
#### {marketplace}
- Monopoly Rate: {monopoly_rate}% ({monopoly_status})
- Уникальных продавцов: {unique_sellers}
- СреднÑÑ Ñ†ÐµÐ½Ð°: {avg_price} ₽
- Ценовой диапазон: {price_min} — {price_max} ₽
- Средний рейтинг: {avg_rating}
- Барьер входа: {entry_barrier} (score: {barrier_score})
ТОП-3 продавца:
{top_sellers}
"""
UNIT_ECONOMICS_TEMPLATE = """
#### {marketplace}
- Цена продажи: {selling_price} ₽
- СебеÑтоимоÑть: {cogs} ₽
- Overhead: {overhead}%
- ЧиÑÑ‚Ð°Ñ Ð¿Ñ€Ð¸Ð±Ñ‹Ð»ÑŒ: {net_profit} ₽
- ЧиÑÑ‚Ð°Ñ Ð¼Ð°Ñ€Ð¶Ð°: {net_margin}% ({margin_status})
- Точка безубыточноÑти: {break_even} ₽
- Цена Ð´Ð»Ñ Ð¼Ð°Ñ€Ð¶Ð¸ 25%: {target_price_25} ₽
"""3.6.5 Ð ÐµÐ°Ð»Ð¸Ð·Ð°Ñ†Ð¸Ñ Verdict Engine
# services/verdict_engine.py
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime
import uuid
@dataclass
class VerdictMetrics:
"""Ключевые метрики Ð´Ð»Ñ Ð²ÐµÑ€Ð´Ð¸ÐºÑ‚Ð°."""
trend_slope: float
trend_status: str
monopoly_rate: float
monopoly_status: str
expected_margin: float
margin_status: str
@dataclass
class VerdictResult:
"""Полный результат анализа."""
analysis_id: str
query: str
marketplaces: List[str]
# Вердикт
verdict: Verdict
color: str
confidence: float
# Метрики
metrics: VerdictMetrics
# AI-анализ
summary: str
detailed_analysis: Dict[str, str]
recommendations: List[str]
risks: List[Dict]
opportunities: List[str]
action_plan: Dict[str, List[str]]
price_recommendations: Dict
# ИÑходные данные
trend_result: 'TrendResult'
competitor_results: Dict[str, 'CompetitorResult']
unit_economics: Dict[str, 'UnitEconomics']
# Метаданные
user_id: int
cogs_input: float
cogs_range: Optional[tuple]
data_sources: List[str]
analyzed_at: datetime
processing_time_ms: int
class VerdictEngine:
"""Движок Ñ„Ð¾Ñ€Ð¼Ð¸Ñ€Ð¾Ð²Ð°Ð½Ð¸Ñ Ð²ÐµÑ€Ð´Ð¸ÐºÑ‚Ð°."""
def __init__(self, ai_client):
self.ai = ai_client
async def generate_verdict(
self,
trend_result: 'TrendResult',
competitor_results: Dict[str, 'CompetitorResult'],
unit_economics: Dict[str, 'UnitEconomics'],
parsed_input: 'ParsedInput',
user_id: int
) -> VerdictResult:
"""
Ð“ÐµÐ½ÐµÑ€Ð°Ñ†Ð¸Ñ Ð¿Ð¾Ð»Ð½Ð¾Ð³Ð¾ вердикта.
"""
start_time = datetime.utcnow()
# ÐÐ³Ñ€ÐµÐ³Ð°Ñ†Ð¸Ñ Ð¼ÐµÑ‚Ñ€Ð¸Ðº
metrics = self._aggregate_metrics(
trend_result,
competitor_results,
unit_economics
)
# Предварительный вердикт по правилам
preliminary_verdict, color = determine_verdict(
metrics.trend_status,
metrics.monopoly_status,
metrics.margin_status
)
# Формирование контекÑта Ð´Ð»Ñ AI
context = self._build_context(
parsed_input,
trend_result,
competitor_results,
unit_economics,
metrics,
preliminary_verdict
)
# Ð—Ð°Ð¿Ñ€Ð¾Ñ Ðº Claude Opus 4.5
ai_response = await self._call_ai(context)
# ПарÑинг ответа
analysis_id = str(uuid.uuid4())
processing_time = int((datetime.utcnow() - start_time).total_seconds() * 1000)
return VerdictResult(
analysis_id=analysis_id,
query=parsed_input.query,
marketplaces=parsed_input.marketplaces,
verdict=preliminary_verdict,
color=color,
confidence=ai_response.get("confidence", 0.7),
metrics=metrics,
summary=ai_response.get("summary", ""),
detailed_analysis=ai_response.get("detailed_analysis", {}),
recommendations=ai_response.get("recommendations", []),
risks=ai_response.get("risks", []),
opportunities=ai_response.get("opportunities", []),
action_plan=ai_response.get("action_plan", {}),
price_recommendations=ai_response.get("price_recommendations", {}),
trend_result=trend_result,
competitor_results=competitor_results,
unit_economics=unit_economics,
user_id=user_id,
cogs_input=parsed_input.cogs,
cogs_range=(parsed_input.cogs_min, parsed_input.cogs_max) if parsed_input.cogs_min else None,
data_sources=trend_result.sources_used,
analyzed_at=datetime.utcnow(),
processing_time_ms=processing_time
)
def _aggregate_metrics(
self,
trend: 'TrendResult',
competitors: Dict[str, 'CompetitorResult'],
economics: Dict[str, 'UnitEconomics']
) -> VerdictMetrics:
"""ÐÐ³Ñ€ÐµÐ³Ð°Ñ†Ð¸Ñ Ð¼ÐµÑ‚Ñ€Ð¸Ðº из вÑех иÑточников."""
# Trend
trend_slope = trend.trend_slope
trend_status = trend.trend_status.value
# Monopoly — Ñреднее по маркетплейÑам
monopoly_rates = [c.monopoly_rate for c in competitors.values()]
avg_monopoly = sum(monopoly_rates) / len(monopoly_rates) if monopoly_rates else 0
if avg_monopoly < 0.5:
monopoly_status = "green"
elif avg_monopoly < 0.7:
monopoly_status = "yellow"
else:
monopoly_status = "red"
# Margin — Ñреднее по маркетплейÑам
margins = [e.net_margin_pct for e in economics.values()]
avg_margin = sum(margins) / len(margins) if margins else 0
if avg_margin > 25:
margin_status = "green"
elif avg_margin >= 15:
margin_status = "yellow"
else:
margin_status = "red"
return VerdictMetrics(
trend_slope=trend_slope,
trend_status=trend_status,
monopoly_rate=round(avg_monopoly, 4),
monopoly_status=monopoly_status,
expected_margin=round(avg_margin, 2),
margin_status=margin_status
)
def _build_context(
self,
parsed_input: 'ParsedInput',
trend: 'TrendResult',
competitors: Dict[str, 'CompetitorResult'],
economics: Dict[str, 'UnitEconomics'],
metrics: VerdictMetrics,
preliminary_verdict: Verdict
) -> str:
"""ПоÑтроение контекÑта Ð´Ð»Ñ AI."""
from prompts.verdict_prompts import (
VERDICT_USER_PROMPT,
COMPETITOR_SECTION_TEMPLATE,
UNIT_ECONOMICS_TEMPLATE
)
# Ð¡ÐµÐºÑ†Ð¸Ñ ÐºÐ¾Ð½ÐºÑƒÑ€ÐµÐ½Ñ‚Ð¾Ð²
competitor_sections = []
for mp, comp in competitors.items():
top_sellers_text = "\n".join([
f" {i+1}. {s.name} — {s.share*100:.1f}% (рейтинг {s.avg_rating})"
for i, s in enumerate(comp.top_sellers[:3])
])
section = COMPETITOR_SECTION_TEMPLATE.format(
marketplace=mp,
monopoly_rate=round(comp.monopoly_rate * 100, 1),
monopoly_status=comp.monopoly_status.value,
unique_sellers=comp.unique_sellers_count,
avg_price=round(comp.price_analysis.avg),
price_min=round(comp.price_analysis.min),
price_max=round(comp.price_analysis.max),
avg_rating=comp.quality_analysis.avg_rating,
entry_barrier=comp.entry_barrier.value,
barrier_score=comp.entry_barrier_score,
top_sellers=top_sellers_text
)
competitor_sections.append(section)
# Ð¡ÐµÐºÑ†Ð¸Ñ unit-Ñкономики
economics_sections = []
for mp, econ in economics.items():
section = UNIT_ECONOMICS_TEMPLATE.format(
marketplace=mp,
selling_price=round(econ.selling_price),
cogs=round(econ.cogs),
overhead=round(econ.total_overhead_pct, 1),
net_profit=round(econ.net_profit),
net_margin=round(econ.net_margin_pct, 1),
margin_status=econ.margin_status.value,
break_even=round(econ.break_even_price),
target_price_25=round(econ.target_price_25)
)
economics_sections.append(section)
# Финальный промпт
return VERDICT_USER_PROMPT.format(
query=parsed_input.query,
cogs=parsed_input.cogs,
marketplaces=", ".join(parsed_input.marketplaces),
trend_slope=trend.trend_slope,
trend_status=metrics.trend_status,
avg_volume=trend.avg_monthly_volume,
trend_direction="роÑÑ‚" if trend.is_growing else ("падение" if trend.is_declining else "Ñтабильно"),
seasonality="да" if trend.seasonality_detected else "нет",
trend_confidence=round(trend.confidence * 100),
competitor_analysis="\n".join(competitor_sections),
unit_economics="\n".join(economics_sections),
preliminary_verdict=preliminary_verdict.value
)
async def _call_ai(self, context: str) -> Dict:
"""Вызов Claude Opus 4.5."""
from prompts.verdict_prompts import VERDICT_SYSTEM_PROMPT
response = await self.ai.complete(
model="claude-opus-4-5-20250514",
system=VERDICT_SYSTEM_PROMPT,
user=context,
response_format="json",
max_tokens=2000
)
return response3.7 Полный Pipeline
3.7.1 ОркеÑтратор
# services/analysis_orchestrator.py
from typing import Optional
from datetime import datetime
class AnalysisOrchestrator:
"""ОркеÑтратор полного анализа ниши."""
def __init__(
self,
input_parser: InputParser,
trend_miner: TrendMiner,
competitor_analyzer: CompetitorAnalyzer,
unit_calculator: UnitCalculator,
verdict_engine: VerdictEngine,
history_manager: 'HistoryManager'
):
self.parser = input_parser
self.trend = trend_miner
self.competitor = competitor_analyzer
self.unit = unit_calculator
self.verdict = verdict_engine
self.history = history_manager
async def analyze(
self,
user_input: str,
user_id: int
) -> VerdictResult:
"""
Полный анализ ниши.
Args:
user_input: Сырой ввод пользователÑ
user_id: ID пользователÑ
Returns:
VerdictResult Ñ Ð¿Ð¾Ð»Ð½Ñ‹Ð¼ анализом
"""
# 1. ПарÑинг ввода
parsed = self.parser.parse(user_input)
if parsed.cogs <= 0:
raise ValueError("Ðе указана Ð·Ð°ÐºÑƒÐ¿Ð¾Ñ‡Ð½Ð°Ñ Ñ†ÐµÐ½Ð° (COGS)")
# 2. Параллельный Ñбор данных
import asyncio
trend_task = self.trend.analyze(parsed)
competitor_task = self.competitor.analyze(parsed)
trend_result, competitor_results = await asyncio.gather(
trend_task,
competitor_task
)
# 3. РаÑчёт unit-Ñкономики
unit_economics = {}
for mp, comp in competitor_results.items():
avg_price = comp.price_analysis.avg
if parsed.cogs_min and parsed.cogs_max:
# Диапазон COGS
econ = await self.unit.calculate_range(
marketplace=mp,
selling_price=avg_price,
cogs_min=parsed.cogs_min,
cogs_max=parsed.cogs_max
)
unit_economics[mp] = econ["avg"]
else:
# Одно значение COGS
unit_economics[mp] = await self.unit.calculate(
marketplace=mp,
selling_price=avg_price,
cogs=parsed.cogs
)
# 4. Формирование вердикта
result = await self.verdict.generate_verdict(
trend_result=trend_result,
competitor_results=competitor_results,
unit_economics=unit_economics,
parsed_input=parsed,
user_id=user_id
)
# 5. Сохранение в иÑторию
await self.history.save(result)
return result3.8 Обработка ошибок
3.8.1 Типы ошибок AI Pipeline
| Ошибка | Причина | ДейÑтвие |
|---|---|---|
ParsingError | Ðе удалоÑÑŒ разобрать ввод | ЗапроÑить уточнение |
MissingCOGSError | Ðе указана Ð·Ð°ÐºÑƒÐ¿Ð¾Ñ‡Ð½Ð°Ñ Ñ†ÐµÐ½Ð° | ЗапроÑить COGS |
DataSourceError | ИÑточник данных недоÑтупен | ИÑпользовать fallback |
AIServiceError | Ошибка AI-ÑервиÑа | Retry Ñ backoff |
TimeoutError | Превышено Ð²Ñ€ÐµÐ¼Ñ Ð¾Ð¶Ð¸Ð´Ð°Ð½Ð¸Ñ | ЧаÑтичный результат |
3.8.2 Graceful Degradation
async def analyze_with_fallback(
self,
user_input: str,
user_id: int
) -> VerdictResult:
"""Ðнализ Ñ graceful degradation."""
try:
return await self.analyze(user_input, user_id)
except DataSourceError as e:
# Продолжаем Ñ Ð´Ð¾Ñтупными данными
logger.warning(f"Data source failed: {e}")
return await self._analyze_partial(user_input, user_id)
except AIServiceError as e:
# ИÑпользуем rule-based вердикт
logger.error(f"AI service failed: {e}")
return await self._analyze_without_ai(user_input, user_id)Документ подготовлен: Январь 2026
ВерÑиÑ: 1.0
СтатуÑ: Черновик