Files
2025-09-05 09:49:23 +00:00

177 lines
5.5 KiB
Python

# services/redis_service.py
import json
import uuid
from datetime import datetime, timezone
from typing import List, Optional

import redis

from config import settings
# Default TTL: 30 days, expressed in seconds (days * hours * minutes * seconds).
DEFAULT_TTL_SECONDS = 30 * 24 * 60 * 60
# Main connection: module-level Redis client shared by every helper below.
# Host, port and database index are read from the project settings object.
r = redis.Redis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DB,
# Replies are handled as str by the helpers in this module (they call
# json.loads() and .encode("utf-8") on the returned values).
decode_responses=True
)
# -------------------------
# New: WebSocket (WS) helper
# -------------------------
def publish_session_event(user_id: str, event_type: str, **kwargs):
    """
    Publish an event on the user's WS channel.

    The event is serialized as a JSON object holding a "type" field plus any
    extra keyword arguments, and published on the Redis channel
    "sessions:<user_id>". This is best-effort: any exception raised while
    serializing or publishing is printed and swallowed, never propagated.

    event_type: "full_list", "created", "updated", "deleted"
    kwargs: extra data (e.g. session, session_id)
    """
    channel = f"sessions:{user_id}"
    # Extra fields are merged after "type", exactly as dict.update() would.
    event = {"type": event_type, **kwargs}
    try:
        r.publish(channel, json.dumps(event))
    except Exception as exc:
        print(f"[Redis] Errore publish su {channel}: {exc}")
# -------------------------
# Chat message operations
# -------------------------
def save_chat(user_id: str, session_id: str, message: dict):
    """Append a message to the chat history of a session.

    A copy of ``message`` is stored as JSON at the tail of the Redis list
    ``chatHistory:<user_id>:<session_id>``. The copy always carries a
    "timestamp" field set to the current UTC time (ISO 8601 with a trailing
    "Z"), replacing any "timestamp" already present in ``message``.

    After the push, the history key's TTL is reset to DEFAULT_TTL_SECONDS,
    the session statistics are recomputed and the TTL of the related session
    keys is refreshed.

    Args:
        user_id: Owner of the session.
        session_id: Session the message belongs to.
        message: JSON-serializable mapping describing the message.
    """
    key = f"chatHistory:{user_id}:{session_id}"
    # Aware "now" in UTC, then made naive so the rendered text keeps the
    # historical "...Z" suffix instead of "+00:00".
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    # Always render microseconds: plain isoformat() omits them when they are
    # exactly 0, and get_chat() sorts these strings lexicographically
    # ("...:23Z" would sort after "...:23.000100Z").
    timestamp = now.isoformat(timespec="microseconds") + "Z"
    enriched = {**message, "timestamp": timestamp}
    r.rpush(key, json.dumps(enriched))
    r.expire(key, DEFAULT_TTL_SECONDS)
    _update_session_stats(user_id, session_id)
    refresh_session_ttl(user_id, session_id)
def get_chat(user_id: str, session_id: str, limit: Optional[int] = None):
    """Return the messages stored for a session, ordered by timestamp.

    Every entry of the Redis list ``chatHistory:<user_id>:<session_id>`` is
    decoded from JSON and the result is sorted by its "timestamp" field
    (string comparison; entries without one sort as the empty string).

    Args:
        user_id: Owner of the session.
        session_id: Session whose history is read.
        limit: When truthy, only the last ``limit`` messages are returned.
            ``None`` and ``0`` both mean "no limit".

    Returns:
        A list of message dicts, oldest first.
    """
    raw_items = r.lrange(f"chatHistory:{user_id}:{session_id}", 0, -1)
    ordered = sorted(
        (json.loads(raw) for raw in raw_items),
        key=lambda msg: msg.get("timestamp", ""),
    )
    if not limit:
        return ordered
    return ordered[-limit:]
