154 lines
7.0 KiB
Python
154 lines
7.0 KiB
Python
"""Управление токенами GigaChat."""
|
|
import base64
|
|
import os
|
|
import time
|
|
import uuid
|
|
from typing import Optional
|
|
from urllib.parse import urlencode
|
|
|
|
import aiohttp
|
|
from aiohttp import FormData
|
|
from dotenv import load_dotenv
|
|
|
|
load_dotenv()
|
|
|
|
|
|
class TokenManager:
|
|
"""Менеджер токенов для GigaChat API."""
|
|
|
|
def __init__(
|
|
self,
|
|
client_id: Optional[str] = None,
|
|
client_secret: Optional[str] = None,
|
|
auth_url: Optional[str] = None,
|
|
credentials: Optional[str] = None,
|
|
):
|
|
# Приоритет: переданные параметры > переменные окружения > .env файл
|
|
self.credentials = credentials or os.environ.get("GIGACHAT_CREDENTIALS") or os.getenv("GIGACHAT_CREDENTIALS")
|
|
self.client_id = client_id or os.environ.get("GIGACHAT_CLIENT_ID") or os.getenv("GIGACHAT_CLIENT_ID")
|
|
self.client_secret = client_secret or os.environ.get("GIGACHAT_CLIENT_SECRET") or os.getenv("GIGACHAT_CLIENT_SECRET")
|
|
self.auth_url = auth_url or os.environ.get("GIGACHAT_AUTH_URL") or os.getenv(
|
|
"GIGACHAT_AUTH_URL", "https://ngw.devices.sberbank.ru:9443/api/v2/oauth"
|
|
)
|
|
self._access_token: Optional[str] = None
|
|
self._expires_at: float = 0
|
|
|
|
async def get_token(self, force_refresh: bool = False) -> str:
|
|
"""
|
|
Получить актуальный токен доступа.
|
|
|
|
Args:
|
|
force_refresh: Принудительно обновить токен
|
|
|
|
Returns:
|
|
Токен доступа
|
|
"""
|
|
if not force_refresh and self._access_token and time.time() < self._expires_at:
|
|
return self._access_token
|
|
|
|
# Определяем, какой вариант используется: готовый ключ или client_id/client_secret
|
|
if self.credentials:
|
|
# Используем готовый ключ авторизации (уже закодированный в Base64)
|
|
# Убираем префикс "Basic " если он есть
|
|
credentials_key = self.credentials.strip().replace('\n', '').replace('\r', '')
|
|
if credentials_key.startswith('Basic '):
|
|
credentials_key = credentials_key[6:]
|
|
|
|
connector = aiohttp.TCPConnector(ssl=False)
|
|
async with aiohttp.ClientSession(connector=connector) as session:
|
|
headers = {
|
|
"Authorization": f"Basic {credentials_key}",
|
|
"Content-Type": "application/x-www-form-urlencoded",
|
|
"Accept": "application/json",
|
|
"RqUID": str(uuid.uuid4())
|
|
}
|
|
|
|
form_data = {
|
|
"scope": "GIGACHAT_API_PERS"
|
|
}
|
|
|
|
async with session.post(
|
|
self.auth_url,
|
|
headers=headers,
|
|
data=form_data,
|
|
) as response:
|
|
if response.status != 200:
|
|
error_text = await response.text()
|
|
raise Exception(f"Failed to get token: {response.status} - {error_text}")
|
|
|
|
data = await response.json()
|
|
self._access_token = data["access_token"]
|
|
expires_in = data.get("expires_in", 1800)
|
|
self._expires_at = time.time() + expires_in - 300
|
|
|
|
return self._access_token
|
|
elif self.client_id and self.client_secret:
|
|
# Очищаем от пробелов и переносов
|
|
client_secret = self.client_secret.strip().replace('\n', '').replace('\r', '')
|
|
|
|
# Проверяем, является ли client_secret уже закодированным ключом Base64
|
|
# Если secret начинается с букв/цифр и длиннее 50 символов, это уже ключ авторизации
|
|
is_already_encoded = len(client_secret) > 50 and all(c.isalnum() or c in '+/=' for c in client_secret)
|
|
|
|
if is_already_encoded:
|
|
# Это уже готовый ключ авторизации в Base64
|
|
encoded_credentials = client_secret
|
|
print(f"DEBUG: Using pre-encoded authorization key (length: {len(encoded_credentials)})")
|
|
else:
|
|
# Это настоящий client_id и client_secret, нужно закодировать
|
|
client_id = self.client_id.strip().replace('\n', '').replace('\r', '')
|
|
|
|
if not client_id or not client_secret:
|
|
raise Exception("GIGACHAT_CLIENT_ID and GIGACHAT_CLIENT_SECRET cannot be empty after cleaning")
|
|
|
|
credentials_string = f"{client_id}:{client_secret}"
|
|
encoded_credentials = base64.b64encode(credentials_string.encode('utf-8')).decode('utf-8')
|
|
print(f"DEBUG: Encoded client_id:client_secret (length: {len(encoded_credentials)})")
|
|
|
|
connector = aiohttp.TCPConnector(ssl=False)
|
|
async with aiohttp.ClientSession(connector=connector) as session:
|
|
headers = {
|
|
"Content-Type": "application/x-www-form-urlencoded",
|
|
"Accept": "application/json",
|
|
"RqUID": str(uuid.uuid4()),
|
|
"Authorization": f"Basic {encoded_credentials}"
|
|
}
|
|
|
|
payload = {"scope": "GIGACHAT_API_PERS"}
|
|
|
|
print(f"DEBUG: Authorization header starts with: Basic {encoded_credentials[:10]}...")
|
|
|
|
async with session.post(
|
|
self.auth_url,
|
|
headers=headers,
|
|
data=payload,
|
|
) as response:
|
|
if response.status != 200:
|
|
error_text = await response.text()
|
|
raise Exception(
|
|
f"Failed to get token: {response.status} - {error_text}. "
|
|
f"URL: {self.auth_url}"
|
|
)
|
|
|
|
data = await response.json()
|
|
self._access_token = data["access_token"]
|
|
expires_in = data.get("expires_in", 1800)
|
|
self._expires_at = time.time() + expires_in - 300
|
|
|
|
return self._access_token
|
|
else:
|
|
raise Exception(
|
|
"Either GIGACHAT_CREDENTIALS (ready authorization key) or "
|
|
"GIGACHAT_CLIENT_ID and GIGACHAT_CLIENT_SECRET must be set"
|
|
)
|
|
|
|
def is_token_valid(self) -> bool:
|
|
"""Проверить, действителен ли текущий токен."""
|
|
return self._access_token is not None and time.time() < self._expires_at
|
|
|
|
def clear_token(self):
|
|
"""Очистить токен (для тестирования)."""
|
|
self._access_token = None
|
|
self._expires_at = 0
|
|
|