"""Redis-backed chat session storage and FastAPI routes for the main chatbot.

Each session is stored as a JSON object under
``session:{user_login_id}:{session_id}`` and its messages as a JSON list under
``messages:{user_login_id}:{session_id}``. Every write re-sets the key with a
24-hour expiry (``SESSION_TTL_SECONDS``).
"""

import json
import os
import re
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

import redis
from dotenv import load_dotenv
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

load_dotenv()

# Not used inside this module; kept because other modules may import it.
Redis_url = os.getenv("REDIS_URL")

# Expiry applied to every session/messages key on each write (24 hours).
SESSION_TTL_SECONDS = 86400
# Placeholder title used until one is derived from the first user message.
DEFAULT_SESSION_TITLE = "New Chat"


class ChatRequest(BaseModel):
    """Request payload for a chat turn."""

    user_login_id: str
    session_id: Optional[str] = None
    query: str
    org_id: Optional[str] = None
    metadata: Optional[Dict] = None


class ChatResponse(BaseModel):
    """Response payload for a chat turn."""

    session_id: str
    user_message: str
    assistant_response: str
    is_new_session: bool
    session_title: str
    timestamp: str


class MessageResponse(BaseModel):
    """One stored message as returned by the history endpoint."""

    message_id: str
    role: str
    content: str
    timestamp: str
    convo_id: str


class ChatHistoryResponse(BaseModel):
    """A session's metadata together with its messages."""

    session_id: str
    title: str
    created_at: str
    message_count: int
    messages: List[MessageResponse]


redis_session_route_new = APIRouter(
    prefix="/main_chatbot",
    tags=["Redis_session_main_chatbot"],
)


def get_redis_client():
    """Build the Redis client from environment variables.

    ``REDIS_URL`` takes precedence; otherwise ``REDIS_HOST`` (default
    ``localhost``), ``REDIS_PORT`` (default ``6379``) and ``REDIS_PASSWORD``
    are used. Responses are decoded to ``str``.

    Raises:
        HTTPException: 500 if building the client fails (for example a
            non-integer ``REDIS_PORT``).
    """
    try:
        redis_url = os.getenv("REDIS_URL")
        if redis_url:
            return redis.from_url(redis_url, decode_responses=True)
        return redis.StrictRedis(
            host=os.getenv("REDIS_HOST", "localhost"),
            port=int(os.getenv("REDIS_PORT", 6379)),
            password=os.getenv("REDIS_PASSWORD"),
            decode_responses=True,
        )
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Redis connection error: {e}"
        ) from e


redis_client = get_redis_client()


def _session_key(user_login_id, session_id) -> str:
    """Return the Redis key holding a session's metadata."""
    return f"session:{user_login_id}:{session_id}"


def _messages_key(user_login_id, session_id) -> str:
    """Return the Redis key holding a session's message list."""
    return f"messages:{user_login_id}:{session_id}"


def _save_json(key: str, value: Any) -> None:
    """Store ``value`` as JSON under ``key`` and reset the key's expiry."""
    redis_client.setex(key, SESSION_TTL_SECONDS, json.dumps(value))


def _escape_glob(value) -> str:
    """Backslash-escape Redis glob metacharacters so ``value`` matches literally."""
    return re.sub(r"([*?\[\]\\])", r"\\\1", str(value))


def create_session(user_login_id, org_id=None, metadata=None) -> Dict[str, Any]:
    """Create a new session with an empty message list and return its metadata.

    Args:
        user_login_id: Owner of the session.
        org_id: Optional organisation identifier stored with the session.
        metadata: Optional extra data; stored as ``{}`` when falsy.

    Returns:
        The session dict that was written to Redis.
    """
    session_id = str(uuid.uuid4())
    data = {
        "session_id": session_id,
        "user_login_id": user_login_id,
        "org_id": org_id,
        "created_at": datetime.now().isoformat(),
        "title": DEFAULT_SESSION_TITLE,
        "message_count": 0,
        "metadata": metadata or {},
    }
    _save_json(_session_key(user_login_id, session_id), data)
    _save_json(_messages_key(user_login_id, session_id), [])
    return data