def clear_chat(user_id: str, session_id: str):
    """Delete the whole chat history of a session.

    Removes the ``chatHistory:<user_id>:<session_id>`` key, resets the
    session's message statistics to zero and refreshes the TTL of the
    session's remaining keys.
    """
    r.delete(f"chatHistory:{user_id}:{session_id}")
    # The history is gone: zero the counters kept in the session metadata.
    _update_session_stats(user_id, session_id, reset=True)
    refresh_session_ttl(user_id, session_id)
# -------------------------
# Session metadata ops
# -------------------------
def create_session(user_id: str, first_message: str, model_name: Optional[str] = None) -> dict:
    """Create a new chat session for a user and announce it.

    The session metadata is stored as JSON under
    ``chatSession:<user_id>:<session_id>`` and the session id is added to the
    sorted set ``chatSessionsIndex:<user_id>``, scored by creation time.
    Both keys get a TTL of DEFAULT_TTL_SECONDS. A "created" event carrying
    the metadata is then published for the user.

    Args:
        user_id: Owner of the new session.
        first_message: Text used to derive the session name (stripped,
            first 50 characters). Empty, blank or None falls back to
            "New Chat".
        model_name: Optional model identifier stored with the session.

    Returns:
        The metadata dict of the newly created session.
    """
    session_id = str(uuid.uuid4())
    # Read the clock once, as an aware UTC datetime, so that "created_at" and
    # the index score describe the same instant.
    now = datetime.now(timezone.utc)
    # Drop tzinfo only for rendering, to keep the historical "...Z" format.
    created_at = now.replace(tzinfo=None).isoformat() + "Z"
    session_name = (first_message or "").strip()[:50] or "New Chat"
    meta = {
        "session_id": session_id,
        "created_at": created_at,
        "session_name": session_name,
        "message_count": 0,
        "history_size_bytes": 0,
        "model_name": model_name,
    }
    meta_key = f"chatSession:{user_id}:{session_id}"
    index_key = f"chatSessionsIndex:{user_id}"
    r.set(meta_key, json.dumps(meta), ex=DEFAULT_TTL_SECONDS)
    # timestamp() on an aware datetime yields the true Unix epoch. Calling it
    # on a naive UTC value would interpret that value as local time and shift
    # the score by the server's UTC offset.
    score = now.timestamp()
    r.zadd(index_key, {session_id: score})
    r.expire(index_key, DEFAULT_TTL_SECONDS)
    # WS notification
    publish_session_event(user_id, "created", session=meta)
    return meta
def get_sessions(user_id: str) -> List[dict]:
    """Return the metadata of all sessions of a user, highest score first.

    Session ids are read from the sorted set ``chatSessionsIndex:<user_id>``
    in descending score order, and their metadata is fetched from the
    ``chatSession:<user_id>:<session_id>`` keys. Ids whose metadata key no
    longer exists are skipped.

    Args:
        user_id: Owner of the sessions.

    Returns:
        A list of session metadata dicts (possibly empty).
    """
    index_key = f"chatSessionsIndex:{user_id}"
    session_ids = r.zrevrange(index_key, 0, -1)
    if not session_ids:
        # MGET requires at least one key, so bail out on an empty index.
        return []
    meta_keys = [f"chatSession:{user_id}:{sid}" for sid in session_ids]
    # One MGET instead of one GET per session; missing keys come back as
    # None, in the same order as the requested keys.
    return [json.loads(raw) for raw in r.mget(meta_keys) if raw]
def get_session_meta(user_id: str, session_id: str) -> Optional[dict]:
    """Return the metadata dict of a session, or None if nothing is stored.

    Reads ``chatSession:<user_id>:<session_id>`` and decodes it from JSON.
    """
    stored = r.get(f"chatSession:{user_id}:{session_id}")
    if not stored:
        return None
    return json.loads(stored)
def update_session_meta(user_id: str, session_id: str, **updates) -> Optional[dict]:
    """Merge ``updates`` into a session's metadata and announce the change.

    The merged metadata is written back with a TTL of DEFAULT_TTL_SECONDS,
    the TTL of the session's keys is refreshed and an "updated" event
    carrying the new metadata is published for the user.

    Returns:
        The updated metadata dict, or None when the session has no (or
        empty) stored metadata, in which case nothing is written.
    """
    current = get_session_meta(user_id, session_id)
    if not current:
        return None
    # Fields given in `updates` override the stored ones.
    merged = {**current, **updates}
    meta_key = f"chatSession:{user_id}:{session_id}"
    r.set(meta_key, json.dumps(merged), ex=DEFAULT_TTL_SECONDS)
    refresh_session_ttl(user_id, session_id)
    # WS notification
    publish_session_event(user_id, "updated", session=merged)
    return merged
