392 lines
17 KiB
Python
392 lines
17 KiB
Python
"""Task Worker for sequential review processing"""
|
|
|
|
import asyncio
|
|
import logging
|
|
from datetime import datetime
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.database import AsyncSessionLocal
|
|
from app.models import ReviewTask, PullRequest, Repository, Review
|
|
from app.models.review_task import TaskStatusEnum
|
|
from app.models.review import ReviewStatusEnum
|
|
from app.agents.reviewer import ReviewerAgent
|
|
from app.config import settings
|
|
|
|
# Module-level logger named after this module; all worker output goes through it.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ReviewTaskWorker:
    """Worker that processes review tasks sequentially.

    Every ``poll_interval`` seconds it looks for work in the ``ReviewTask``
    table. A new PENDING task is only started when no task is IN_PROGRESS, so
    at most one review runs at a time.
    """

    def __init__(self):
        self.running = False
        # Id of the task currently executing; None while idle.
        self.current_task_id = None
        self.poll_interval = 10  # seconds between queue checks

    async def start(self):
        """Run the polling loop until ``stop()`` clears ``running``.

        State left over from an interrupted previous run is repaired first.
        Exceptions raised during a single poll are logged and do not end the
        loop.
        """
        self.running = True
        logger.info("🚀 Task Worker запущен")

        # Requeue tasks / fail reviews interrupted by the previous shutdown.
        await self._cleanup_stuck_tasks()

        while self.running:
            try:
                await self._process_next_task()
            except Exception as e:
                logger.error(f"❌ Ошибка в Task Worker: {e}", exc_info=e)

            # Wait before the next queue check.
            await asyncio.sleep(self.poll_interval)

    async def stop(self):
        """Ask the polling loop to exit.

        Only clears the ``running`` flag; the loop still finishes its current
        iteration (a running task and the poll sleep) before it returns.
        """
        self.running = False
        logger.info("⏹️ Task Worker остановлен")

    async def _cleanup_stuck_tasks(self):
        """Repair state left behind by a server that stopped mid-review.

        * ReviewTask rows still IN_PROGRESS are put back to PENDING with
          ``started_at`` cleared; ``retry_count`` is left alone because the
          interruption was not a task failure.
        * Review rows still FETCHING / ANALYZING / COMMENTING are marked
          FAILED with an explanatory ``error_message``.

        Errors are logged and swallowed so the worker can still start.
        """
        async with AsyncSessionLocal() as db:
            try:
                stuck_query = select(ReviewTask).where(
                    ReviewTask.status == TaskStatusEnum.IN_PROGRESS
                )
                result = await db.execute(stuck_query)
                stuck_tasks = result.scalars().all()

                if stuck_tasks:
                    logger.info(f"🔧 Найдено {len(stuck_tasks)} зависших задач, возвращаем в очередь...")
                    for task in stuck_tasks:
                        logger.info(f" ↩️ Задача #{task.id} (PR #{task.pull_request_id}) → PENDING")
                        task.status = TaskStatusEnum.PENDING
                        task.started_at = None
                    await db.commit()
                    logger.info(f"✅ Зависшие задачи очищены и возвращены в очередь")
                else:
                    logger.info("✅ Зависших задач не найдено")

                # Reviews that were mid-flight when the server stopped.
                stuck_review_statuses = [
                    ReviewStatusEnum.FETCHING,
                    ReviewStatusEnum.ANALYZING,
                    ReviewStatusEnum.COMMENTING,
                ]
                stuck_reviews_query = select(Review).where(
                    Review.status.in_(stuck_review_statuses)
                )
                result = await db.execute(stuck_reviews_query)
                stuck_reviews = result.scalars().all()

                if stuck_reviews:
                    logger.info(f"🔧 Найдено {len(stuck_reviews)} зависших reviews, помечаем как failed...")
                    for review in stuck_reviews:
                        logger.info(f" ⚠️ Review #{review.id} (статус: {review.status}) → FAILED")
                        review.status = ReviewStatusEnum.FAILED
                        review.error_message = "Review прерван при перезапуске сервера"
                        review.completed_at = datetime.utcnow()
                    await db.commit()
                    logger.info(f"✅ Зависшие reviews помечены как failed")
                else:
                    logger.info("✅ Зависших reviews не найдено")

            except Exception as e:
                logger.error(f"❌ Ошибка при очистке зависших задач: {e}", exc_info=e)

    async def _process_next_task(self):
        """Pick the next PENDING task and run it, unless a task is already running.

        Selection order: ``priority`` descending, then oldest ``created_at``.
        On success the task becomes COMPLETED. On failure ``retry_count`` is
        incremented and the task is either requeued (PENDING) or, once
        ``max_retries`` is reached, marked FAILED.
        """
        async with AsyncSessionLocal() as db:
            # Only one task may run at a time. first() instead of
            # scalar_one_or_none(): with several IN_PROGRESS rows the latter
            # raises MultipleResultsFound and every poll would fail.
            in_progress_query = (
                select(ReviewTask)
                .where(ReviewTask.status == TaskStatusEnum.IN_PROGRESS)
                .limit(1)
            )
            result = await db.execute(in_progress_query)
            in_progress = result.scalars().first()

            if in_progress:
                logger.debug(f"⏳ Задача #{in_progress.id} уже выполняется")
                return

            # NOTE(review): priority.desc() gives HIGH > NORMAL > LOW only if
            # the column sorts in enum declaration order (native DB enum or
            # integer). If it is stored as plain text the order is
            # alphabetical - confirm against the ReviewTask model.
            pending_query = (
                select(ReviewTask)
                .where(ReviewTask.status == TaskStatusEnum.PENDING)
                .order_by(
                    ReviewTask.priority.desc(),
                    ReviewTask.created_at.asc(),  # oldest first
                )
                .limit(1)
            )
            result = await db.execute(pending_query)
            task = result.scalars().first()

            if not task:
                # Queue is empty.
                return

            task_id = task.id
            logger.info(f"\n{'='*80}")
            logger.info(f"📋 Начало обработки задачи #{task_id}")
            logger.info(f" PR ID: {task.pull_request_id}")
            logger.info(f" Приоритет: {task.priority}")
            logger.info(f"{'='*80}\n")

            task.status = TaskStatusEnum.IN_PROGRESS
            task.started_at = datetime.utcnow()
            self.current_task_id = task_id
            await db.commit()

            try:
                await self._execute_review(task, db)
                # Persist whatever the run left pending before declaring
                # success, so a failing flush counts as a failed attempt.
                await db.commit()
            except Exception as e:
                # Make sure the session can still record the outcome;
                # otherwise the task would stay IN_PROGRESS and block the queue.
                await self._recover_session(db, task, task_id)

                task.retry_count += 1
                task.error_message = str(e)

                if task.retry_count >= task.max_retries:
                    task.status = TaskStatusEnum.FAILED
                    task.completed_at = datetime.utcnow()
                    logger.error(
                        f"❌ Задача #{task_id} провалена после {task.retry_count} попыток: {e}",
                        exc_info=e,
                    )
                else:
                    # Requeue for another attempt.
                    task.status = TaskStatusEnum.PENDING
                    logger.warning(
                        f"⚠️ Задача #{task_id} вернулась в очередь (попытка {task.retry_count}/{task.max_retries}): {e}",
                        exc_info=e,
                    )
            else:
                task.status = TaskStatusEnum.COMPLETED
                task.completed_at = datetime.utcnow()
                logger.info(f"✅ Задача #{task_id} успешно завершена")
            finally:
                self.current_task_id = None
                await db.commit()

    async def _recover_session(self, db: AsyncSession, task: ReviewTask, task_id):
        """Leave ``db`` usable for recording a failed attempt on ``task``.

        First tries to commit whatever the failed run left pending (the
        original single final commit did the same). If that commit fails -
        e.g. the session demands a rollback after a failed flush - roll back
        and reload ``task`` so its status can still be written.
        """
        try:
            await db.commit()
        except Exception as commit_error:
            logger.error(
                f"❌ Не удалось сохранить состояние задачи #{task_id}, откатываем сессию: {commit_error}",
                exc_info=commit_error,
            )
            await db.rollback()
            # rollback() expires loaded instances; reload explicitly because
            # implicit lazy loading is not available under asyncio.
            await db.refresh(task)

    @staticmethod
    def _status_value(status):
        """Return the raw value of a status that may be an Enum member or a str."""
        return getattr(status, "value", status)

    async def _execute_review(self, task: ReviewTask, db: AsyncSession):
        """Run the AI review for ``task``'s pull request, streaming progress.

        Loads the PR and its repository, then takes the most recent Review of
        that PR. If that review's status is anything other than failed/pending,
        the method returns without running. If no review exists, one is
        created. Otherwise ReviewerAgent runs and every agent event is
        broadcast through ``app.main.manager``; all events except
        ``llm_chunk`` are also stored as ReviewEvent rows. Start and
        completion events are stored and broadcast on a best-effort basis.

        Raises:
            ValueError: if the pull request or its repository does not exist.
        """
        from app.models.review_event import ReviewEvent

        result = await db.execute(
            select(PullRequest).where(PullRequest.id == task.pull_request_id)
        )
        pull_request = result.scalar_one_or_none()
        if not pull_request:
            raise ValueError(f"PullRequest {task.pull_request_id} not found")

        result = await db.execute(
            select(Repository).where(Repository.id == pull_request.repository_id)
        )
        repository = result.scalar_one_or_none()
        if not repository:
            raise ValueError(f"Repository {pull_request.repository_id} not found")

        # A PR can have several reviews; only the most recent one matters.
        # limit(1)/first(): scalar_one_or_none() would raise MultipleResultsFound.
        result = await db.execute(
            select(Review)
            .where(Review.pull_request_id == pull_request.id)
            .order_by(Review.started_at.desc())
            .limit(1)
        )
        review = result.scalars().first()

        # Status may be a ReviewStatusEnum member or a plain string.
        if review and self._status_value(review.status) not in ("failed", "pending"):
            logger.info(f" Review already exists with status: {review.status}")
            return

        if not review:
            review = Review(
                pull_request_id=pull_request.id,
                status="pending",
            )
            db.add(review)
            await db.commit()
            await db.refresh(review)

        # Plain values captured once, so later code (including the event
        # callback) does not depend on ORM attribute loads.
        review_id = review.id
        pr_number = pull_request.pr_number
        repository_id = repository.id
        repository_name = f"{repository.repo_owner}/{repository.repo_name}"

        logger.info(f" 🤖 Запуск AI review для PR #{pr_number}")

        # Imported here, presumably to avoid a circular import with app.main.
        from app.main import manager

        logger.info(f" 📢 Отправка начального сообщения о старте review...")
        try:
            initial_db_event = ReviewEvent(
                review_id=review_id,
                event_type="review_started",
                message=f"Начало review для PR #{pr_number}",
                data={
                    "repository_id": repository_id,
                    "repository_name": repository_name,
                },
            )
            db.add(initial_db_event)
            await db.commit()
            logger.info(f" 💾 Начальное событие сохранено в БД: {initial_db_event.id}")

            initial_message = {
                "type": "review_started",
                "review_id": review_id,
                "pr_number": pr_number,
                "timestamp": datetime.utcnow().isoformat(),
                "data": {
                    "message": f"Начало review для PR #{pr_number}",
                    "repository_id": repository_id,
                    "repository_name": repository_name,
                },
            }
            await manager.broadcast(initial_message)
            logger.info(f" ✅ Начальное сообщение отправлено: {len(manager.active_connections)} подключений")
        except Exception as e:
            logger.error(f" ❌ Ошибка отправки начального сообщения: {e}", exc_info=e)

        async def on_review_event(event: dict):
            """Store (unless ``llm_chunk``) and broadcast one agent event.

            Best-effort: any error is logged and swallowed so a persistence
            or broadcast problem does not abort the review itself.
            """
            event_type = event.get("type", "agent_update")
            try:
                connections = len(manager.active_connections)
                logger.debug(
                    f"Review event: type={event.get('type')}, step={event.get('step')}, "
                    f"message={event.get('message')}, connections={connections}"
                )

                event_data = {
                    "type": event_type,
                    "review_id": review_id,
                    "pr_number": pr_number,
                    "timestamp": datetime.utcnow().isoformat(),
                    "data": event,
                }

                # llm_chunk events arrive in large numbers: broadcast them,
                # but neither store them nor log them at INFO level.
                if event_type == "llm_chunk":
                    logger.debug(f" 🔔 Broadcasting event: type={event.get('type')}, connections={connections}")
                else:
                    logger.info(f" 🔔 Broadcasting event: type={event.get('type')}, connections={connections}")
                    db_event = ReviewEvent(
                        review_id=review_id,
                        event_type=event_type,
                        step=event.get("step"),
                        message=event.get("message"),
                        data=event,
                    )
                    db.add(db_event)
                    await db.commit()
                    logger.debug(f" 💾 Event saved to DB: {db_event.id}")

                await manager.broadcast(event_data)

                if event_type == "agent_step":
                    step = event.get("step", "unknown")
                    logger.info(f" 📍 Step: {step}")
                elif event_type == "llm_message":
                    # `or ""` guards against an explicit None message.
                    message = (event.get("message") or "")[:100]
                    logger.info(f" 💬 LLM: {message}...")
            except Exception as e:
                logger.error(f" ❌ Ошибка broadcast события: {e}", exc_info=e)

        agent = ReviewerAgent(db)
        await agent.run_review_stream(
            review_id=review_id,
            pr_number=pr_number,
            repository_id=repository_id,
            on_event=on_review_event,
        )

        logger.info(f" ✅ Review завершен для PR #{pr_number}")

        try:
            completion_db_event = ReviewEvent(
                review_id=review_id,
                event_type="review_completed",
                message=f"Review завершен для PR #{pr_number}",
                data={},
            )
            db.add(completion_db_event)
            await db.commit()
            logger.info(f" 💾 Событие завершения сохранено в БД: {completion_db_event.id}")

            completion_message = {
                "type": "review_completed",
                "review_id": review_id,
                "pr_number": pr_number,
                "timestamp": datetime.utcnow().isoformat(),
                "data": {
                    "message": f"Review завершен для PR #{pr_number}",
                },
            }
            await manager.broadcast(completion_message)
            logger.info(f" 📢 Сообщение о завершении отправлено: {len(manager.active_connections)} подключений")
        except Exception as e:
            logger.error(f" ❌ Ошибка отправки сообщения о завершении: {e}", exc_info=e)
