# multiagent_api.py
"""Multi-agent streaming pipeline: orchestrator -> synthesizer.

Streams token deltas from an orchestrator agent (which can delegate to
analyst / BI / data-science / general-purpose sub-agents backed by a
Qdrant vector-search tool), then runs a synthesizer pass and emits a
final JSON "[PAYLOAD]" envelope terminated by "[DONE]".
"""

import asyncio
import json
import os
import sys
import traceback
from pathlib import Path
from typing import AsyncGenerator, Optional
from urllib.parse import quote

import requests
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
import uvicorn

from retrieve_secret import *

# -------------------------------
# Project Root
# -------------------------------
PROJECT_ROOT = Path(__file__).resolve().parents[0]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

# Load environment variables before importing the agents SDK so that
# OPENAI_API_KEY (from .env / retrieve_secret) is already in os.environ.
import dotenv

dotenv.load_dotenv()

from agents import Agent, Runner, function_tool
from openai.types.responses import ResponseTextDeltaEvent

# Redis URL (optional) -- reserved for session memory; not used yet.
Redis_url = os.getenv("REDIS_URL")


# --- Vector Search Tool: Fixed URL Encoding + Schema ---
def query_vector_agent_calling(user_query: str, collection_name: str) -> str:
    """Call the vector agent API to get relevant context.

    Uses proper URL encoding for collection names with special
    characters like '&'.

    Args:
        user_query: Natural language question from the user.
        collection_name: Qdrant collection to search.

    Returns:
        Text snippets joined by blank lines, or a human-readable
        error/empty-result message (this function never raises).
    """
    base_url = "https://mr-mvp-api-dev.dev.ingenspark.com/qdrant/search"
    encoded_collection = quote(collection_name, safe='')  # & -> %26
    url = f"{base_url}?collection_name={encoded_collection}&mode=hybrid"

    headers = {
        "accept": "application/json",
        "Content-Type": "application/json",
    }
    payload = {
        "query": user_query,
        "top_k": 2
    }

    try:
        response = requests.post(url, headers=headers, json=payload, timeout=30)
        response.raise_for_status()

        data = response.json()
        results = data.get("results") or data.get("result") or data
        if not results:
            return "No relevant context found."

        contexts = []
        for item in results:
            # Guard: when the raw `data` dict is used as the fallback,
            # iterating it yields plain string keys, not result dicts.
            if isinstance(item, dict):
                text = (
                    item.get("text")
                    or item.get("payload", {}).get("text")
                    or str(item)
                )
            else:
                text = str(item)
            if text:
                contexts.append(text.strip())

        return "\n\n".join(contexts) if contexts else "No text extracted from results."

    except requests.exceptions.HTTPError as e:
        error_msg = f"HTTP {e.response.status_code}: {e.response.text}"
        print(f"Vector API HTTP Error: {error_msg}")
        return f"Vector API error: {e.response.status_code}"
    except requests.exceptions.Timeout:
        return "Vector API timeout. Please try again."
    except requests.exceptions.RequestException as e:
        print(f"Vector API Request Error: {e}")
        return "Failed to connect to vector search service."
    except Exception as e:
        print(f"Unexpected error in vector tool: {e}")
        return "Internal error in vector search tool."


# --- Tool with explicit type hints (critical for OpenAI schema generation) ---
@function_tool
async def vector_agent_tool(
    user_query: str,
    collection_name: Optional[str] = "sri_1_files_&_files_metadata"
) -> str:
    """
    Search vector database for context relevant to the user's query.

    Args:
        user_query: Natural language question from user.
        collection_name: Qdrant collection to search (default: sri_1_files_&_files_metadata).

    Returns:
        Relevant text snippets from the vector store.
    """
    return query_vector_agent_calling(user_query, collection_name)


# -------------------------------
# Agents
# -------------------------------
DataAnalyst_agent = Agent(
    name="DataAnalyst_agent",
    instructions=(
        "You are a data analyst. Use the provided tools to analyze the healthcare dataset. "
        "Answer questions factually based on data. Use visualizations only if explicitly asked."
    ),
    model="gpt-4o-mini",
    tools=[],
)

Business_Intelligence_Agent = Agent(
    name="Business_Intelligence_Agent",
    instructions="Analyze the dataframe and provide business insights, trends, and recommendations.",
    model="gpt-4o-mini",
    tools=[],
)

Data_Scientist_Agent = Agent(
    name="Data_Scientist_Agent",
    instructions="Perform statistical analysis, identify patterns, and suggest models.",
    model="gpt-4o-mini",
    tools=[],
)

