"""Redis-backed chat session management routes.

Each chat session is stored under three Redis keys, all JSON-encoded and
written with the same time-to-live:

* ``session:{id}`` - session metadata (owner, org, title, ...)
* ``chat:{id}``    - list of chat messages
* ``memory:{id}``  - list of conversation-memory items

The routes are exposed through ``Redis_session_router_old`` (prefix ``/Redis``).
"""

from fastapi import APIRouter, FastAPI, HTTPException, Query as QueryParam, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any
import os
import json
import redis
import uuid
from datetime import datetime
from dotenv import load_dotenv

load_dotenv()

Redis_session_router_old = APIRouter(prefix="/Redis", tags=["Redis_Management_old"])

# ==================== CONFIGURATION ====================

# Redis configuration. REDIS_URL, when set, takes precedence over host/port.
REDIS_URL = os.getenv("REDIS_URL")
REDIS_HOST = os.getenv("REDIS_HOST", "127.0.0.1")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD")

# Time-to-live applied to every session/chat/memory key (24 hours).
SESSION_TTL_SECONDS = 86400

# Title given to a session until one is generated or set by the user.
DEFAULT_TITLE = "New Chat"


# ==================== REDIS CLIENT INITIALIZATION ====================

def get_redis_client():
    """Create a Redis client and verify the connection with a PING.

    Uses ``REDIS_URL`` when it is set, otherwise falls back to
    ``REDIS_HOST`` / ``REDIS_PORT`` / ``REDIS_PASSWORD``.

    Returns:
        A connected redis-py client with ``decode_responses=True``.

    Raises:
        HTTPException: 500 if the client cannot be created or pinged.
    """
    try:
        if REDIS_URL:
            client = redis.from_url(
                REDIS_URL,
                decode_responses=True,
                socket_connect_timeout=5,
                socket_timeout=5,
            )
            # NOTE(review): REDIS_URL may embed a password; printing it in
            # full can leak credentials into logs. Consider redacting.
            target = f"deployed Redis: {REDIS_URL}"
        else:
            client = redis.StrictRedis(
                host=REDIS_HOST,
                port=REDIS_PORT,
                password=REDIS_PASSWORD,
                decode_responses=True,
                socket_connect_timeout=5,
                socket_timeout=5,
            )
            target = f"local Redis: {REDIS_HOST}:{REDIS_PORT}"

        # Fail fast if the server is unreachable.
        client.ping()
        print(f"✅ Connected to {target}")
        return client
    except Exception as e:
        print(f"❌ Redis connection failed: {e}")
        raise HTTPException(
            status_code=500, detail=f"Redis connection failed: {str(e)}"
        ) from e


class _LazyRedisClient:
    """Proxy that connects to Redis on first use instead of at import time.

    Every function below refers to the module-level name ``redis_client``.
    Connecting eagerly would make importing this module fail whenever Redis
    is unreachable, so the real client is only built when an attribute
    (i.e. a Redis command) is first requested.
    """

    def __init__(self, factory):
        self._factory = factory
        self._client = None

    def __getattr__(self, name):
        # __getattr__ only runs for names not found on the proxy itself.
        # Do not open a connection just because something probes a dunder
        # (copy, pickle, introspection, ...).
        if name.startswith("__") and name.endswith("__"):
            raise AttributeError(name)
        if self._client is None:
            self._client = self._factory()
        return getattr(self._client, name)


# Shared client used by all helpers and endpoints in this module.
redis_client = _LazyRedisClient(get_redis_client)


# ==================== PYDANTIC MODELS ====================

class SessionResponse(BaseModel):
    """Public view of a session returned by ``POST /sessions``."""

    session_id: str
    userLoginId: int
    orgId: int
    created_at: str
    status: str
    title: Optional[str] = "New Chat"


class MessageResponse(BaseModel):
    """A single chat message."""

    message_id: str
    session_id: str
    role: str  # "user" or "assistant"
    message: str
    timestamp: str