|
|
|
|
|
|
# Global worker instance
_worker_instance: ReviewTaskWorker | None = None

# Strong reference to the background task running the worker loop. The event
# loop keeps only weak references to tasks, so a task whose result is dropped
# can be garbage-collected mid-execution (see asyncio.create_task docs).
_worker_task: asyncio.Task | None = None


async def start_worker():
    """Start the global worker loop in a background task.

    Does nothing if a worker instance already exists. Must be awaited from
    inside a running event loop (``asyncio.create_task`` requires one).
    """
    global _worker_instance, _worker_task
    if _worker_instance is None:
        _worker_instance = ReviewTaskWorker()
        # Run the polling loop in the background, keeping a reference to it.
        _worker_task = asyncio.create_task(_worker_instance.start())
|
|
|
|
|
|
async def stop_worker():
    """Signal the global worker to stop and forget it.

    A no-op when no worker has been started (or it was already stopped).
    """
    global _worker_instance
    worker = _worker_instance
    if not worker:
        return
    await worker.stop()
    _worker_instance = None
|
|
|
|
|
|
def get_worker() -> ReviewTaskWorker | None:
    """Return the shared worker created by ``start_worker()``.

    ``None`` means the worker was never started or has been stopped via
    ``stop_worker()``.
    """
    worker = _worker_instance
    return worker
|
|
|