Skip to content

Раздел 3: AI Pipeline

Проект: Интеллектуальная система управления репутацией
Модуль: Reputation / AI Pipeline
Версия: 2.1
Дата: Январь 2026


3.1 Назначение и архитектура

Назначение

AI Pipeline — компонент модуля Reputation, отвечающий за интеллектуальную обработку отзывов и вопросов:

ФункцияОписаниеAI-модель
КлассификацияОпределение тональности, тем, категорийGPT-5 mini
RAG-поискПолучение данных о товареKnowledge API
Генерация ответаСоздание персонализированного ответаGPT-5 mini

Архитектура Pipeline

Последовательность обработки


3.2 Celery Tasks

Реестр задач AI Pipeline

ЗадачаТипОчередьОписание
reputation.analyze_itemasyncdefaultПолный цикл анализа
reputation.classify_itemasyncdefaultТолько классификация
reputation.generate_responseasyncdefaultГенерация ответа
reputation.regenerate_responseasyncdefaultПерегенерация с инструкциями

Task: analyze_item

python
@shared_task(name="reputation.analyze_item", bind=True, max_retries=3)
def analyze_item(self, item_id: int):
    """Полный цикл AI-обработки отзыва/вопроса."""
    
    with get_db_session() as db:
        # 1. Загрузка данных
        item = db.query(ReputationItem).get(item_id)
        if not item or item.status != 'new':
            logger.warning(f"Item {item_id} already processed or not found")
            return
        
        item.status = 'analyzing'
        db.commit()
        
        try:
            # 2. Классификация (sentiment, tags)
            classification = classify_text(
                text=item.client_text,
                item_type=item.item_type,
                rating=item.rating
            )
            
            # 3. Сохранение анализа
            item.ai_analysis = {
                "sentiment": classification["sentiment"],
                "sentiment_score": classification["score"],
                "tags": classification["tags"],
                "category": classification["category"],
                "key_points": classification["key_points"],
                "analyzed_at": datetime.utcnow().isoformat()
            }
            db.commit()
            
            # 4. Генерация ответа
            generate_response.delay(item_id)
            
        except Exception as e:
            item.status = 'error'
            item.error_message = str(e)
            db.commit()
            raise self.retry(exc=e, countdown=60)

Task: generate_response

python
@shared_task(name="reputation.generate_response", bind=True, max_retries=3)
def generate_response(self, item_id: int, instructions: str = None):
    """Генерация ответа с использованием RAG."""
    
    with get_db_session() as db:
        item = db.query(ReputationItem).get(item_id)
        
        try:
            # 1. RAG: поиск данных о товаре
            product_context = knowledge_api.search(
                query=f"артикул {item.sku} размер состав характеристики",
                filters={"brand_id": [item.brand_id, "all"]},
                top_k=5
            )
            
            # 2. Формирование контекста
            context = build_generation_context(
                item=item,
                product_data=product_context,
                instructions=instructions
            )
            
            # 3. Генерация
            draft = gpt_generate(context)
            
            # 4. Валидация
            validation = validate_response(draft, item)
            if not validation.is_valid:
                draft = gpt_generate(context, correction=validation.issues)
            
            # 5. Сохранение
            response = ReputationResponse(
                item_id=item_id,
                draft_text=draft,
                generation_model="gpt-5-mini",
                status='draft'
            )
            db.add(response)
            
            item.status = 'pending_review'
            db.commit()
            
            # 6. Уведомление
            notify_new_item(item)
            
        except Exception as e:
            item.status = 'error'
            item.error_message = str(e)
            db.commit()
            raise self.retry(exc=e, countdown=60)

3.3 Классификация

Назначение

Классификация определяет:

  • Тональность (sentiment)
  • Тематические теги
  • Категорию проблемы
  • Ключевые аспекты отзыва

Промпт классификации

python
CLASSIFICATION_PROMPT = """
Проанализируй отзыв покупателя на товар одежды.

Отзыв: {client_text}
Тип: {item_type}
Оценка: {rating}/5

Определи:
1. sentiment: positive / neutral / negative
2. sentiment_score: число от 0.0 до 1.0
3. tags: список тегов (размер, качество, доставка, упаковка, цена, внешний_вид)
4. category: основная категория (sizing, quality, delivery, packaging, price, appearance, other)
5. key_points: список ключевых моментов из отзыва (1-3 пункта)

Ответ в JSON:
```json
\{
    "sentiment": "...",
    "sentiment_score": 0.0,
    "tags": ["...", "..."],
    "category": "...",
    "key_points": ["...", "..."]
\}

"""


### Реализация классификации

