300 lines
11 KiB
Python
300 lines
11 KiB
Python
"""Tools for the reviewer agent"""
|
|
|
|
import json
|
|
import re
|
|
from typing import List, Dict, Any, Optional
|
|
from langchain_ollama import OllamaLLM
|
|
from langchain_core.output_parsers import JsonOutputParser
|
|
from langchain_core.prompts import PromptTemplate
|
|
from app.agents.prompts import DIFF_REVIEW_PROMPT, CODE_REVIEW_PROMPT
|
|
|
|
|
|
class CodeAnalyzer:
    """Tool for analyzing code with Ollama.

    Wraps an ``OllamaLLM`` instance configured for JSON output and provides
    async helpers to review diffs / whole files and to build a markdown
    review summary. LLM responses are expected to contain a top-level
    ``{"comments": [...]}`` object.
    """

    def __init__(self, ollama_base_url: str, model: str):
        """Create the analyzer.

        Args:
            ollama_base_url: Base URL of the Ollama server.
            model: Name of the Ollama model to use.
        """
        self.llm = OllamaLLM(
            base_url=ollama_base_url,
            model=model,
            temperature=0.3,  # raised for more attentive analysis
            format="json"  # force JSON output format
        )
        # Use JsonOutputParser to get guaranteed JSON from the chain
        self.json_parser = JsonOutputParser()

    def _extract_json_from_response(self, response: str) -> Dict[str, Any]:
        """Extract a JSON object from a raw LLM response.

        Strips optional markdown code fences, then grabs the outermost
        ``{...}`` span and parses it. Returns ``{"comments": []}`` when no
        valid JSON can be recovered.

        Args:
            response: Raw text returned by the LLM.

        Returns:
            Parsed JSON object, or ``{"comments": []}`` on failure.
        """
        # Remove markdown code blocks if present
        response = response.strip()
        if response.startswith('```'):
            response = re.sub(r'^```(?:json)?\s*', '', response)
            response = re.sub(r'\s*```$', '', response)

        # Try to find the outermost JSON object in the response
        json_match = re.search(r'\{[\s\S]*\}', response)
        if json_match:
            json_str = json_match.group()
            try:
                print(f" 🔍 Найден JSON: {json_str[:200]}...")
                return json.loads(json_str)
            except json.JSONDecodeError as e:
                print(f" ❌ Ошибка парсинга JSON: {e}")
                print(f" 📄 JSON строка: {json_str[:500]}")
        else:
            print(" ❌ JSON не найден в ответе!")
            print(f" 📄 Ответ: {response[:500]}")

        # If no valid JSON found, return empty comments
        return {"comments": []}

    async def generate_summary(
        self,
        all_comments: List[Dict[str, Any]],
        pr_title: str = "",
        pr_description: str = ""
    ) -> str:
        """Generate the overall review summary in markdown.

        Args:
            all_comments: Flattened list of comment dicts; each may carry a
                ``severity`` key (``ERROR`` / ``WARNING`` / ``INFO``,
                compared case-insensitively).
            pr_title: PR title (currently unused here; kept for interface
                compatibility with callers).
            pr_description: PR description (currently unused here).

        Returns:
            Markdown summary text.
        """
        if not all_comments:
            return """## 🤖 AI Code Review

✅ **Отличная работа!** Серьезных проблем не обнаружено.

Код выглядит хорошо и соответствует стандартам."""

        # Group comments by severity
        errors = [c for c in all_comments if c.get('severity', '').upper() == 'ERROR']
        warnings = [c for c in all_comments if c.get('severity', '').upper() == 'WARNING']
        infos = [c for c in all_comments if c.get('severity', '').upper() == 'INFO']

        summary = f"""## 🤖 AI Code Review

### 📊 Статистика

- **Всего проблем:** {len(all_comments)}
"""

        if errors:
            summary += f"- ❌ **Критичных:** {len(errors)}\n"
        if warnings:
            summary += f"- ⚠️ **Важных:** {len(warnings)}\n"
        if infos:
            summary += f"- ℹ️ **Рекомендаций:** {len(infos)}\n"

        summary += "\n### 💡 Рекомендации\n\n"

        if errors:
            summary += "⚠️ **Найдены критичные проблемы!** Пожалуйста, исправьте их перед мержем в main.\n\n"
        elif warnings:
            summary += "Найдены важные замечания. Рекомендуется исправить перед мержем.\n\n"
        else:
            summary += "Проблемы не критичны, но рекомендуется учесть.\n\n"

        summary += "📝 **Детальные комментарии для каждой проблемы опубликованы ниже.**\n"

        return summary

    async def analyze_diff(
        self,
        file_path: str,
        diff: str,
        language: Optional[str] = None,
        pr_title: str = "",
        pr_description: str = ""
    ) -> List[Dict[str, Any]]:
        """Analyze a unified diff for one file and return review comments.

        Args:
            file_path: Path of the file being reviewed.
            diff: Unified diff text for the file.
            language: Detected language (currently unused here; kept for
                interface compatibility with callers).
            pr_title: PR title, included in the prompt context when set.
            pr_description: PR description, included in the prompt context.

        Returns:
            List of comment dicts (possibly empty) extracted from the LLM
            response; empty list on any error.
        """

        if not diff or not diff.strip():
            print(f"⚠️ Пустой diff для {file_path}")
            return []

        # Add PR context if available
        pr_context = ""
        if pr_title or pr_description:
            pr_context = "\n\n**КОНТЕКСТ PR:**\n"
            if pr_title:
                pr_context += f"Название: {pr_title}\n"
            if pr_description:
                pr_context += f"Описание: {pr_description}\n"
            pr_context += "\nОБЯЗАТЕЛЬНО проверь: соответствует ли код описанию PR!\n"

        # Get JSON format instructions from the parser for the prompt
        format_instructions = self.json_parser.get_format_instructions()

        prompt = DIFF_REVIEW_PROMPT.format(
            file_path=file_path,
            diff=diff,
            pr_context=pr_context,
            format_instructions=format_instructions
        )

        print("\n" + "="*80)
        print(f"🔍 АНАЛИЗ ФАЙЛА: {file_path}")
        print("="*80)

        if pr_title or pr_description:
            print("\n📋 КОНТЕКСТ PR:")
            print("-" * 80)
            if pr_title:
                print(f"Название: {pr_title}")
            if pr_description:
                desc_short = pr_description[:200] + ("..." if len(pr_description) > 200 else "")
                print(f"Описание: {desc_short}")
            print("-" * 80)

        print(f"\n📝 DIFF ({len(diff)} символов):")
        print("-" * 80)
        # Show only the first 800 characters of the diff
        print(diff[:800] + ("...\n[обрезано]" if len(diff) > 800 else ""))
        print("-" * 80)
        print(f"\n💭 ПРОМПТ ({len(prompt)} символов):")
        print("-" * 80)
        print(prompt[:500] + "...")
        print("-" * 80)

        try:
            print(f"\n⏳ Отправка запроса к Ollama ({self.llm.model})...")

            # Chain the LLM with the JSON parser
            chain = self.llm | self.json_parser

            # Invoke and get the already-parsed result
            result = await chain.ainvoke(prompt)

            print("\n🤖 ОТВЕТ AI (распарсен через JsonOutputParser):")
            print("-" * 80)
            print(json.dumps(result, ensure_ascii=False, indent=2)[:500] + "...")
            print("-" * 80)

            comments = result.get("comments", [])

            if comments:
                print(f"\n✅ Найдено комментариев: {len(comments)}")
                for i, comment in enumerate(comments, 1):
                    print(f"\n {i}. Строка {comment.get('line', '?')}:")
                    print(f" Severity: {comment.get('severity', '?')}")
                    print(f" Message: {comment.get('message', '?')[:100]}...")
            else:
                print("\n⚠️ Комментариев не найдено! AI не нашел проблем.")

            print("="*80 + "\n")

            return comments

        except Exception as e:
            print(f"\n❌ ОШИБКА при анализе {file_path}: {e}")
            print(f" Тип ошибки: {type(e).__name__}")
            import traceback
            traceback.print_exc()

            # Fallback: try to extract JSON manually from the exception
            # payload (some parser errors carry the raw LLM text in args[0])
            print("\n🔄 Попытка fallback парсинга...")
            try:
                if hasattr(e, 'args') and len(e.args) > 0:
                    response_text = str(e.args[0])
                    result = self._extract_json_from_response(response_text)
                    return result.get("comments", [])
            except Exception:
                # Best-effort fallback only; narrowed from a bare except so
                # KeyboardInterrupt/SystemExit are no longer swallowed.
                pass

            return []

    async def analyze_code(
        self,
        file_path: str,
        code: str,
        language: str = "python",
        patch_info: str = ""
    ) -> List[Dict[str, Any]]:
        """Analyze full file content and return review comments.

        Args:
            file_path: Path of the file being reviewed.
            code: Complete file content.
            language: Programming language name for the prompt.
            patch_info: Optional patch/diff context for the prompt.

        Returns:
            List of comment dicts; empty list for empty input or on error.
        """

        if not code or not code.strip():
            return []

        prompt = CODE_REVIEW_PROMPT.format(
            file_path=file_path,
            code=code,
            language=language,
            patch_info=patch_info
        )

        try:
            response = await self.llm.ainvoke(prompt)
            result = self._extract_json_from_response(response)
            return result.get("comments", [])
        except Exception as e:
            print(f"Error analyzing code for {file_path}: {e}")
            return []