class ChatHistoryResponse(BaseModel):
    """A slice of a session's chat history plus the total message count."""

    session_id: str
    messages: List[MessageResponse]
    total_messages: int


class UpdateSessionTitleRequest(BaseModel):
    """Body of ``PUT /sessions/{id}/title``."""

    new_title: str


class CreateSessionRequest(BaseModel):
    """Body of ``POST /sessions``."""

    userLoginId: int
    orgId: int
    auth_token: str


class AddMessageRequest(BaseModel):
    """Body of ``POST /sessions/{id}/messages``.

    The endpoint uses the ``session_id`` from the URL path, not this field.
    """

    session_id: str
    role: str
    message: str


# ==================== SESSION MANAGEMENT FUNCTIONS ====================

def create_session(userLoginId: int, orgId: int, auth_token: str) -> dict:
    """Create a new chat session and its empty chat/memory lists.

    Args:
        userLoginId: Owner of the session.
        orgId: Organisation the session belongs to.
        auth_token: Token stored alongside the session metadata.

    Returns:
        The session metadata dict that was stored in Redis.
    """
    session_id = str(uuid.uuid4())
    session_data = {
        "session_id": session_id,
        "userLoginId": userLoginId,
        "orgId": orgId,
        "auth_token": auth_token,
        "created_at": datetime.now().isoformat(),
        "status": "active",
        "title": "New Chat",
    }

    # Session metadata, empty chat history and empty conversation memory
    # all share the same TTL.
    redis_client.setex(
        f"session:{session_id}", SESSION_TTL_SECONDS, json.dumps(session_data)
    )
    redis_client.setex(f"chat:{session_id}", SESSION_TTL_SECONDS, json.dumps([]))
    redis_client.setex(f"memory:{session_id}", SESSION_TTL_SECONDS, json.dumps([]))

    return session_data


def get_session(session_id: str) -> dict:
    """Return the stored metadata for ``session_id``.

    Raises:
        HTTPException: 404 if the session key does not exist (never created,
            deleted, or expired).
    """
    session_data = redis_client.get(f"session:{session_id}")
    if not session_data:
        raise HTTPException(status_code=404, detail="Session not found or expired")
    return json.loads(session_data)


def add_message_to_session(session_id: str, role: str, message: str) -> str:
    """Append a message to the session's chat history.

    Args:
        session_id: Session to append to.
        role: Sender role (the endpoint restricts it to "user"/"assistant").
        message: Message text.

    Returns:
        The generated message id.
    """
    message_id = str(uuid.uuid4())
    message_data = {
        "message_id": message_id,
        "session_id": session_id,
        "role": role,
        "message": message,
        "timestamp": datetime.now().isoformat(),
    }

    # NOTE(review): read-modify-write is not atomic; two concurrent writers
    # to the same session can lose a message.
    chat_history = redis_client.get(f"chat:{session_id}")
    messages = json.loads(chat_history) if chat_history else []
    messages.append(message_data)

    # Writing the history back also renews the chat key's TTL.
    redis_client.setex(
        f"chat:{session_id}", SESSION_TTL_SECONDS, json.dumps(messages)
    )

    return message_id


def get_session_memory(session_id: str) -> List[Dict]:
    """Return the conversation memory for a session, or ``[]`` if absent."""
    memory_data = redis_client.get(f"memory:{session_id}")
    if memory_data:
        return json.loads(memory_data)
    return []


def update_session_memory(session_id: str, messages: List[Dict]):
    """Replace the conversation memory for a session and renew its TTL."""
    redis_client.setex(
        f"memory:{session_id}", SESSION_TTL_SECONDS, json.dumps(messages)
    )


