fix: Correct LangGraph event handling - events are tuples not dicts + add test scripts

This commit is contained in:
Primakov Alexandr Alexandrovich
2025-10-13 13:46:35 +03:00
parent c9dc486011
commit cfba28f913
4 changed files with 674 additions and 20 deletions
+228
View File
@@ -0,0 +1,228 @@
"""
Упрощенный тест LangGraph без реального review
Проверяет только механизм стриминга событий
"""
import asyncio
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, Any
import operator
# Простое состояние для теста
class SimpleState(TypedDict):
counter: Annotated[int, operator.add]
messages: list[str]
step: str
# Простые ноды
async def node_1(state: SimpleState) -> SimpleState:
"""Первая нода"""
print(" [NODE 1] Выполняется...")
await asyncio.sleep(0.5)
return {
"counter": 1,
"messages": ["Node 1 completed"],
"step": "node_1"
}
async def node_2(state: SimpleState) -> SimpleState:
"""Вторая нода"""
print(" [NODE 2] Выполняется...")
await asyncio.sleep(0.5)
return {
"counter": 1,
"messages": ["Node 2 completed"],
"step": "node_2"
}
async def node_3(state: SimpleState) -> SimpleState:
"""Третья нода"""
print(" [NODE 3] Выполняется...")
await asyncio.sleep(0.5)
return {
"counter": 1,
"messages": ["Node 3 completed"],
"step": "node_3"
}
def create_test_graph():
"""Создает тестовый граф"""
workflow = StateGraph(SimpleState)
# Добавляем ноды
workflow.add_node("node_1", node_1)
workflow.add_node("node_2", node_2)
workflow.add_node("node_3", node_3)
# Определяем связи
workflow.set_entry_point("node_1")
workflow.add_edge("node_1", "node_2")
workflow.add_edge("node_2", "node_3")
workflow.add_edge("node_3", END)
return workflow.compile()
async def test_stream_modes():
"""Тестирует разные режимы стриминга"""
graph = create_test_graph()
initial_state: SimpleState = {
"counter": 0,
"messages": [],
"step": "start"
}
# Тест 1: updates
print("\n" + "="*80)
print("ТЕСТ 1: stream_mode=['updates']")
print("="*80)
event_count = 0
async for event in graph.astream(initial_state, stream_mode=["updates"]):
event_count += 1
print(f"\n📨 Event #{event_count}")
print(f" Type: {type(event)}")
print(f" Content: {event}")
print(f"\n✅ Получено событий: {event_count}")
# Тест 2: messages
print("\n" + "="*80)
print("ТЕСТ 2: stream_mode=['messages']")
print("="*80)
event_count = 0
async for event in graph.astream(initial_state, stream_mode=["messages"]):
event_count += 1
print(f"\n📨 Event #{event_count}")
print(f" Type: {type(event)}")
print(f" Content: {event}")
print(f"\n✅ Получено событий: {event_count}")
# Тест 3: updates + messages
print("\n" + "="*80)
print("ТЕСТ 3: stream_mode=['updates', 'messages']")
print("="*80)
event_count = 0
async for event in graph.astream(initial_state, stream_mode=["updates", "messages"]):
event_count += 1
print(f"\n📨 Event #{event_count}")
print(f" Type: {type(event)}")
if isinstance(event, dict):
print(f" Keys: {list(event.keys())}")
elif isinstance(event, tuple):
print(f" Tuple[0] type: {type(event[0])}")
print(f" Tuple[1] type: {type(event[1])}")
print(f" Content: {event}")
print(f"\n✅ Получено событий: {event_count}")
# Тест 4: values
print("\n" + "="*80)
print("ТЕСТ 4: stream_mode=['values']")
print("="*80)
event_count = 0
async for event in graph.astream(initial_state, stream_mode=["values"]):
event_count += 1
print(f"\n📨 Event #{event_count}")
print(f" Type: {type(event)}")
print(f" Content: {event}")
print(f"\n✅ Получено событий: {event_count}")
# Тест 5: debug (все режимы)
print("\n" + "="*80)
print("ТЕСТ 5: stream_mode=['updates', 'messages', 'values', 'debug']")
print("="*80)
event_count = 0
async for event in graph.astream(
initial_state,
stream_mode=["updates", "messages", "values", "debug"]
):
event_count += 1
print(f"\n📨 Event #{event_count}")
print(f" Type: {type(event)}")
if isinstance(event, tuple) and len(event) >= 2:
print(f" Event type (tuple[0]): {event[0]}")
print(f" Event data (tuple[1]): {event[1]}")
else:
print(f" Content: {event}")
print(f"\n✅ Получено событий: {event_count}")
async def test_with_callback():
"""Тест с использованием callback для обработки событий"""
print("\n" + "="*80)
print("ТЕСТ 6: Callback обработка событий")
print("="*80)
graph = create_test_graph()
initial_state: SimpleState = {
"counter": 0,
"messages": [],
"step": "start"
}
collected_events = []
async def event_callback(event_type: str, event_data: Any):
"""Callback для обработки событий"""
collected_events.append({
"type": event_type,
"data": event_data
})
print(f" 🔔 Callback: {event_type}")
# Симуляция обработки событий с callback
event_count = 0
async for event in graph.astream(initial_state, stream_mode=["updates", "messages"]):
event_count += 1
print(f"\n📨 Event #{event_count}: {type(event)}")
# Обрабатываем событие
if isinstance(event, tuple) and len(event) >= 2:
await event_callback(str(event[0]), event[1])
elif isinstance(event, dict):
for node_name, node_data in event.items():
await event_callback(f"node_update_{node_name}", node_data)
else:
await event_callback("unknown", event)
print(f"\n✅ Всего событий: {event_count}")
print(f"✅ Callback вызовов: {len(collected_events)}")
print(f"\n📋 Собранные события:")
for i, evt in enumerate(collected_events, 1):
print(f" {i}. {evt['type']}")
async def main():
print("\n" + "="*80)
print("ПРОСТОЙ ТЕСТ LANGGRAPH STREAMING")
print("="*80 + "\n")
await test_stream_modes()
await test_with_callback()
print("\n" + "="*80)
print("ТЕСТИРОВАНИЕ ЗАВЕРШЕНО")
print("="*80 + "\n")
if __name__ == "__main__":
asyncio.run(main())