|
|
|
|
|
|
def detect_language(file_path: str) -> str:
    """Return the programming-language name for a file path.

    The lookup uses the last dot-suffix of the path, compared
    case-insensitively; unknown or missing extensions map to 'text'.
    """
    language_by_suffix = {
        '.py': 'python',
        '.js': 'javascript',
        '.ts': 'typescript',
        '.tsx': 'typescript',
        '.jsx': 'javascript',
        '.java': 'java',
        '.go': 'go',
        '.rs': 'rust',
        '.cpp': 'cpp',
        '.c': 'c',
        '.cs': 'csharp',
        '.php': 'php',
        '.rb': 'ruby',
        '.swift': 'swift',
        '.kt': 'kotlin',
        '.scala': 'scala',
        '.sh': 'bash',
        '.sql': 'sql',
        '.html': 'html',
        '.css': 'css',
        '.scss': 'scss',
        '.yaml': 'yaml',
        '.yml': 'yaml',
        '.json': 'json',
        '.xml': 'xml',
        '.md': 'markdown',
    }

    # rpartition yields ('', '', path) when there is no dot at all
    _, dot, tail = file_path.rpartition('.')
    suffix = (dot + tail).lower() if dot else ''
    return language_by_suffix.get(suffix, 'text')
|
|
|
|
|
|
def should_review_file(file_path: str) -> bool:
    """Determine whether a file should be sent for review.

    Skips binary assets, archives, lockfiles, minified bundles, compiled
    artifacts, and anything inside generated/vendored directories.

    Args:
        file_path: Repository-relative path of the file.

    Returns:
        True if the file is worth reviewing, False otherwise.
    """
    # Skip binary, generated, and config files
    skip_extensions = {
        '.png', '.jpg', '.jpeg', '.gif', '.svg', '.ico',
        '.pdf', '.zip', '.tar', '.gz',
        '.lock', '.min.js', '.min.css',
        '.pyc', '.pyo', '.class', '.o',
    }

    skip_patterns = [
        'node_modules/',
        'venv/',
        '.git/',
        'dist/',
        'build/',
        '__pycache__/',
        '.next/',
        '.nuxt/',
        'package-lock.json',
        'yarn.lock',
        'poetry.lock',
    ]

    # Check suffixes with endswith: unlike splitting off only the LAST
    # extension, this also matches compound suffixes such as '.min.js'
    # and '.min.css', which the previous last-extension check could
    # never hit (the last extension of 'x.min.js' is '.js').
    if file_path.lower().endswith(tuple(skip_extensions)):
        return False

    # Check path patterns (generated/vendored locations and lockfiles)
    for pattern in skip_patterns:
        if pattern in file_path:
            return False

    return True
|
|
|