"""Chat with an OpenAI Agents assistant that keeps long-term memory in Redis.

Memories are stored and searched through Mem0 using its Redis vector store.
Configuration is read from the environment (a local ``.env`` file is loaded
first):

* ``OPENAI_API_KEY`` -- required.
* ``REDIS_URL`` -- required, e.g. ``redis://user:password@host:port``.

Importing this module connects to Redis (see ``memory`` below).
"""

import os

from agents import Agent, Runner, function_tool
from dotenv import load_dotenv
from mem0 import Memory

# ---- Load .env ----
load_dotenv()

# Fail early with a readable message instead of the TypeError that
# ``os.environ[...] = None`` would raise when the key is missing.
if not os.getenv("OPENAI_API_KEY"):
    raise RuntimeError(
        "OPENAI_API_KEY is not set; add it to the environment or to .env."
    )

# ---- Redis Mem0 config ----
# SECURITY: the connection URL embeds a password, so it must come from the
# environment and must never be committed as a default value.
# NOTE(review): a credential was previously hard-coded here; rotate it.
REDIS_URL = os.getenv("REDIS_URL")
if not REDIS_URL:
    raise RuntimeError(
        "REDIS_URL is not set; add it to the environment or to .env "
        "(e.g. redis://user:password@host:port)."
    )

mem0_config = {
    "vector_store": {
        "provider": "redis",
        "config": {
            "collection_name": "mem0_chat",
            "embedding_model_dims": 1536,
            "redis_url": REDIS_URL,
        },
    },
    "version": "v1.1",
}

memory = Memory.from_config(mem0_config)
print("✅ Mem0 + Redis ready")


# --------------------------------------------------------------
# Synchronous memory tools
#
# The tool docstrings are forwarded to the model as the tool and argument
# descriptions, so their wording is part of the agent's behaviour.
# --------------------------------------------------------------
@function_tool
def search_memory(query: str, user_id: str) -> str:
    """Search user's memories in Redis.

    Args:
        query: Text describing what to look for.
        user_id: Identifier of the user whose memories are searched.

    Returns:
        Up to five matching memories, one per line as "- <memory>", or
        "No memories found." when nothing matches.
    """
    response = memory.search(query, user_id=user_id, limit=5)
    # Hits are expected under the "results" key; also accept a bare list so
    # a different output format does not crash the tool.
    hits = response.get("results") if isinstance(response, dict) else response
    found = [
        f"- {hit['memory']}"
        for hit in hits or []
        if isinstance(hit, dict) and hit.get("memory")
    ]
    if found:
        return "\n".join(found)
    return "No memories found."


@function_tool
def save_memory(content: str, user_id: str) -> str:
    """Save fact to Redis memory.

    Args:
        content: Short summary of the fact or preference to remember.
        user_id: Identifier of the user the fact belongs to.

    Returns:
        The confirmation string "Saved to memory.".
    """
    memory.add(content, user_id=user_id)
    return "Saved to memory."


# --------------------------------------------------------------
# Personal assistant agent
# --------------------------------------------------------------
def personal_assistant(user_id: str) -> Agent:
    """Build the memory-aware assistant agent for one user.

    Args:
        user_id: Identifier written into the instructions so the model passes
            it to both memory tools.

    Returns:
        An ``Agent`` using ``gpt-4o-mini`` with ``search_memory`` and
        ``save_memory`` registered as tools.
    """
    instructions = (
        "You are a helpful assistant with long-term memory.\n"
        "\n"
        "**Always follow this order:**\n"
        f'1. Call `search_memory` with the query and user_id="{user_id}".\n'
        "2. Personalize your answer using any memories found.\n"
        "3. Answer the question clearly.\n"
        "4. If user shares a new fact/preference, call `save_memory` with a "
        "short summary and same user_id.\n"
        "\n"
        "Only use tool results – no guessing.\n"
    )
    return Agent(
        name="Assistant",
        instructions=instructions,
        model="gpt-4o-mini",
        tools=[search_memory, save_memory],
    )


# --------------------------------------------------------------
# Main entry point: chat_with_memory (synchronous)
# --------------------------------------------------------------
def chat_with_memory(query: str, user_id: str) -> str:
    """Send one message to the memory-backed agent and return its reply.

    Args:
        query: The user's message.
        user_id: Identifier used to scope memory lookups and writes.

    Returns:
        The agent's final output with surrounding whitespace stripped, or
        "Error: <message>" if the run raised an exception.

    Example::

        chat_with_memory("I love pizza", "alice")
        chat_with_memory("What food do I like?", "alice")  # recalls pizza
    """
    agent = personal_assistant(user_id)
    try:
        result = Runner.run_sync(agent, query)
        return result.final_output.strip()
    except Exception as e:
        # Top-level boundary: report the failure as text so an interactive
        # session keeps running instead of crashing.
        return f"Error: {e}"


def main() -> None:
    """Run an interactive chat loop for the demo user "alice".

    Blank input is ignored. Ctrl-D or Ctrl-C ends the session cleanly.
    """
    try:
        while True:
            query = input("You: ")
            if not query.strip():
                # Nothing to send; avoid a pointless model call.
                continue
            response = chat_with_memory(query, "alice")
            print("Assistant:", response)
    except (EOFError, KeyboardInterrupt):
        # Finish the prompt line so the shell prompt starts on a fresh line.
        print()


if __name__ == "__main__":
    main()