# mr_mvp_dev / Redis /sessions.py
# srivatsavdamaraju's picture
# Upload 173 files
# b2315b1 verified
# # import os
# # import asyncio
# # import dotenv
# # from fastapi import FastAPI, HTTPException
# # from fastapi.responses import JSONResponse
# # import redis.asyncio as aioredis
# # from agents import Agent, Runner
# # from agents.extensions.memory import RedisSession
# # # from transformers import pipeline
# # from fastapi import APIRouter
# # dotenv.load_dotenv()
# # Redis_url = os.getenv("REDIS_URL")
# # Redis_session_router = APIRouter(prefix="/Redis", tags=["Redis_Agent_Memory_Management"])
# # # ============ Transformers ============ #
# # # try:
# # # print("🚀 Loading summarization model (BART)...")
# # # summarizer = pipeline("summarization", model="facebook/bart-large-cnn")
# # # print("✅ Model ready!")
# # # except Exception as e:
# # # print(f"⚠️ Error loading model: {e}")
# # # summarizer = None
# # # ---------- Core Utility Functions ---------- #
# # async def get_sessions(user_login_id: str):
# # redis = await aioredis.from_url(Redis_url)
# # pattern = f"{user_login_id}:*"
# # keys = await redis.keys(pattern)
# # sessions = [key.decode().replace(f"{user_login_id}:", "") for key in keys]
# # await redis.close()
# # return sessions
# # async def get_session_history(user_login_id: str, session_id: str):
# # """
# # Retrieve chat history for a given user's session.
# # """
# # try:
# # session = RedisSession.from_url(
# # session_id,
# # url=Redis_url,
# # key_prefix=f"{user_login_id}:",
# # )
# # if not await session.ping():
# # raise Exception("Redis connection failed")
# # items = await session.get_items()
# # history = [
# # {"role": msg.get("role", "unknown"), "content": msg.get("content", "")}
# # for msg in items
# # ]
# # await session.close()
# # return history
# # except Exception as e:
# # return {"error": str(e)}
# # async def delete_session(user_login_id: str, session_id: str):
# # session = RedisSession.from_url(session_id, url=Redis_url, key_prefix=f"{user_login_id}:")
# # if not await session.ping():
# # raise HTTPException(status_code=500, detail="Redis connection failed")
# # await session.clear_session()
# # await session.close()
# # return {"status": "success", "message": f"Session {session_id} deleted"}
# # # ---------- API Endpoints ---------- #
# # @Redis_session_router.get("/sessions/{user_login_id}")
# # async def api_get_sessions(user_login_id: str):
# # """List all sessions for a given user."""
# # try:
# # sessions = await get_sessions(user_login_id)
# # return {"user": user_login_id, "sessions": sessions, "count": len(sessions)}
# # except Exception as e:
# # raise HTTPException(status_code=500, detail=str(e))
# # @Redis_session_router.get("/sessions/{user_login_id}/{session_id}")
# # async def api_get_session_history(user_login_id: str, session_id: str):
# # """Get the chat history of a session. Optionally summarize with ?summarize=true."""
# # try:
# # history = await get_session_history(user_login_id, session_id)
# # if not history:
# # raise HTTPException(status_code=404, detail="Session history not found")
# # result = {"user": user_login_id, "session_id": session_id, "history": history}
# # text = " ".join([msg["content"] for msg in history if msg["role"] == "user"])
# # summary = summarizer(text, max_length=130, min_length=30, do_sample=False)[0]["summary_text"]
# # result["summary"] = summary
# # return result
# # except Exception as e:
# # raise HTTPException(status_code=500, detail=str(e))
# # @Redis_session_router.delete("/sessions/{user_login_id}/{session_id}")
# # async def api_delete_session(user_login_id: str, session_id: str):
# # """Delete a session."""
# # try:
# # result = await delete_session(user_login_id, session_id)
# # return JSONResponse(content=result)
# # except Exception as e:
# # raise HTTPException(status_code=500, detail=str(e))
# # # ---------- Demo route (optional) ---------- #
# # @Redis_session_router.get("/")
# # async def root():
# # return {"message": "Redis Session Manager API running 🧠"}
# # ---------- Local testing ---------- #
# # if __name__ == "__main__":
# # import uvicorn
# # uvicorn.run("main:Redis_session_router", host="0.0.0.0", port=8000, reload=True)
# import os
# import asyncio
# import dotenv
# from fastapi import FastAPI, HTTPException, Query
# from fastapi.responses import JSONResponse
# import redis.asyncio as aioredis
# from agents import Agent, Runner
# from agents.extensions.memory import RedisSession
# # from transformers import pipeline
# from fastapi import APIRouter
# dotenv.load_dotenv()
# Redis_url = os.getenv("REDIS_URL")
# print(Redis_url)
# # Create the main FastAPI app
# app = FastAPI(title="Redis Session Manager API")
# Redis_session_router = APIRouter(prefix="/Redis", tags=["Redis_Agent_Memory_Management"])
# # ============ Transformers ============ #
# # ---------- Core Utility Functions ---------- #
# async def get_sessions(user_login_id: str):
# redis = await aioredis.from_url(Redis_url)
# pattern = f"{user_login_id}:*"
# keys = await redis.keys(pattern)
# sessions = [key.decode().replace(f"{user_login_id}:", "") for key in keys]
# await redis.close()
# return sessions
# async def get_session_history(user_login_id: str, session_id: str):
# """
# Retrieve chat history for a given user's session.
# """
# try:
# session = RedisSession.from_url(
# session_id,
# url=Redis_url,
# key_prefix=f"{user_login_id}:",
# )
# if not await session.ping():
# raise Exception("Redis connection failed")
# items = await session.get_items()
# history = [
# {"role": msg.get("role", "unknown"), "content": msg.get("content", "")}
# for msg in items
# ]
# await session.close()
# return history
# except Exception as e:
# return {"error": str(e)}
# async def delete_session(user_login_id: str, session_id: str):
# session = RedisSession.from_url(session_id, url=Redis_url, key_prefix=f"{user_login_id}:")
# if not await session.ping():
# raise HTTPException(status_code=500, detail="Redis connection failed")
# await session.clear_session()
# await session.close()
# return {"status": "success", "message": f"Session {session_id} deleted"}
# # ---------- API Endpoints ---------- #
# @Redis_session_router.get("/sessions/{user_login_id}")
# async def api_get_sessions(user_login_id: str):
# """List all sessions for a given user."""
# try:
# sessions = await get_sessions(user_login_id)
# return {"user": user_login_id, "sessions": sessions, "count": len(sessions)}
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @Redis_session_router.get("/sessions/{user_login_id}/{session_id}")
# async def api_get_session_history(user_login_id: str, session_id: str, summarize: bool = Query(False)):
# """Get the chat history of a session. Optionally summarize with ?summarize=true."""
# try:
# history = await get_session_history(user_login_id, session_id)
# if isinstance(history, dict) and "error" in history:
# raise HTTPException(status_code=500, detail=history["error"])
# if not history:
# raise HTTPException(status_code=404, detail="Session history not found")
# result = {"user": user_login_id, "session_id": session_id, "history": history}
# return result
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# @Redis_session_router.delete("/sessions/{user_login_id}/{session_id}")
# async def api_delete_session(user_login_id: str, session_id: str):
# """Delete a session."""
# try:
# result = await delete_session(user_login_id, session_id)
# return JSONResponse(content=result)
# except Exception as e:
# raise HTTPException(status_code=500, detail=str(e))
# # ---------- Demo route (optional) ---------- #
# @Redis_session_router.get("/")
# async def root():
# return {"message": "Redis Session Manager API running 🧠"}
# # Mount the router to the app
# app.include_router(Redis_session_router)
# # ---------- Local testing ---------- #
# if __name__ == "__main__":
# import uvicorn
# uvicorn.run(app, host="0.0.0.0", port=8000, reload=True)
# --------------------------------------------------------------
# Redis Session Manager – connects using REDIS_HOST (from the environment) and a fixed REDIS_PORT
# --------------------------------------------------------------
import os
import asyncio
import dotenv
from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import JSONResponse
import redis.asyncio as aioredis
from agents import Agent, Runner
from agents.extensions.memory import RedisSession
from fastapi import APIRouter
from retrieve_secret import *
# ------------------------------------------------------------------
# Load environment variables (create a .env file with the keys)
# ------------------------------------------------------------------
# Load settings from a .env file before any of them are read below.
dotenv.load_dotenv()

