This commit is contained in:
Primakov Alexandr Alexandrovich
2025-10-12 23:15:09 +03:00
commit 09cdd06307
88 changed files with 15007 additions and 0 deletions
+299
View File
@@ -0,0 +1,299 @@
"""Tools for the reviewer agent"""
import json
import re
from typing import List, Dict, Any, Optional
from langchain_ollama import OllamaLLM
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import PromptTemplate
from app.agents.prompts import DIFF_REVIEW_PROMPT, CODE_REVIEW_PROMPT
class CodeAnalyzer:
    """Tool for analyzing code with Ollama.

    Sends diffs or full file contents to an Ollama-hosted LLM and parses
    the JSON review comments it returns. All user-facing output strings
    are intentionally left untouched.
    """

    def __init__(self, ollama_base_url: str, model: str):
        """Create the analyzer.

        Args:
            ollama_base_url: Base URL of the Ollama server.
            model: Name of the Ollama model to query.
        """
        self.llm = OllamaLLM(
            base_url=ollama_base_url,
            model=model,
            temperature=0.3,  # raised for a more attentive analysis
            format="json"  # force JSON-formatted model output
        )
        # JsonOutputParser guarantees structured JSON from the LLM chain
        self.json_parser = JsonOutputParser()

    def _extract_json_from_response(self, response: str) -> Dict[str, Any]:
        """Extract a JSON object from a raw LLM response.

        Strips markdown code fences if present, then searches for the first
        ``{...}`` span and parses it. Returns ``{"comments": []}`` when no
        valid JSON can be recovered.
        """
        # Remove markdown code blocks if present
        response = response.strip()
        if response.startswith('```'):
            response = re.sub(r'^```(?:json)?\s*', '', response)
            response = re.sub(r'\s*```$', '', response)
        # Try to find JSON in the response (greedy: outermost braces)
        json_match = re.search(r'\{[\s\S]*\}', response)
        if json_match:
            try:
                json_str = json_match.group()
                print(f" 🔍 Найден JSON: {json_str[:200]}...")
                return json.loads(json_str)
            except json.JSONDecodeError as e:
                # json_str is always bound here: group() cannot raise JSONDecodeError
                print(f" ❌ Ошибка парсинга JSON: {e}")
                print(f" 📄 JSON строка: {json_str[:500]}")
        else:
            print(f" ❌ JSON не найден в ответе!")
            print(f" 📄 Ответ: {response[:500]}")
        # If no valid JSON found, return empty comments
        return {"comments": []}

    async def generate_summary(
        self,
        all_comments: List[Dict[str, Any]],
        pr_title: str = "",
        pr_description: str = ""
    ) -> str:
        """Generate overall review summary in markdown.

        Args:
            all_comments: Flat list of comment dicts (expects an optional
                'severity' key with ERROR/WARNING/INFO values, any case).
            pr_title: Accepted for API symmetry; currently unused here.
            pr_description: Accepted for API symmetry; currently unused here.

        Returns:
            A markdown summary string suitable for posting on the PR.
        """
        if not all_comments:
            return """## 🤖 AI Code Review
✅ **Отличная работа!** Серьезных проблем не обнаружено.
Код выглядит хорошо и соответствует стандартам."""
        # Group comments by severity (case-insensitive)
        errors = [c for c in all_comments if c.get('severity', '').upper() == 'ERROR']
        warnings = [c for c in all_comments if c.get('severity', '').upper() == 'WARNING']
        infos = [c for c in all_comments if c.get('severity', '').upper() == 'INFO']
        summary = f"""## 🤖 AI Code Review
### 📊 Статистика
- **Всего проблем:** {len(all_comments)}
"""
        if errors:
            summary += f"- ❌ **Критичных:** {len(errors)}\n"
        if warnings:
            summary += f"- ⚠️ **Важных:** {len(warnings)}\n"
        if infos:
            summary += f"- ℹ️ **Рекомендаций:** {len(infos)}\n"
        summary += "\n### 💡 Рекомендации\n\n"
        if errors:
            summary += "⚠️ **Найдены критичные проблемы!** Пожалуйста, исправьте их перед мержем в main.\n\n"
        elif warnings:
            summary += "Найдены важные замечания. Рекомендуется исправить перед мержем.\n\n"
        else:
            summary += "Проблемы не критичны, но рекомендуется учесть.\n\n"
        summary += "📝 **Детальные комментарии для каждой проблемы опубликованы ниже.**\n"
        return summary

    async def analyze_diff(
        self,
        file_path: str,
        diff: str,
        language: Optional[str] = None,
        pr_title: str = "",
        pr_description: str = ""
    ) -> List[Dict[str, Any]]:
        """Analyze a code diff and return review comments.

        Args:
            file_path: Path of the file under review (used in the prompt).
            diff: Unified diff text; an empty diff short-circuits to [].
            language: Accepted for API symmetry; currently unused here.
            pr_title: Optional PR title added to the prompt context.
            pr_description: Optional PR description added to the prompt context.

        Returns:
            List of comment dicts parsed from the model's JSON response,
            or [] on any failure (errors are logged, never raised).
        """
        if not diff or not diff.strip():
            print(f"⚠️ Пустой diff для {file_path}")
            return []
        # Add PR context if available
        pr_context = ""
        if pr_title or pr_description:
            pr_context = f"\n\n**КОНТЕКСТ PR:**\n"
            if pr_title:
                pr_context += f"Название: {pr_title}\n"
            if pr_description:
                pr_context += f"Описание: {pr_description}\n"
            pr_context += "\nОБЯЗАТЕЛЬНО проверь: соответствует ли код описанию PR!\n"
        # Ask the parser for its JSON format instructions to embed in the prompt
        format_instructions = self.json_parser.get_format_instructions()
        prompt = DIFF_REVIEW_PROMPT.format(
            file_path=file_path,
            diff=diff,
            pr_context=pr_context,
            format_instructions=format_instructions
        )
        print("\n" + "="*80)
        print(f"🔍 АНАЛИЗ ФАЙЛА: {file_path}")
        print("="*80)
        if pr_title or pr_description:
            print(f"\n📋 КОНТЕКСТ PR:")
            print("-" * 80)
            if pr_title:
                print(f"Название: {pr_title}")
            if pr_description:
                desc_short = pr_description[:200] + ("..." if len(pr_description) > 200 else "")
                print(f"Описание: {desc_short}")
            print("-" * 80)
        print(f"\n📝 DIFF ({len(diff)} символов):")
        print("-" * 80)
        # Show only the first 800 characters of the diff
        print(diff[:800] + ("...\n[обрезано]" if len(diff) > 800 else ""))
        print("-" * 80)
        print(f"\n💭 ПРОМПТ ({len(prompt)} символов):")
        print("-" * 80)
        print(prompt[:500] + "...")
        print("-" * 80)
        try:
            print(f"\n⏳ Отправка запроса к Ollama ({self.llm.model})...")
            # Build a chain of the LLM followed by the JSON parser
            chain = self.llm | self.json_parser
            # Invoke asynchronously; result is already a parsed dict
            result = await chain.ainvoke(prompt)
            print(f"\n🤖 ОТВЕТ AI (распарсен через JsonOutputParser):")
            print("-" * 80)
            print(json.dumps(result, ensure_ascii=False, indent=2)[:500] + "...")
            print("-" * 80)
            comments = result.get("comments", [])
            if comments:
                print(f"\n✅ Найдено комментариев: {len(comments)}")
                for i, comment in enumerate(comments, 1):
                    print(f"\n {i}. Строка {comment.get('line', '?')}:")
                    print(f" Severity: {comment.get('severity', '?')}")
                    print(f" Message: {comment.get('message', '?')[:100]}...")
            else:
                print("\n⚠️ Комментариев не найдено! AI не нашел проблем.")
            print("="*80 + "\n")
            return comments
        except Exception as e:
            print(f"\n❌ ОШИБКА при анализе {file_path}: {e}")
            print(f" Тип ошибки: {type(e).__name__}")
            import traceback
            traceback.print_exc()
            # Fallback: attempt to extract JSON manually from the exception payload
            print("\n🔄 Попытка fallback парсинга...")
            try:
                if hasattr(e, 'args') and len(e.args) > 0:
                    response_text = str(e.args[0])
                    result = self._extract_json_from_response(response_text)
                    return result.get("comments", [])
            except Exception:
                # Bare except replaced: never swallow SystemExit/KeyboardInterrupt.
                # Best-effort fallback, so any other failure still yields [].
                pass
            return []

    async def analyze_code(
        self,
        file_path: str,
        code: str,
        language: str = "python",
        patch_info: str = ""
    ) -> List[Dict[str, Any]]:
        """Analyze full file content and return review comments.

        Args:
            file_path: Path of the file under review.
            code: Full source text; empty input short-circuits to [].
            language: Language label passed into the prompt template.
            patch_info: Extra patch context passed into the prompt template.

        Returns:
            List of comment dicts, or [] on empty input or any error.
        """
        if not code or not code.strip():
            return []
        prompt = CODE_REVIEW_PROMPT.format(
            file_path=file_path,
            code=code,
            language=language,
            patch_info=patch_info
        )
        try:
            response = await self.llm.ainvoke(prompt)
            result = self._extract_json_from_response(response)
            return result.get("comments", [])
        except Exception as e:
            print(f"Error analyzing code for {file_path}: {e}")
            return []