def get_session(user_login_id, session_id) -> Dict[str, Any]:
    """Load a session's metadata.

    Raises:
        HTTPException: 404 if the session key is missing or empty.
    """
    data = redis_client.get(_session_key(user_login_id, session_id))
    if not data:
        raise HTTPException(status_code=404, detail="Session not found")
    return json.loads(data)


def generate_convo_id(suffix: str = "ss") -> str:
    """Generate a conversation ID: a UUID4 string whose last two characters
    are replaced by ``suffix``.

    Args:
        suffix: "ss" for multiagent, "dd" for data_analyst.

    Returns:
        The conversation ID, e.g. "ee87ae15-44f0-4c73-a98f-cba756efbdss".
    """
    base_uuid = str(uuid.uuid4())
    convo_id = base_uuid[:-2] + suffix
    print(f"🆔 Generated convo_id: {convo_id}")
    return convo_id


def add_message(
    user_login_id, session_id, role, content, convo_id=None, suffix="ss"
) -> Tuple[str, str]:
    """Append a message to a session and update the session's message count.

    When ``convo_id`` is None it is resolved as follows:
      - "assistant": reuse the convo_id of the most recent user message,
        generating a new one if none is found;
      - any other role (including "user"): generate a new one.
    An explicitly supplied ``convo_id`` is stored unchanged.

    Args:
        user_login_id: User's login ID.
        session_id: Current session ID.
        role: Message role, e.g. 'user' or 'assistant'.
        content: Message content.
        convo_id: Optional conversation ID to use as-is.
        suffix: "ss" for multiagent, "dd" for data_analyst; only used when a
            new convo_id has to be generated.

    Returns:
        Tuple of (message_id, convo_id).

    Raises:
        HTTPException: 404 if the session does not exist. The check runs
            before any write so no orphan messages key is created.
    """
    session = get_session(user_login_id, session_id)
    messages = get_message_history(user_login_id, session_id)

    if convo_id is None:
        if role == "assistant":
            # Pair the reply with the latest user turn.
            for previous in reversed(messages):
                if previous.get("role") == "user":
                    convo_id = previous.get("convo_id")
                    break
        if not convo_id:
            convo_id = generate_convo_id(suffix)

    if role == "user":
        print(f"👤 User message - using convo_id: {convo_id}")
    elif role == "assistant":
        print(f"🤖 Assistant message - reusing convo_id: {convo_id}")

    msg = {
        "message_id": str(uuid.uuid4()),
        "role": role,
        "content": content,
        "timestamp": datetime.now().isoformat(),
        "convo_id": convo_id,
    }
    messages.append(msg)
    _save_json(_messages_key(user_login_id, session_id), messages)

    session["message_count"] = len(messages)
    _save_json(_session_key(user_login_id, session_id), session)

    print(
        f"✅ Stored {role} message - message_id: {msg['message_id']}, "
        f"convo_id: {convo_id}"
    )
    return msg["message_id"], convo_id


def get_message_history(user_login_id, session_id, limit=None) -> List[Dict[str, Any]]:
    """Return a session's stored messages, oldest first.

    Args:
        limit: When truthy, only the last ``limit`` messages are returned;
            ``None`` or ``0`` returns all of them.

    Returns:
        The list of message dicts, or ``[]`` when nothing is stored.
    """
    data = redis_client.get(_messages_key(user_login_id, session_id))
    if not data:
        return []
    messages = json.loads(data)
    return messages[-limit:] if limit else messages


def generate_session_title(user_login_id, session_id) -> str:
    """Derive the session title from the first user message and store it.

    The title is the first six whitespace-separated words of that message,
    followed by "..." when the message has more than six words. If there is
    no user message (or it is empty) the title is "New Chat".

    Returns:
        The title that was stored.

    Raises:
        HTTPException: 404 if the session does not exist.
    """
    messages = get_message_history(user_login_id, session_id)
    first = next((m["content"] for m in messages if m["role"] == "user"), "")
    words = first.split()
    title = " ".join(words[:6]) + ("..." if len(words) > 6 else "")
    title = title or DEFAULT_SESSION_TITLE

    session = get_session(user_login_id, session_id)
    session["title"] = title
    _save_json(_session_key(user_login_id, session_id), session)
    return title