```python
async def classify_text(
    text: str,
    item_type: str,
    rating: Optional[int]
) -> dict:
    """Классификация текста отзыва/вопроса."""
    
    prompt = CLASSIFICATION_PROMPT.format(
        client_text=text,
        item_type=item_type,
        rating=rating or "нет"
    )
    
    response = await gpt_client.chat_completion(
        model="gpt-5-mini",
        messages=[
            {"role": "system", "content": "Ты эксперт по анализу отзывов."},
            {"role": "user", "content": prompt}
        ],
        temperature=0.3,
        max_tokens=500
    )
    
    result = json.loads(extract_json(response.content))
    
    # Валидация
    assert result["sentiment"] in ["positive", "neutral", "negative"]
    assert 0 <= result["sentiment_score"] <= 1
    
    return result

Категории классификации

КатегорияОписаниеТриггеры
sizingВопросы размера"большемерит", "маломерит", "размер"
qualityКачество товара"качество", "ткань", "швы", "материал"
deliveryДоставка"доставка", "курьер", "сроки"
packagingУпаковка"упаковка", "помятый", "порванный"
priceЦена"цена", "дорого", "скидка"
appearanceВнешний вид"цвет", "фото", "не соответствует"
otherПрочееВсё остальное

3.4 RAG-поиск

Интеграция с Knowledge API

python
async def get_product_context(sku: str, brand_id: str) -> dict:
    """Получение контекста о товаре через Knowledge API."""
    
    response = await knowledge_api.search(
        query=f"артикул {sku} состав размерная сетка уход",
        filters={
            "brand_id": [brand_id, "all"],
            "category": ["product"]
        },
        top_k=5
    )
    
    if not response.results:
        return {"found": False, "data": {}}
    
    # Извлечение структурированных данных
    product_data = {
        "found": True,
        "sku": sku,
        "name": extract_product_name(response.results),
        "composition": extract_composition(response.results),
        "sizing": extract_sizing_info(response.results),
        "care": extract_care_instructions(response.results),
        "raw_chunks": [r.text for r in response.results[:3]]
    }
    
    return product_data

Формирование контекста генерации

python
def build_generation_context(
    item: ReputationItem,
    product_data: dict,
    instructions: Optional[str] = None
) -> str:
    """Формирование контекста для генерации ответа."""
    
    context_parts = []
    
    # Информация об отзыве
    context_parts.append(f"""
## Отзыв покупателя
- Тип: {item.item_type}
- Оценка: {item.rating or 'нет'}/5
- Тональность: {item.ai_analysis.get('sentiment', 'unknown')}
- Категория: {item.ai_analysis.get('category', 'other')}

Текст: {item.client_text}
""")
    
    # Данные о товаре
    if product_data.get("found"):
        context_parts.append(f"""
## Информация о товаре
- Артикул: {product_data['sku']}
- Название: {product_data.get('name', 'н/д')}
- Состав: {product_data.get('composition', 'н/д')}
- Размерная сетка: {product_data.get('sizing', 'н/д')}
- Уход: {product_data.get('care', 'н/д')}
""")
    
    # Дополнительные инструкции
    if instructions:
        context_parts.append(f"""
## Инструкции менеджера
{instructions}
""")
    
    return "\n".join(context_parts)

3.5 Генерация ответа

Промпт генерации

python
GENERATION_SYSTEM_PROMPT = """
Ты — менеджер по работе с клиентами интернет-магазина женской одежды.

Твоя задача — написать ответ на отзыв или вопрос покупателя.

Правила:
1. Начни с благодарности за обращение
2. Обратись к покупателю по имени, если известно
3. Отвечай по существу, используя информацию о товаре
4. Если отзыв негативный — вырази сожаление и предложи решение
5. Если позитивный — подчеркни преимущества товара
6. Заверши приглашением к новым покупкам

Стиль:
- Дружелюбный, но профессиональный
- Без канцеляризмов и формализма
- Краткий (3-5 предложений)
- Без смайликов и эмодзи

Бренд: {brand_name}
"""

GENERATION_USER_PROMPT = """
{context}

## Задача
Напиши ответ на отзыв покупателя. Используй информацию о товаре для аргументации.
"""

Реализация генерации

python
async def gpt_generate(
    context: str,
    brand_id: str,
    correction: Optional[str] = None
) -> str:
    """Генерация ответа через GPT-5 mini."""
    
    brand_name = "Охана Маркет" if brand_id == "ohana_market" else "Охана Кидс"
    
    system_prompt = GENERATION_SYSTEM_PROMPT.format(brand_name=brand_name)
    user_prompt = GENERATION_USER_PROMPT.format(context=context)
    
    if correction:
        user_prompt += f"\n\nВнимание: {correction}"
    
    response = await gpt_client.chat_completion(
        model="gpt-5-mini",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ],
        temperature=0.7,
        max_tokens=500
    )
    
    return response.content.strip()

Адаптация по бренду

БрендТонОсобенности
Охана МаркетСтильный, модныйАкцент на тренды, образы
Охана КидсТёплый, заботливыйАкцент на комфорт, безопасность

3.6 Валидация ответа

Правила валидации

ПравилоОписаниеДействие при нарушении
Длина50-500 символовПерегенерация
Запрещённые словаКонкуренты, негативФильтрация
Имя брендаДолжно быть корректнымЗамена
РелевантностьОтвет по темеПерегенерация

Реализация валидации

python
@dataclass
class ValidationResult:
    is_valid: bool
    issues: List[str]

def validate_response(text: str, item: ReputationItem) -> ValidationResult:
    """Валидация сгенерированного ответа."""
    
    issues = []
    
    # Проверка длины
    if len(text) < 50:
        issues.append("Ответ слишком короткий (< 50 символов)")
    elif len(text) > 500:
        issues.append("Ответ слишком длинный (> 500 символов)")
    
    # Проверка запрещённых слов
    forbidden = ["wildberries", "ozon", "lamoda", "конкурент"]
    for word in forbidden:
        if word.lower() in text.lower():
            issues.append(f"Содержит запрещённое слово: {word}")
    
    # Проверка бренда
    brand_name = "Охана Маркет" if item.brand_id == "ohana_market" else "Охана Кидс"
    wrong_brand = "Охана Кидс" if item.brand_id == "ohana_market" else "Охана Маркет"
    if wrong_brand in text:
        issues.append(f"Указан неправильный бренд: {wrong_brand}")
    
    # Проверка тона для негативных отзывов
    if item.rating and item.rating <= 2:
        if "сожалеем" not in text.lower() and "извин" not in text.lower():
            issues.append("Нет извинения в ответе на негативный отзыв")
    
    return ValidationResult(
        is_valid=len(issues) == 0,
        issues=issues
    )

3.7 Перегенерация

Сценарии перегенерации

СценарийИнициаторДействие
Ошибка валидацииСистемаАвтоматическая перегенерация
Запрос менеджераМенеджерПерегенерация с инструкциями
РедактированиеМенеджерСохранение как final_text

API перегенерации

python
@router.post("/items/{item_id}/regenerate")
async def regenerate_response(
    item_id: int,
    request: RegenerateRequest,
    user: User = Depends(get_current_user)
):
    """Перегенерация ответа с инструкциями."""
    
    # Проверка прав
    item = await get_item_with_access_check(item_id, user)
    
    # Инкремент счётчика
    response = item.response
    if response.regenerate_count >= 5:
        raise HTTPException(400, "Превышен лимит перегенераций (5)")
    
    response.regenerate_count += 1
    
    # Запуск перегенерации
    regenerate_response.delay(
        item_id=item_id,
        instructions=request.instructions
    )
    
    return {"status": "regenerating"}

3.8 Обработка ошибок

Единая политика

ПараметрЗначение
Timeout запроса к GPT30 секунд
Retry стратегияExponential backoff: 1с → 2с → 4с
Максимум попыток3

Типы ошибок

ОшибкаДействие
API TimeoutRetry с backoff
Rate LimitRetry через 60 сек
Invalid ResponseRetry с другим temperature
JSON Parse ErrorRetry с уточнённым промптом
Knowledge API ErrorПродолжить без контекста товара

3.9 Метрики и мониторинг

Метрики

МетрикаТипОписание
reputation_ai_duration_secondshistogramВремя обработки
reputation_ai_tokens_usedcounterИспользованные токены
reputation_ai_errors_totalcounterКоличество ошибок
reputation_regenerations_totalcounterКоличество перегенераций

Логирование

python
# Структура лога AI Pipeline
{
    "event": "ai_pipeline_completed",
    "item_id": 12345,
    "classification": {
        "sentiment": "positive",
        "category": "quality",
        "duration_ms": 450
    },
    "generation": {
        "model": "gpt-5-mini",
        "tokens_prompt": 350,
        "tokens_completion": 120,
        "duration_ms": 1200
    },
    "validation": {
        "is_valid": true,
        "issues": []
    },
    "total_duration_ms": 1850
}

Приложение А: Контрольные точки

КритерийПроверка
Классификация работаетai_analysis заполняется
RAG-поиск работаетКонтекст товара извлекается
Генерация работаетdraft_text создаётся
Валидация работаетНекорректные ответы отклоняются
Перегенерация работаетНовый текст при запросе

Приложение Б: Расширение в v2.0

При переходе на v2.0 планируется добавление:

КомпонентОписание
Vision AnalyzerАнализ фото на брак (GPT-4o Vision)
Cross-Sell EngineРекомендации сопутствующих товаров
analyze_photos taskОтдельная задача Vision-анализа

Документ подготовлен: Январь 2026
Версия: 2.1
Статус: Согласовано

Документация ADOLF Platform