general_purpose_agent = Agent(
    name="general_purpose_agent",
    instructions=(
        "You are a general-purpose agent. For any question (what, why, how, etc.), "
        "first use `vector_agent_tool` to retrieve relevant context from the knowledge base. "
        "Then, use the retrieved context to answer the question. "
        "If the question is about data analysis, hand off to DataAnalyst_agent."
    ),
    model="gpt-4o-mini",
    tools=[vector_agent_tool],  # Only this tool
)

rephraser_agent = Agent(
    name="rephraser_agent",
    instructions="Rephrase the user's message to make it more natural, clear, and fluent.",
    handoff_description="English rephraser",
    model="gpt-4o-mini"
)

orchestrator_agent = Agent(
    name="orchestrator_agent",
    instructions=(
        "You are the main orchestrator. Follow this logic:\n"
        "- For general questions (what, who, why, etc.), use `general_purpose_agent`.\n"
        "- It will automatically call `vector_agent_tool` to get context.\n"
        "- If the user asks for plots or visualizations, use `autoviz_agent` (not implemented yet).\n"
        "- For data analysis, use `analyze_dataframe` or `DataAnalyst_agent`.\n"
        "- Always aim for accurate, concise, and helpful responses."
    ),
    tools=[
        DataAnalyst_agent.as_tool(
            tool_name="analyze_dataframe",
            tool_description="Analyze the healthcare dataset using pandas and return insights."
        ),
        rephraser_agent.as_tool(
            tool_name="rephrase",
            tool_description="Rephrase user input for clarity."
        ),
        Business_Intelligence_Agent.as_tool(
            tool_name="Business_Intelligence_Agent",
            tool_description="Provide business-level insights from data."
        ),
        Data_Scientist_Agent.as_tool(
            tool_name="DataScientist_Agent",
            tool_description="Perform advanced statistical analysis."
        ),
        general_purpose_agent.as_tool(
            tool_name="general_purpose_agent",
            tool_description="Answer general questions using vector search context."
        ),
    ],
)

synthesizer_agent = Agent(
    name="synthesizer_agent",
    instructions=(
        "Review all intermediate results. "
        "Clean up language, remove redundancy, and present a clear, final answer to the user."
    ),
    model="gpt-4o-mini"
)


# -------------------------------
# Core Streaming Generator
# -------------------------------
async def stream_multiagent(user_query: str) -> AsyncGenerator[str, None]:
    """Run orchestrator (streamed) then synthesizer; yield chunks to the client.

    Yields:
        Raw text deltas while the orchestrator streams, then a
        "[PAYLOAD]" line carrying the final JSON envelope, then "[DONE]".
        On any failure an "[ERROR] ..." line is yielded before "[DONE]".
    """
    convo_id = "multiagent_123"
    analysis_text = ""
    try:
        # === 1. Run Orchestrator (streamed) ===
        result = Runner.run_streamed(orchestrator_agent, input=user_query)
        async for event in result.stream_events():
            if (
                event.type == "raw_response_event"
                and isinstance(event.data, ResponseTextDeltaEvent)
            ):
                delta = event.data.delta
                analysis_text += delta
                yield delta  # live streaming to the caller

        # === 2. Run Synthesizer ===
        # Fall back to the accumulated stream text if no final output.
        orchestrator_output = result.final_output or analysis_text
        synthesizer_input = [
            {"role": "system", "content": "Synthesize the final answer."},
            {"role": "user", "content": orchestrator_output}
        ]
        synth_result = await Runner.run(synthesizer_agent, synthesizer_input)
        final_answer = synth_result.final_output or analysis_text

        # === 3. Final Payload ===
        payload = {
            "convoId": convo_id,
            "type": "assistant_message",
            "hidden_code": False,
            "hidden_text": "",
            "ai_message": final_answer,
            "artifacts": []  # Add Vega-Lite later
        }
        yield "\n\n[PAYLOAD]\n" + json.dumps(payload, ensure_ascii=False) + "\n\n"
        yield "[DONE]"

    except Exception as e:
        # Surface the error in-stream so the client always sees [DONE].
        error_msg = f"[ERROR] {str(e)}"
        yield error_msg
        yield "[DONE]"


# -------------------------------
# Interactive Console (Optional)
# -------------------------------
if __name__ == "__main__":
    print("Multi-Agent Streaming Console (type 'exit' to quit)\n")
    while True:
        user_query = input("Query: ").strip()
        if user_query.lower() in {"exit", "quit"}:
            break
        if not user_query:
            continue

        async def run_and_print():
            async for chunk in stream_multiagent(user_query):
                print(chunk, end="", flush=True)

        try:
            asyncio.run(run_and_print())
        except Exception as e:
            print(f"\nError: {e}")
            traceback.print_exc()

        again = input("\nAnother query? (y/n): ").strip().lower()
        if again not in {'y', 'yes'}:
            print("Goodbye!")
            break