def update_session_title_if_needed(user_login_id, session_id, is_new) -> None:
    """Regenerate the title for a new session or one still titled "New Chat".

    Raises:
        HTTPException: 404 if the session does not exist.
    """
    session = get_session(user_login_id, session_id)
    if is_new or session.get("title") == DEFAULT_SESSION_TITLE:
        generate_session_title(user_login_id, session_id)


def format_conversation_context(messages, max_messages=5) -> str:
    """Render the last ``max_messages`` messages as "role: content" lines.

    Returns:
        The lines joined with newlines, or "" when ``messages`` is empty.
    """
    if not messages:
        return ""
    return "\n".join(
        f"{m['role']}: {m['content']}" for m in messages[-max_messages:]
    )


@redis_session_route_new.get(
    "/history/{user_login_id}/{session_id}",
    response_model=ChatHistoryResponse,
)
async def get_chat_history(user_login_id: str, session_id: str):
    """Return a session's title, creation time and full message list.

    Raises:
        HTTPException: 404 if the session does not exist.
    """
    session_data = get_session(user_login_id, session_id)
    messages = get_message_history(user_login_id, session_id)
    message_responses = [MessageResponse(**msg) for msg in messages]
    return ChatHistoryResponse(
        session_id=session_id,
        title=session_data.get("title", DEFAULT_SESSION_TITLE),
        created_at=session_data.get("created_at"),
        message_count=len(messages),
        messages=message_responses,
    )


@redis_session_route_new.get("/sessions/{user_login_id}")
async def get_user_sessions(user_login_id: str):
    """List a user's sessions, newest first by ``created_at``."""
    # Escape glob metacharacters so an id such as "*" cannot match the
    # session keys of other users.
    pattern = f"session:{_escape_glob(user_login_id)}:*"
    sessions = []
    for key in redis_client.scan_iter(match=pattern):
        session_data = redis_client.get(key)
        if session_data:  # the key may have expired since it was scanned
            sessions.append(json.loads(session_data))
    sessions.sort(key=lambda x: x.get("created_at", ""), reverse=True)
    return {
        "user_login_id": user_login_id,
        "total_sessions": len(sessions),
        "sessions": sessions,
    }


@redis_session_route_new.delete("/sessions/{user_login_id}/{session_id}")
async def delete_session(user_login_id: str, session_id: str):
    """Delete a session and its messages.

    Raises:
        HTTPException: 404 if the session does not exist.
    """
    get_session(user_login_id, session_id)  # validate existence
    redis_client.delete(
        _session_key(user_login_id, session_id),
        _messages_key(user_login_id, session_id),
    )
    return {"message": "Session deleted", "session_id": session_id}


@redis_session_route_new.get("/health")
async def health():
    """Report Redis connectivity and the number of stored sessions.

    Best effort: any failure while talking to Redis is reported as
    "disconnected" with zero sessions instead of raising.
    """
    try:
        redis_client.ping()
        total_sessions = sum(1 for _ in redis_client.scan_iter(match="session:*"))
        status = "connected"
    except Exception:
        status = "disconnected"
        total_sessions = 0
    return {
        "status": "ok",
        "redis_status": status,
        "total_sessions": total_sessions,
        "session_ttl": "24 hours",
    }


@redis_session_route_new.put("/sessions/{user_login_id}/{session_id}/title")
async def update_session_title(user_login_id: str, session_id: str, title: str):
    """Replace a session's title.

    Raises:
        HTTPException: 404 if the session does not exist.
    """
    session = get_session(user_login_id, session_id)
    session["title"] = title
    _save_json(_session_key(user_login_id, session_id), session)
    return {"message": "Title updated"}