# Connection settings. The port is fixed in code; the REDIS_PORT environment
# variable is not consulted. REDIS_DB falls back to database 0 when unset.
REDIS_PORT = 6380
REDIS_DB = int(os.getenv("REDIS_DB", "0"))
REDIS_HOST = os.getenv("REDIS_HOST")
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD")

# Startup diagnostics: echo the effective settings (password is masked).
print("Redis Config:")
print(f" Host: {REDIS_HOST}")
print(f" Port: {REDIS_PORT}")
print(f" Password: {'***' if REDIS_PASSWORD else None}")
print(f" DB: {REDIS_DB}")
# Build the async-redis client lazily inside each helper
# ------------------------------------------------------------------
# Application object that the session router is attached to.
app = FastAPI(title="Redis Session Manager API")

# All session-management routes are served under the /Redis prefix.
Redis_session_router = APIRouter(
    prefix="/Redis",
    tags=["Redis_Agent_Memory_Management"],
)
# ---------- Core Utility Functions ----------
async def _redis_client():
    """Create a new redis.asyncio client for the configured host, port and DB.

    The client is built with ``decode_responses=True``, so replies come back
    as ``str`` rather than ``bytes``. A new client is created on every call.
    """
    target_url = f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}"
    client = await aioredis.from_url(
        target_url,
        password=REDIS_PASSWORD,
        decode_responses=True,
    )
    return client