def generate_session_title(session_id: str) -> str:
    """Derive a session title from the first user message.

    A user-defined title (``user_title``) always wins and is returned
    unchanged. Otherwise the title is the first six words of the first
    message whose role is "user", followed by "..." when the message is
    longer. When a title is derived and the session exists, it is persisted
    as ``generated_title`` (and as ``title``), which also renews the session
    key's TTL.

    Returns:
        The title, or "New Chat" when none can be derived or on any error.
    """
    try:
        session = None
        session_data = redis_client.get(f"session:{session_id}")
        if session_data:
            session = json.loads(session_data)
            if "user_title" in session:
                # Don't override user-defined titles.
                return session["user_title"]

        chat_data = redis_client.get(f"chat:{session_id}")
        if not chat_data:
            return DEFAULT_TITLE

        messages = json.loads(chat_data)
        if not messages:
            return DEFAULT_TITLE

        first_user_message = next(
            (msg["message"] for msg in messages if msg["role"] == "user"),
            None,
        )
        if not first_user_message:
            return DEFAULT_TITLE

        words = first_user_message.split()
        if not words:
            # Whitespace-only message: an empty title would be useless.
            return DEFAULT_TITLE
        title = " ".join(words[:6]) + ("..." if len(words) > 6 else "")

        if session is not None:
            session["generated_title"] = title
            if not session.get("user_title"):
                session["title"] = title
            redis_client.setex(
                f"session:{session_id}", SESSION_TTL_SECONDS, json.dumps(session)
            )

        return title
    except Exception as e:
        # Best effort: title generation must never break the caller.
        print(f"Error in generate_session_title: {e}")
        return DEFAULT_TITLE


def update_session_title(session_id: str):
    """Generate a title for a session that still has the default one.

    Does nothing if the session is missing or already has a non-default
    title. Errors are logged and swallowed (best effort).
    """
    try:
        session_key = f"session:{session_id}"
        session_data = redis_client.get(session_key)
        if not session_data:
            return

        session = json.loads(session_data)
        if session.get("title", DEFAULT_TITLE) != DEFAULT_TITLE:
            return

        new_title = generate_session_title(session_id)

        # generate_session_title may have rewritten the session (adding
        # "generated_title"); reload so that write is not clobbered by the
        # snapshot taken above.
        latest = redis_client.get(session_key)
        if latest:
            session = json.loads(latest)
        session["title"] = new_title

        redis_client.setex(session_key, SESSION_TTL_SECONDS, json.dumps(session))
    except Exception as e:
        print(f"Error updating session title: {e}")


def get_user_sessions(userLoginId: int) -> List[dict]:
    """Return all sessions owned by ``userLoginId``, newest first.

    Each returned session has its ``title`` refreshed through
    ``generate_session_title``.
    """
    sessions = []

    # NOTE(review): this scans every "session:*" key, and
    # generate_session_title rewrites each matching session (renewing its
    # TTL). A per-user index would avoid both.
    for key in redis_client.scan_iter(match="session:*"):
        session_data = redis_client.get(key)
        if not session_data:
            # Key expired between the scan and the read.
            continue
        try:
            session = json.loads(session_data)
        except json.JSONDecodeError:
            # One corrupt record should not break the whole listing.
            continue
        if not isinstance(session, dict) or session.get("userLoginId") != userLoginId:
            continue
        session["title"] = generate_session_title(session.get("session_id"))
        sessions.append(session)

    # Most recent first.
    sessions.sort(key=lambda s: s.get("created_at", ""), reverse=True)
    return sessions


def delete_session(session_id: str):
    """Delete a session's metadata, chat history and memory."""
    redis_client.delete(
        f"session:{session_id}",
        f"chat:{session_id}",
        f"memory:{session_id}",
    )


# ==================== API ENDPOINTS ====================

@Redis_session_router_old.post("/sessions", response_model=SessionResponse)
def create_new_session(request: CreateSessionRequest):
    """Create a new chat session."""
    try:
        session_data = create_session(
            request.userLoginId, request.orgId, request.auth_token
        )
        return SessionResponse(**session_data)
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Error creating session: {str(e)}"
        ) from e