def delete_session(user_id: str, session_id: str):
    """Remove a session and announce the deletion.

    Deletes the session's metadata and chat history keys, removes its id
    from the user's session index and publishes a "deleted" event carrying
    the session id.
    """
    # Metadata first, then history: both keys share the same suffix.
    for prefix in ("chatSession", "chatHistory"):
        r.delete(f"{prefix}:{user_id}:{session_id}")
    r.zrem(f"chatSessionsIndex:{user_id}", session_id)
    # WS notification
    publish_session_event(user_id, "deleted", session_id=session_id)
# -------------------------
# TTL management
# -------------------------
def extend_session_ttl(user_id: str, session_id: str, extra_seconds: int):
    """Add ``extra_seconds`` to the remaining TTL of a session's keys.

    Applies to the session metadata key, the chat history key and the
    user's session index. For each key that exists:

    * if it has a positive remaining TTL, the new TTL is that value plus
      ``extra_seconds``;
    * otherwise the TTL is set to ``extra_seconds``.

    Keys that do not exist are left untouched.

    Args:
        user_id: Owner of the session.
        session_id: Session whose keys are extended.
        extra_seconds: Number of seconds to add.
    """
    keys = (
        f"chatSession:{user_id}:{session_id}",
        f"chatHistory:{user_id}:{session_id}",
        f"chatSessionsIndex:{user_id}",
    )
    for key in keys:
        # TTL already tells a missing key apart (-2), so a separate EXISTS
        # call is unnecessary and would only open a gap between the two.
        current_ttl = r.ttl(key)
        if current_ttl == -2:
            continue
        if current_ttl > 0:
            r.expire(key, current_ttl + extra_seconds)
        else:
            r.expire(key, extra_seconds)
def refresh_session_ttl(user_id: str, session_id: str):
    """Reset the TTL of a session's keys to DEFAULT_TTL_SECONDS.

    Applies to the session metadata key, the chat history key and the
    user's session index. Keys that do not exist are left untouched.

    Args:
        user_id: Owner of the session.
        session_id: Session whose keys are refreshed.
    """
    keys = (
        f"chatSession:{user_id}:{session_id}",
        f"chatHistory:{user_id}:{session_id}",
        f"chatSessionsIndex:{user_id}",
    )
    for key in keys:
        # EXPIRE does nothing on a missing key, so no prior EXISTS round trip
        # is needed.
        r.expire(key, DEFAULT_TTL_SECONDS)
# -------------------------
# Internal helpers
# -------------------------
def _update_session_stats(user_id: str, session_id: str, reset: bool = False):
    """Recompute the message statistics kept in a session's metadata.

    Updates "message_count" and "history_size_bytes" and writes the metadata
    back with a TTL of DEFAULT_TTL_SECONDS. Does nothing when the session has
    no (or empty) stored metadata.

    Args:
        user_id: Owner of the session.
        session_id: Session whose statistics are updated.
        reset: When True both counters are set to 0 without reading the
            history; otherwise they are computed from the stored history
            (number of entries, total UTF-8 size of the stored entries).
    """
    meta = get_session_meta(user_id, session_id)
    if not meta:
        return
    if reset:
        count, size_bytes = 0, 0
    else:
        entries = r.lrange(f"chatHistory:{user_id}:{session_id}", 0, -1)
        count = len(entries)
        size_bytes = sum(len(entry.encode("utf-8")) for entry in entries)
    meta["message_count"] = count
    meta["history_size_bytes"] = size_bytes
    meta_key = f"chatSession:{user_id}:{session_id}"
    r.set(meta_key, json.dumps(meta), ex=DEFAULT_TTL_SECONDS)