async def get_sessions(user_login_id: str):
    """List the Redis key names stored under one user's prefix.

    Every key matching ``"<user_login_id>:*"`` is fetched and returned with
    the leading ``"<user_login_id>:"`` removed.

    Args:
        user_login_id: Login id whose keys should be listed. It is matched
            literally; glob metacharacters in it are escaped.

    Returns:
        list[str]: Key names with the user prefix stripped.

    Raises:
        Exception: Whatever the Redis client raises on connection or
            command failure (the client is still closed).
    """
    prefix = f"{user_login_id}:"
    # The id comes from the request path. Backslash-escape Redis glob
    # metacharacters so an id such as "*" cannot match other users' keys.
    escaped_id = "".join(
        f"\\{ch}" if ch in "\\*?[]" else ch for ch in user_login_id
    )
    pattern = f"{escaped_id}:*"

    redis = await _redis_client()
    try:
        # NOTE: KEYS walks the whole keyspace; acceptable for small
        # databases, but consider SCAN if the keyspace grows.
        keys = await redis.keys(pattern)
    finally:
        # Always release the connection, even when the command fails.
        await redis.close()

    # Strip only the leading prefix; str.replace would also remove any later
    # occurrence of the same text inside the key.
    # NOTE(review): the remainder may include suffixes added by the session
    # backend rather than a bare session id - confirm against the writer.
    return [key[len(prefix):] if key.startswith(prefix) else key for key in keys]
async def get_session_history(user_login_id: str, session_id: str):
    """Retrieve the chat history of one session as role/content pairs.

    Args:
        user_login_id: Login id; used to build the session key prefix
            ``"<user_login_id>:"``.
        session_id: Identifier of the session to read.

    Returns:
        list[dict]: One ``{"role": ..., "content": ...}`` dict per stored
        item (missing fields default to ``"unknown"`` / ``""``), or
        ``{"error": "<message>"}`` when anything fails. This function does
        not raise; callers must check for the ``"error"`` key.
    """
    try:
        session = RedisSession.from_url(
            session_id,
            url=f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}",
            # Redis client options go through redis_kwargs; other keyword
            # arguments are forwarded to the RedisSession constructor, which
            # has no "password" parameter.
            redis_kwargs={"password": REDIS_PASSWORD},
            key_prefix=f"{user_login_id}:",
        )
        try:
            if not await session.ping():
                raise Exception("Redis connection failed")
            items = await session.get_items()
        finally:
            # Close the session on every path so a failed ping or read does
            # not leak the underlying connection.
            await session.close()
        return [
            {"role": msg.get("role", "unknown"), "content": msg.get("content", "")}
            for msg in items
        ]
    except Exception as e:
        # Errors are reported in-band; the API layer maps this to HTTP 500.
        return {"error": str(e)}
