177 lines
5.5 KiB
Python
177 lines
5.5 KiB
Python
# services/redis_service.py
|
|
|
|
import redis
|
|
import json
|
|
import uuid
|
|
from config import settings
|
|
from datetime import datetime
|
|
from typing import List, Optional
|
|
|
|
# TTL di default: 30 giorni
|
|
DEFAULT_TTL_SECONDS = 30 * 24 * 60 * 60
|
|
|
|
# Connessione principale
|
|
r = redis.Redis(
|
|
host=settings.REDIS_HOST,
|
|
port=settings.REDIS_PORT,
|
|
db=settings.REDIS_DB,
|
|
decode_responses=True
|
|
)
|
|
|
|
# -------------------------
|
|
# Nuovo: helper per WS
|
|
# -------------------------
|
|
|
|
def publish_session_event(user_id: str, event_type: str, **kwargs):
|
|
"""
|
|
Pubblica un evento sul canale WS dell'utente.
|
|
event_type: "full_list", "created", "updated", "deleted"
|
|
kwargs: dati extra (es. session, session_id)
|
|
"""
|
|
channel = f"sessions:{user_id}"
|
|
payload = {"type": event_type}
|
|
payload.update(kwargs)
|
|
try:
|
|
r.publish(channel, json.dumps(payload))
|
|
except Exception as e:
|
|
print(f"[Redis] Errore publish su {channel}: {e}")
|
|
|
|
# -------------------------
|
|
# Chat message operations
|
|
# -------------------------
|
|
|
|
def save_chat(user_id: str, session_id: str, message: dict):
|
|
key = f"chatHistory:{user_id}:{session_id}"
|
|
enriched = {**message, "timestamp": datetime.utcnow().isoformat() + "Z"}
|
|
r.rpush(key, json.dumps(enriched))
|
|
r.expire(key, DEFAULT_TTL_SECONDS)
|
|
_update_session_stats(user_id, session_id)
|
|
refresh_session_ttl(user_id, session_id)
|
|
|
|
def get_chat(user_id: str, session_id: str, limit: Optional[int] = None):
|
|
key = f"chatHistory:{user_id}:{session_id}"
|
|
data = r.lrange(key, 0, -1)
|
|
messages = [json.loads(item) for item in data]
|
|
messages.sort(key=lambda m: m.get("timestamp", ""))
|
|
if limit:
|
|
messages = messages[-limit:]
|
|
return messages
|
|
|
|
def clear_chat(user_id: str, session_id: str):
|
|
key = f"chatHistory:{user_id}:{session_id}"
|
|
r.delete(key)
|
|
_update_session_stats(user_id, session_id, reset=True)
|
|
refresh_session_ttl(user_id, session_id)
|
|
|
|
# -------------------------
|
|
# Session metadata ops
|
|
# -------------------------
|
|
|
|
def create_session(user_id: str, first_message: str, model_name: Optional[str] = None) -> dict:
|
|
session_id = str(uuid.uuid4())
|
|
created_at = datetime.utcnow().isoformat() + "Z"
|
|
session_name = (first_message.strip()[:50] or "New Chat")
|
|
meta = {
|
|
"session_id": session_id,
|
|
"created_at": created_at,
|
|
"session_name": session_name,
|
|
"message_count": 0,
|
|
"history_size_bytes": 0,
|
|
"model_name": model_name # <-- Add this line
|
|
}
|
|
meta_key = f"chatSession:{user_id}:{session_id}"
|
|
index_key = f"chatSessionsIndex:{user_id}"
|
|
|
|
r.set(meta_key, json.dumps(meta), ex=DEFAULT_TTL_SECONDS)
|
|
score = float(datetime.utcnow().timestamp())
|
|
r.zadd(index_key, {session_id: score})
|
|
r.expire(index_key, DEFAULT_TTL_SECONDS)
|
|
|
|
# Notifica WS
|
|
publish_session_event(user_id, "created", session=meta)
|
|
|
|
return meta
|
|
|
|
def get_sessions(user_id: str) -> List[dict]:
|
|
key = f"chatSessionsIndex:{user_id}"
|
|
session_ids = r.zrevrange(key, 0, -1)
|
|
sessions = []
|
|
for sid in session_ids:
|
|
session_key = f"chatSession:{user_id}:{sid}"
|
|
raw = r.get(session_key)
|
|
if raw:
|
|
sessions.append(json.loads(raw))
|
|
return sessions
|
|
|
|
def get_session_meta(user_id: str, session_id: str) -> Optional[dict]:
|
|
raw = r.get(f"chatSession:{user_id}:{session_id}")
|
|
return json.loads(raw) if raw else None
|
|
|
|
def update_session_meta(user_id: str, session_id: str, **updates) -> Optional[dict]:
|
|
meta = get_session_meta(user_id, session_id)
|
|
if not meta:
|
|
return None
|
|
meta.update(updates)
|
|
r.set(f"chatSession:{user_id}:{session_id}", json.dumps(meta), ex=DEFAULT_TTL_SECONDS)
|
|
refresh_session_ttl(user_id, session_id)
|
|
|
|
# Notifica WS
|
|
publish_session_event(user_id, "updated", session=meta)
|
|
|
|
return meta
|
|
|
|
def delete_session(user_id: str, session_id: str):
|
|
r.delete(f"chatSession:{user_id}:{session_id}")
|
|
r.delete(f"chatHistory:{user_id}:{session_id}")
|
|
r.zrem(f"chatSessionsIndex:{user_id}", session_id)
|
|
|
|
# Notifica WS
|
|
publish_session_event(user_id, "deleted", session_id=session_id)
|
|
|
|
# -------------------------
|
|
# TTL management
|
|
# -------------------------
|
|
|
|
def extend_session_ttl(user_id: str, session_id: str, extra_seconds: int):
|
|
keys = [
|
|
f"chatSession:{user_id}:{session_id}",
|
|
f"chatHistory:{user_id}:{session_id}",
|
|
f"chatSessionsIndex:{user_id}"
|
|
]
|
|
for key in keys:
|
|
if r.exists(key):
|
|
current_ttl = r.ttl(key)
|
|
if current_ttl > 0:
|
|
r.expire(key, current_ttl + extra_seconds)
|
|
else:
|
|
r.expire(key, extra_seconds)
|
|
|
|
def refresh_session_ttl(user_id: str, session_id: str):
|
|
keys = [
|
|
f"chatSession:{user_id}:{session_id}",
|
|
f"chatHistory:{user_id}:{session_id}",
|
|
f"chatSessionsIndex:{user_id}"
|
|
]
|
|
for key in keys:
|
|
if r.exists(key):
|
|
r.expire(key, DEFAULT_TTL_SECONDS)
|
|
|
|
# -------------------------
|
|
# Internal helpers
|
|
# -------------------------
|
|
|
|
def _update_session_stats(user_id: str, session_id: str, reset: bool = False):
|
|
meta = get_session_meta(user_id, session_id)
|
|
if not meta:
|
|
return
|
|
if reset:
|
|
meta["message_count"] = 0
|
|
meta["history_size_bytes"] = 0
|
|
else:
|
|
key = f"chatHistory:{user_id}:{session_id}"
|
|
messages = r.lrange(key, 0, -1)
|
|
meta["message_count"] = len(messages)
|
|
meta["history_size_bytes"] = sum(len(m.encode("utf-8")) for m in messages)
|
|
r.set(f"chatSession:{user_id}:{session_id}", json.dumps(meta), ex=DEFAULT_TTL_SECONDS)
|
|
|