@Redis_session_router_old.get("/sessions")
def list_user_sessions(userLoginId: int):
    """List all sessions for a user."""
    # NOTE(review): the returned session dicts include the stored
    # "auth_token"; confirm that exposing it here is intended.
    try:
        sessions = get_user_sessions(userLoginId)
        return {
            "userLoginId": userLoginId,
            "total_sessions": len(sessions),
            "sessions": sessions,
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Error fetching sessions: {str(e)}"
        ) from e


@Redis_session_router_old.get("/sessions/{session_id}")
def get_session_details(session_id: str):
    """Get details of a specific session (404 if it does not exist)."""
    # NOTE(review): the response includes the stored "auth_token"; confirm
    # that exposing it here is intended.
    try:
        return get_session(session_id)
    except HTTPException:
        # Keep the 404 from get_session intact.
        raise
    except Exception as e:
        # Anything else (e.g. Redis unavailable) is a server error, not 404.
        raise HTTPException(
            status_code=500, detail=f"Error fetching session: {str(e)}"
        ) from e


@Redis_session_router_old.delete("/sessions/{session_id}")
def delete_user_session(session_id: str):
    """Delete/close a session (404 if it does not exist)."""
    try:
        # Verify the session exists before deleting.
        get_session(session_id)
        delete_session(session_id)
        return {
            "message": f"Session {session_id} deleted successfully",
            "session_id": session_id,
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Error deleting session: {str(e)}"
        ) from e


@Redis_session_router_old.get(
    "/sessions/{session_id}/history", response_model=ChatHistoryResponse
)
def get_session_history(
    session_id: str,
    n: int = QueryParam(
        50, ge=1, description="Number of recent messages to return"
    ),
):
    """Get the ``n`` most recent chat messages for a session.

    ``total_messages`` in the response is the size of the full history,
    not of the returned slice.
    """
    try:
        # Verify the session exists.
        get_session(session_id)

        chat_data = redis_client.get(f"chat:{session_id}")
        messages = json.loads(chat_data) if chat_data else []

        # n >= 1 is enforced above, so this slice is the last n messages
        # (or everything when fewer than n exist).
        recent_messages = messages[-n:]

        return ChatHistoryResponse(
            session_id=session_id,
            messages=[MessageResponse(**msg) for msg in recent_messages],
            total_messages=len(messages),
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Error fetching chat history: {str(e)}"
        ) from e


@Redis_session_router_old.post("/sessions/{session_id}/messages")
def add_message(session_id: str, request: AddMessageRequest):
    """Add a message to a session.

    The role must be "user" or "assistant". A user message triggers title
    generation when the session still has the default title.
    """
    try:
        # Verify the session exists.
        get_session(session_id)

        if request.role not in ["user", "assistant"]:
            raise HTTPException(
                status_code=400, detail="Role must be 'user' or 'assistant'"
            )

        message_id = add_message_to_session(
            session_id, request.role, request.message
        )

        if request.role == "user":
            update_session_title(session_id)

        return {
            "message_id": message_id,
            "session_id": session_id,
            "role": request.role,
            "message": request.message,
            # NOTE: taken at response time, so it can differ slightly from
            # the timestamp stored with the message.
            "timestamp": datetime.now().isoformat(),
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Error adding message: {str(e)}"
        ) from e