async def delete_session(user_login_id: str, session_id: str):
    """Clear one session's stored data in Redis.

    Args:
        user_login_id: Login id; used to build the session key prefix
            ``"<user_login_id>:"``.
        session_id: Identifier of the session to clear.

    Returns:
        dict: ``{"status": "success", "message": "Session <id> deleted"}``.

    Raises:
        HTTPException: Status 500 when the Redis ping fails.
        Exception: Any error raised while clearing or closing the session.
    """
    session = RedisSession.from_url(
        session_id,
        url=f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}",
        # Redis client options go through redis_kwargs; other keyword
        # arguments are forwarded to the RedisSession constructor, which has
        # no "password" parameter.
        redis_kwargs={"password": REDIS_PASSWORD},
        key_prefix=f"{user_login_id}:",
    )
    try:
        if not await session.ping():
            raise HTTPException(status_code=500, detail="Redis connection failed")
        await session.clear_session()
    finally:
        # Release the connection even when the ping or the clear fails.
        await session.close()
    return {"status": "success", "message": f"Session {session_id} deleted"}
# ---------- API Endpoints ----------
@Redis_session_router.get("/sessions/{user_login_id}")
async def api_get_sessions(user_login_id: str):
    """Return the sessions stored for ``user_login_id`` together with a count.

    Any exception raised while listing is reported as HTTP 500 with the
    exception text as the detail.
    """
    try:
        found = await get_sessions(user_login_id)
        payload = {"user": user_login_id, "sessions": found, "count": len(found)}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return payload
@Redis_session_router.get("/sessions/{user_login_id}/{session_id}")
async def api_get_session_history(
    user_login_id: str,
    session_id: str,
    summarize: bool = Query(False),
):
    """Get the chat history of a session.

    Args:
        user_login_id: Login id that owns the session.
        session_id: Identifier of the session to read.
        summarize: Accepted for API compatibility (``?summarize=true``) but
            currently ignored; no summary is produced.

    Returns:
        dict: ``{"user": ..., "session_id": ..., "history": [...]}``.

    Raises:
        HTTPException: 404 when the session has no history; 500 when the
            lookup reports an error or raises unexpectedly.
    """
    try:
        history = await get_session_history(user_login_id, session_id)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    # The checks below sit outside the try block on purpose: raised inside
    # it, the 404 would be caught by the broad handler and turned into a 500.
    if isinstance(history, dict) and "error" in history:
        raise HTTPException(status_code=500, detail=history["error"])
    if not history:
        raise HTTPException(status_code=404, detail="Session history not found")
    return {"user": user_login_id, "session_id": session_id, "history": history}
@Redis_session_router.delete("/sessions/{user_login_id}/{session_id}")
async def api_delete_session(user_login_id: str, session_id: str):
    """Delete a session.

    Args:
        user_login_id: Login id that owns the session.
        session_id: Identifier of the session to delete.

    Returns:
        JSONResponse: The status payload produced by ``delete_session``.

    Raises:
        HTTPException: Propagated unchanged when ``delete_session`` raises
            one; any other failure is reported as 500 with the exception
            text as the detail.
    """
    try:
        result = await delete_session(user_login_id, session_id)
    except HTTPException:
        # Already a well-formed HTTP error; re-wrapping it would replace its
        # detail with str(exception).
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    return JSONResponse(content=result)
# ---------- Demo route ----------
@Redis_session_router.get("/")
async def root():
    """Return a fixed message indicating the session manager API is running."""
    status = {"message": "Redis Session Manager API running"}
    return status
# ------------------------------------------------------------------
# Mount router
# ------------------------------------------------------------------
# Register the /Redis routes on the app. This module does not start a
# server itself: the uvicorn entry point below is commented out.
app.include_router(Redis_session_router)
# if __name__ == "__main__":
# import uvicorn
# uvicorn.run(app, host="0.0.0.0", port=8000, reload=True)