# # import os # # import asyncio # # import dotenv # # from fastapi import FastAPI, HTTPException # # from fastapi.responses import JSONResponse # # import redis.asyncio as aioredis # # from agents import Agent, Runner # # from agents.extensions.memory import RedisSession # # # from transformers import pipeline # # from fastapi import APIRouter # # dotenv.load_dotenv() # # Redis_url = os.getenv("REDIS_URL") # # Redis_session_router = APIRouter(prefix="/Redis", tags=["Redis_Agent_Memory_Management"]) # # # ============ Transformers ============ # # # # try: # # # print("🚀 Loading summarization model (BART)...") # # # summarizer = pipeline("summarization", model="facebook/bart-large-cnn") # # # print("✅ Model ready!") # # # except Exception as e: # # # print(f"⚠️ Error loading model: {e}") # # # summarizer = None # # # ---------- Core Utility Functions ---------- # # # async def get_sessions(user_login_id: str): # # redis = await aioredis.from_url(Redis_url) # # pattern = f"{user_login_id}:*" # # keys = await redis.keys(pattern) # # sessions = [key.decode().replace(f"{user_login_id}:", "") for key in keys] # # await redis.close() # # return sessions # # async def get_session_history(user_login_id: str, session_id: str): # # """ # # Retrieve chat history for a given user's session. # # """ # # try: # # session = RedisSession.from_url( # # session_id, # # url=Redis_url, # # key_prefix=f"{user_login_id}:", # # ) # # if not await session.ping(): # # raise Exception("Redis connection failed") # # items = await session.get_items() # # history = [ # # {"role": msg.get("role", "unknown"), "content": msg.get("content", "")} # # for msg in items # # ] # # await session.close() # # return history # # except Exception as e: # # return {"error": str(e)} # # async def delete_session(user_login_id: str, session_id: str): # # session = RedisSession.from_url(session_id, url=Redis_url, key_prefix=f"{user_login_id}:") # # if not await session.ping(): # # raise HTTPException(status_code=500, detail="Redis connection failed") # # await session.clear_session() # # await session.close() # # return {"status": "success", "message": f"Session {session_id} deleted"} # # # ---------- API Endpoints ---------- # # # @Redis_session_router.get("/sessions/{user_login_id}") # # async def api_get_sessions(user_login_id: str): # # """List all sessions for a given user.""" # # try: # # sessions = await get_sessions(user_login_id) # # return {"user": user_login_id, "sessions": sessions, "count": len(sessions)} # # except Exception as e: # # raise HTTPException(status_code=500, detail=str(e)) # # @Redis_session_router.get("/sessions/{user_login_id}/{session_id}") # # async def api_get_session_history(user_login_id: str, session_id: str): # # """Get the chat history of a session. Optionally summarize with ?summarize=true.""" # # try: # # history = await get_session_history(user_login_id, session_id) # # if not history: # # raise HTTPException(status_code=404, detail="Session history not found") # # result = {"user": user_login_id, "session_id": session_id, "history": history} # # text = " ".join([msg["content"] for msg in history if msg["role"] == "user"]) # # summary = summarizer(text, max_length=130, min_length=30, do_sample=False)[0]["summary_text"] # # result["summary"] = summary # # return result # # except Exception as e: # # raise HTTPException(status_code=500, detail=str(e)) # # @Redis_session_router.delete("/sessions/{user_login_id}/{session_id}") # # async def api_delete_session(user_login_id: str, session_id: str): # # """Delete a session.""" # # try: # # result = await delete_session(user_login_id, session_id) # # return JSONResponse(content=result) # # except Exception as e: # # raise HTTPException(status_code=500, detail=str(e)) # # # ---------- Demo route (optional) ---------- # # # @Redis_session_router.get("/") # # async def root(): # # return {"message": "Redis Session Manager API running 🧠"} # # ---------- Local testing ---------- # # # if __name__ == "__main__": # # import uvicorn # # uvicorn.run("main:Redis_session_router", host="0.0.0.0", port=8000, reload=True) # import os # import asyncio # import dotenv # from fastapi import FastAPI, HTTPException, Query # from fastapi.responses import JSONResponse # import redis.asyncio as aioredis # from agents import Agent, Runner # from agents.extensions.memory import RedisSession # # from transformers import pipeline # from fastapi import APIRouter # dotenv.load_dotenv() # Redis_url = os.getenv("REDIS_URL") # print(Redis_url) # # Create the main FastAPI app # app = FastAPI(title="Redis Session Manager API") # Redis_session_router = APIRouter(prefix="/Redis", tags=["Redis_Agent_Memory_Management"]) # # ============ Transformers ============ # # # ---------- Core Utility Functions ---------- # # async def get_sessions(user_login_id: str): # redis = await aioredis.from_url(Redis_url) # pattern = f"{user_login_id}:*" # keys = await redis.keys(pattern) # sessions = [key.decode().replace(f"{user_login_id}:", "") for key in keys] # await redis.close() # return sessions # async def get_session_history(user_login_id: str, session_id: str): # """ # Retrieve chat history for a given user's session. # """ # try: # session = RedisSession.from_url( # session_id, # url=Redis_url, # key_prefix=f"{user_login_id}:", # ) # if not await session.ping(): # raise Exception("Redis connection failed") # items = await session.get_items() # history = [ # {"role": msg.get("role", "unknown"), "content": msg.get("content", "")} # for msg in items # ] # await session.close() # return history # except Exception as e: # return {"error": str(e)} # async def delete_session(user_login_id: str, session_id: str): # session = RedisSession.from_url(session_id, url=Redis_url, key_prefix=f"{user_login_id}:") # if not await session.ping(): # raise HTTPException(status_code=500, detail="Redis connection failed") # await session.clear_session() # await session.close() # return {"status": "success", "message": f"Session {session_id} deleted"} # # ---------- API Endpoints ---------- # # @Redis_session_router.get("/sessions/{user_login_id}") # async def api_get_sessions(user_login_id: str): # """List all sessions for a given user.""" # try: # sessions = await get_sessions(user_login_id) # return {"user": user_login_id, "sessions": sessions, "count": len(sessions)} # except Exception as e: # raise HTTPException(status_code=500, detail=str(e)) # @Redis_session_router.get("/sessions/{user_login_id}/{session_id}") # async def api_get_session_history(user_login_id: str, session_id: str, summarize: bool = Query(False)): # """Get the chat history of a session. Optionally summarize with ?summarize=true.""" # try: # history = await get_session_history(user_login_id, session_id) # if isinstance(history, dict) and "error" in history: # raise HTTPException(status_code=500, detail=history["error"]) # if not history: # raise HTTPException(status_code=404, detail="Session history not found") # result = {"user": user_login_id, "session_id": session_id, "history": history} # return result # except Exception as e: # raise HTTPException(status_code=500, detail=str(e)) # @Redis_session_router.delete("/sessions/{user_login_id}/{session_id}") # async def api_delete_session(user_login_id: str, session_id: str): # """Delete a session.""" # try: # result = await delete_session(user_login_id, session_id) # return JSONResponse(content=result) # except Exception as e: # raise HTTPException(status_code=500, detail=str(e)) # # ---------- Demo route (optional) ---------- # # @Redis_session_router.get("/") # async def root(): # return {"message": "Redis Session Manager API running 🧠"} # # Mount the router to the app # app.include_router(Redis_session_router) # # ---------- Local testing ---------- # # if __name__ == "__main__": # import uvicorn # uvicorn.run(app, host="0.0.0.0", port=8000, reload=True) # -------------------------------------------------------------- # Redis Session Manager – uses REDIS_HOST / REDIS_PORT # -------------------------------------------------------------- import os import asyncio import dotenv from fastapi import FastAPI, HTTPException, Query from fastapi.responses import JSONResponse import redis.asyncio as aioredis from agents import Agent, Runner from agents.extensions.memory import RedisSession from fastapi import APIRouter from retrieve_secret import * # ------------------------------------------------------------------ # Load environment variables (create a .env file with the keys) # ------------------------------------------------------------------ dotenv.load_dotenv() REDIS_HOST = os.getenv("REDIS_HOST") # REDIS_HOST = "localhost" REDIS_PORT = 6380 # REDIS_PORT = int(os.getenv("REDIS_PORT")) REDIS_PASSWORD = os.getenv("REDIS_PASSWORD") REDIS_DB = int(os.getenv("REDIS_DB", "0")) # DEBUG: Print to confirm values print("Redis Config:") print(f" Host: {REDIS_HOST}") print(f" Port: {REDIS_PORT}") print(f" Password: {'***' if REDIS_PASSWORD else None}") print(f" DB: {REDIS_DB}") # optional # Build the async-redis client lazily inside each helper # ------------------------------------------------------------------ app = FastAPI(title="Redis Session Manager API") Redis_session_router = APIRouter(prefix="/Redis", tags=["Redis_Agent_Memory_Management"]) # ---------- Core Utility Functions ---------- async def _redis_client(): """Return a fresh aioredis client using host/port.""" return await aioredis.from_url( f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}", password=REDIS_PASSWORD, decode_responses=True, ) async def get_sessions(user_login_id: str): redis = await _redis_client() pattern = f"{user_login_id}:*" keys = await redis.keys(pattern) sessions = [key.replace(f"{user_login_id}:", "") for key in keys] await redis.close() return sessions async def get_session_history(user_login_id: str, session_id: str): """ Retrieve chat history for a given user's session. """ try: session = RedisSession.from_url( session_id, url=f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}", password=REDIS_PASSWORD, key_prefix=f"{user_login_id}:", ) if not await session.ping(): raise Exception("Redis connection failed") items = await session.get_items() history = [ {"role": msg.get("role", "unknown"), "content": msg.get("content", "")} for msg in items ] await session.close() return history except Exception as e: return {"error": str(e)} async def delete_session(user_login_id: str, session_id: str): session = RedisSession.from_url( session_id, url=f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}", password=REDIS_PASSWORD, key_prefix=f"{user_login_id}:", ) if not await session.ping(): raise HTTPException(status_code=500, detail="Redis connection failed") await session.clear_session() await session.close() return {"status": "success", "message": f"Session {session_id} deleted"} # ---------- API Endpoints ---------- @Redis_session_router.get("/sessions/{user_login_id}") async def api_get_sessions(user_login_id: str): """List all sessions for a given user.""" try: sessions = await get_sessions(user_login_id) return {"user": user_login_id, "sessions": sessions, "count": len(sessions)} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @Redis_session_router.get("/sessions/{user_login_id}/{session_id}") async def api_get_session_history( user_login_id: str, session_id: str, summarize: bool = Query(False), ): """Get the chat history of a session. Optionally summarize with ?summarize=true.""" try: history = await get_session_history(user_login_id, session_id) if isinstance(history, dict) and "error" in history: raise HTTPException(status_code=500, detail=history["error"]) if not history: raise HTTPException(status_code=404, detail="Session history not found") result = {"user": user_login_id, "session_id": session_id, "history": history} return result except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @Redis_session_router.delete("/sessions/{user_login_id}/{session_id}") async def api_delete_session(user_login_id: str, session_id: str): """Delete a session.""" try: result = await delete_session(user_login_id, session_id) return JSONResponse(content=result) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) # ---------- Demo route ---------- @Redis_session_router.get("/") async def root(): return {"message": "Redis Session Manager API running"} # ------------------------------------------------------------------ # Mount router & run # ------------------------------------------------------------------ app.include_router(Redis_session_router) # if __name__ == "__main__": # import uvicorn # uvicorn.run(app, host="0.0.0.0", port=8000, reload=True)