def detect_language(file_path: str) -> str:
    """Return a language identifier for *file_path* based on its extension.

    Unknown or missing extensions map to ``'text'``. Matching is
    case-insensitive.
    """
    languages = {
        '.py': 'python',
        '.js': 'javascript',
        '.ts': 'typescript',
        '.tsx': 'typescript',
        '.jsx': 'javascript',
        '.java': 'java',
        '.go': 'go',
        '.rs': 'rust',
        '.cpp': 'cpp',
        '.c': 'c',
        '.cs': 'csharp',
        '.php': 'php',
        '.rb': 'ruby',
        '.swift': 'swift',
        '.kt': 'kotlin',
        '.scala': 'scala',
        '.sh': 'bash',
        '.sql': 'sql',
        '.html': 'html',
        '.css': 'css',
        '.scss': 'scss',
        '.yaml': 'yaml',
        '.yml': 'yaml',
        '.json': 'json',
        '.xml': 'xml',
        '.md': 'markdown',
    }
    # rpartition yields ('', '', path) when there is no dot, giving the
    # same empty-suffix behavior as the original split-based lookup.
    _, dot, tail = file_path.rpartition('.')
    suffix = (dot + tail).lower() if dot else ''
    return languages.get(suffix, 'text')
def should_review_file(file_path: str) -> bool:
    """Determine whether a file is worth sending to the AI reviewer.

    Skips binary assets, compiled artifacts, lockfiles, minified bundles,
    and anything inside generated/vendored directories.

    Args:
        file_path: Repository-relative path of the changed file.

    Returns:
        True if the file should be reviewed, False otherwise.
    """
    # Suffixes of binary, generated, and minified files.
    # Checked with endswith (case-insensitive) so multi-segment suffixes
    # like '.min.js' match; the previous single-segment extension check
    # ('.' + last dot part) could never match those entries.
    skip_extensions = (
        '.png', '.jpg', '.jpeg', '.gif', '.svg', '.ico',
        '.pdf', '.zip', '.tar', '.gz',
        '.lock', '.min.js', '.min.css',
        '.pyc', '.pyo', '.class', '.o',
    )
    # Path fragments of vendored/generated trees and lockfiles.
    skip_patterns = [
        'node_modules/',
        'venv/',
        '.git/',
        'dist/',
        'build/',
        '__pycache__/',
        '.next/',
        '.nuxt/',
        'package-lock.json',
        'yarn.lock',
        'poetry.lock',
    ]
    # Check suffix (case-insensitive, full-suffix match)
    if file_path.lower().endswith(skip_extensions):
        return False
    # Check patterns (case-sensitive substring match, as before)
    for pattern in skip_patterns:
        if pattern in file_path:
            return False
    return True