@Redis_session_router_old.put("/sessions/{session_id}/title")
def update_session_title_endpoint(
    session_id: str, request: UpdateSessionTitleRequest
):
    """Set a user-defined title (1-100 characters after trimming)."""
    try:
        session_data = redis_client.get(f"session:{session_id}")
        if not session_data:
            raise HTTPException(
                status_code=404, detail="Session not found or expired"
            )

        session = json.loads(session_data)
        new_title = request.new_title.strip()

        if not new_title:
            raise HTTPException(status_code=400, detail="New title cannot be empty")
        if len(new_title) > 100:
            raise HTTPException(
                status_code=400, detail="Title cannot exceed 100 characters"
            )

        old_title = session.get("title", "New Chat")
        # "user_title" marks the title as user-defined so that generated
        # titles never override it.
        session["user_title"] = new_title
        session["title"] = new_title
        session["last_updated"] = datetime.now().isoformat()

        redis_client.setex(
            f"session:{session_id}", SESSION_TTL_SECONDS, json.dumps(session)
        )

        return {
            "message": "Session title updated successfully",
            "session_id": session_id,
            "old_title": old_title,
            "new_title": new_title,
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Error updating session title: {str(e)}"
        ) from e


@Redis_session_router_old.put("/sessions/{session_id}/refresh-title")
def refresh_session_title(session_id: str):
    """Manually regenerate the session title (404 if the session is gone)."""
    try:
        # Verify the session exists.
        get_session(session_id)

        new_title = generate_session_title(session_id)

        # Re-read after generation: generate_session_title may have stored
        # "generated_title", which an older snapshot would overwrite.
        session_data = get_session(session_id)
        session_data["title"] = new_title
        redis_client.setex(
            f"session:{session_id}",
            SESSION_TTL_SECONDS,
            json.dumps(session_data),
        )

        return {
            "session_id": session_id,
            "new_title": new_title,
            "message": "Session title updated successfully",
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Error updating session title: {str(e)}"
        ) from e


@Redis_session_router_old.get("/sessions/{session_id}/memory")
def get_session_memory_endpoint(session_id: str):
    """Get conversation memory for a session (404 if the session is gone)."""
    try:
        # Verify the session exists.
        get_session(session_id)

        memory = get_session_memory(session_id)
        return {
            "session_id": session_id,
            "memory": memory,
            "total_items": len(memory),
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Error fetching memory: {str(e)}"
        ) from e


# ==================== SYSTEM ENDPOINTS ====================

@Redis_session_router_old.get("/redis-info")
def redis_info():
    """Get Redis connection information.

    Never raises: on failure the response carries ``redis_connected: False``
    and the error text.
    """
    try:
        info = redis_client.info()
        return {
            "redis_connected": True,
            "redis_version": info.get("redis_version"),
            "used_memory": info.get("used_memory_human"),
            "connected_clients": info.get("connected_clients"),
            "total_keys": redis_client.dbsize(),
        }
    except Exception as e:
        return {
            "redis_connected": False,
            "error": str(e),
        }


@Redis_session_router_old.get("/health")
def health():
    """System health check; always answers, even when Redis is down."""
    try:
        redis_client.ping()
        redis_status = "connected"
    except Exception:
        redis_status = "disconnected"

    total_sessions = 0
    if redis_status == "connected":
        try:
            total_sessions = sum(
                1 for _ in redis_client.scan_iter(match="session:*")
            )
        except Exception:
            # Best effort: the count is informational only.
            total_sessions = 0

    return {
        "status": "ok",
        "redis_status": redis_status,
        "session_management": "enabled",
        "total_sessions": total_sessions,
        "ttl": "24 hours",
    }


@Redis_session_router_old.get("/")
def root():
    """Root endpoint with API information."""
    return {
        "service": "Redis Session Management API",
        "version": "1.0.0",
        "endpoints": {
            "sessions": {
                "POST /sessions": "Create new session",
                "GET /sessions?userLoginId={id}": "List user sessions",
                "GET /sessions/{id}": "Get session details",
                "DELETE /sessions/{id}": "Delete session",
                "GET /sessions/{id}/history": "Get chat history",
                "POST /sessions/{id}/messages": "Add message to session",
                "PUT /sessions/{id}/title": "Update session title",
                "PUT /sessions/{id}/refresh-title": "Refresh session title",
                "GET /sessions/{id}/memory": "Get session memory",
            },
            "system": {
                "GET /health": "Health check",
                "GET /redis-info": "Redis connection info",
            },
        },
    }