burtenshaw committed
Commit e996b22 · Parent(s): 0a12050

add streaming
Files changed (4):
  1. app.py +42 -17
  2. backend/config.py +1 -2
  3. backend/council.py +78 -69
  4. backend/openrouter.py +41 -15
app.py CHANGED

```diff
@@ -1,5 +1,5 @@
 import gradio as gr
-from backend.council import stage1_collect_responses, stage2_collect_rankings, stage3_synthesize_final
+from backend.council import stage1_collect_responses, stage2_collect_rankings, stage3_synthesize_final_stream
 from backend.config import COUNCIL_MODELS, CHAIRMAN_MODEL
 
 
@@ -23,42 +23,67 @@ async def ask_council(question: str, progress=gr.Progress()):
     )
 
     try:
+        buffer = ""
+
         # Stage 1: Collect individual responses
         progress(0.1, desc="Stage 1: Collecting individual responses...")
-        yield ("## 🟡 Stage 1: Collecting individual responses from council members...")
+        buffer += "## 🟡 Stage 1: Collecting individual responses from council members...\n\n"
+        yield buffer
 
         stage1_results = await stage1_collect_responses(question)
 
         if not stage1_results:
-            yield "❌ The council failed to generate a response."
+            buffer += "\n❌ The council failed to generate a response."
+            yield buffer
             return
 
+        # Format Stage 1 results
+        buffer += f"### ✅ Received {len(stage1_results)} responses:\n"
+        for res in stage1_results:
+            model_name = res["model"].split("/")[-1]
+            preview = res["response"][:100].replace("\n", " ") + "..."
+            buffer += f"- **{model_name}**: {preview}\n"
+        buffer += "\n---\n\n"
+        yield buffer
+
         # Stage 2: Collect rankings
         progress(0.4, desc="Stage 2: Council members are ranking responses...")
-        yield (
-            f"## 🟢 Stage 1 Complete ({len(stage1_results)} responses received).\n\n"
-            "## 🟡 Stage 2: Council members are ranking each other's responses..."
-        )
+        buffer += "## 🟡 Stage 2: Council members are ranking each other's responses...\n\n"
+        yield buffer
 
         stage2_results, _ = await stage2_collect_rankings(question, stage1_results)
 
+        # Format Stage 2 results
+        buffer += "### ✅ Rankings Collected:\n"
+        for res in stage2_results:
+            model_name = res["model"].split("/")[-1]
+            # Extract just the ranking part if possible, or just say "Ranked"
+            buffer += f"- **{model_name}** has submitted their rankings.\n"
+        buffer += "\n---\n\n"
+        yield buffer
+
         # Stage 3: Synthesize final answer
         progress(0.7, desc="Stage 3: Chairman is synthesizing the final answer...")
-        yield (
-            "## 🟢 Stage 2 Complete (Rankings collected).\n\n"
-            "## 🟡 Stage 3: Chairman is synthesizing the final answer..."
-        )
+        buffer += "## 🟡 Stage 3: Chairman is synthesizing the final answer...\n\n"
+        yield buffer
 
-        stage3_result = await stage3_synthesize_final(question, stage1_results, stage2_results)
+        full_response = ""
+        async for chunk in stage3_synthesize_final_stream(question, stage1_results, stage2_results):
+            full_response += chunk
+            yield buffer + full_response
 
         progress(1.0, desc="Complete!")
 
-        response = stage3_result.get("response")
-        if not response:
-            yield "❌ The council failed to generate a final synthesis."
+        if not full_response:
+            buffer += "\n❌ The council failed to generate a final synthesis."
+            yield buffer
             return
 
-        yield response
+        # Let's keep the history but mark Stage 3 as done
+        final_buffer = buffer.replace(
+            "## 🟡 Stage 3: Chairman is synthesizing the final answer...", "## 🟢 Stage 3: Final Answer"
+        )
+        yield final_buffer + full_response
 
     except Exception as e:
         yield f"❌ Error consulting the council: {str(e)}"
@@ -66,7 +91,7 @@ async def ask_council(question: str, progress=gr.Progress()):
 
 description = """
 An MCP server that consults a council of LLMs to answer questions.
-![image](https://pbs.twimg.com/media/G6ZZO7ragAAtnCZ?format=jpg)
+<img src="https://pbs.twimg.com/media/G6ZZO7ragAAtnCZ?format=jpg" alt="MCP Server" style="width: 100%; height: auto; text-align: center;">
 ⚠️ We're using 5 models in the council, so it takes a minute to answer.
 """
```
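With this change, `ask_council` is an async generator that yields the entire accumulated Markdown buffer on every update rather than one message per stage. Below is a minimal sketch of how such a generator is typically wired into Gradio; the `gr.Interface` wiring and the `mcp_server=True` launch flag are assumptions, since the actual launch code sits outside these hunks.

```python
# Hypothetical wiring for the generator above (the real launch code is not
# part of this diff). Gradio treats a generator function as a streaming
# handler and re-renders the output component with each yielded value,
# which is why ask_council yields the full buffer instead of deltas.
import gradio as gr

demo = gr.Interface(
    fn=ask_council,                        # the async generator defined above
    inputs=gr.Textbox(label="Question"),
    outputs=gr.Markdown(),
    title="LLM Council",
    description=description,
)

if __name__ == "__main__":
    demo.launch(mcp_server=True)  # assumption: exposed as an MCP server per the description
```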
backend/config.py CHANGED

```diff
@@ -10,14 +10,13 @@ OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
 
 # Council members - list of OpenRouter model identifiers
 COUNCIL_MODELS = [
-    "moonshotai/Kimi-K2-Thinking:novita",
     "openai/gpt-oss-120b:hyperbolic",
    "deepseek-ai/DeepSeek-V3.2-Exp:novita",
     "Qwen/Qwen3-235B-A22B-Instruct-2507:hyperbolic",
 ]
 
 # Chairman model - synthesizes final response
-CHAIRMAN_MODEL = "moonshotai/Kimi-K2-Thinking:novita"
+CHAIRMAN_MODEL = "deepseek-ai/DeepSeek-V3.2-Exp:novita"
 
 # OpenRouter API endpoint
 OPENROUTER_API_URL = "https://router.huggingface.co/v1/chat/completions"
```
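The chairman moves from Kimi-K2-Thinking to DeepSeek-V3.2-Exp, and Kimi leaves the council entirely. A quick way to sanity-check a new chairman before deploying is a one-off call through the existing `query_model` helper; this is a hypothetical check script, not part of the commit.

```python
# Hypothetical smoke test for the new CHAIRMAN_MODEL (not part of this commit).
import asyncio

from backend.config import CHAIRMAN_MODEL
from backend.openrouter import query_model


async def main() -> None:
    # query_model returns None on any error, so this doubles as a reachability check.
    result = await query_model(CHAIRMAN_MODEL, [{"role": "user", "content": "Reply with OK."}])
    print(result["content"] if result else f"{CHAIRMAN_MODEL} is unreachable")


asyncio.run(main())
```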
backend/council.py CHANGED

```diff
@@ -1,7 +1,7 @@
 """3-stage LLM Council orchestration."""
 
 from typing import List, Dict, Any, Tuple
-from .openrouter import query_models_parallel, query_model
+from .openrouter import query_models_parallel, query_model, query_model_stream
 from .config import COUNCIL_MODELS, CHAIRMAN_MODEL
 
 
@@ -25,18 +25,14 @@ async def stage1_collect_responses(user_query: str) -> List[Dict[str, Any]]:
     stage1_results = []
     for model, response in responses.items():
         if response is not None:  # Only include successful responses
-            stage1_results.append({
-                "model": model,
-                "response": response.get('content', '')
-            })
+            stage1_results.append({"model": model, "response": response.get("content", "")})
 
     print(f"STAGE 1 COMPLETE: Received {len(stage1_results)} responses.")
     return stage1_results
 
 
 async def stage2_collect_rankings(
-    user_query: str,
-    stage1_results: List[Dict[str, Any]]
+    user_query: str, stage1_results: List[Dict[str, Any]]
 ) -> Tuple[List[Dict[str, Any]], Dict[str, str]]:
     """
     Stage 2: Each model ranks the anonymized responses.
@@ -53,16 +49,12 @@ async def stage2_collect_rankings(
     labels = [chr(65 + i) for i in range(len(stage1_results))]  # A, B, C, ...
 
     # Create mapping from label to model name
-    label_to_model = {
-        f"Response {label}": result['model']
-        for label, result in zip(labels, stage1_results)
-    }
+    label_to_model = {f"Response {label}": result["model"] for label, result in zip(labels, stage1_results)}
 
     # Build the ranking prompt
-    responses_text = "\n\n".join([
-        f"Response {label}:\n{result['response']}"
-        for label, result in zip(labels, stage1_results)
-    ])
+    responses_text = "\n\n".join(
+        [f"Response {label}:\n{result['response']}" for label, result in zip(labels, stage1_results)]
+    )
 
     ranking_prompt = f"""You are evaluating different responses to the following question:
@@ -104,22 +96,16 @@ Now provide your evaluation and ranking:"""
     stage2_results = []
     for model, response in responses.items():
         if response is not None:
-            full_text = response.get('content', '')
+            full_text = response.get("content", "")
             parsed = parse_ranking_from_text(full_text)
-            stage2_results.append({
-                "model": model,
-                "ranking": full_text,
-                "parsed_ranking": parsed
-            })
+            stage2_results.append({"model": model, "ranking": full_text, "parsed_ranking": parsed})
 
     print("STAGE 2 COMPLETE: Rankings collected.")
     return stage2_results, label_to_model
 
 
 async def stage3_synthesize_final(
-    user_query: str,
-    stage1_results: List[Dict[str, Any]],
-    stage2_results: List[Dict[str, Any]]
+    user_query: str, stage1_results: List[Dict[str, Any]], stage2_results: List[Dict[str, Any]]
 ) -> Dict[str, Any]:
     """
     Stage 3: Chairman synthesizes final response.
@@ -134,15 +120,13 @@ async def stage3_synthesize_final(
     """
     print("STAGE 3: Chairman is synthesizing the final answer...")
     # Build comprehensive context for chairman
-    stage1_text = "\n\n".join([
-        f"Model: {result['model']}\nResponse: {result['response']}"
-        for result in stage1_results
-    ])
+    stage1_text = "\n\n".join(
+        [f"Model: {result['model']}\nResponse: {result['response']}" for result in stage1_results]
+    )
 
-    stage2_text = "\n\n".join([
-        f"Model: {result['model']}\nRanking: {result['ranking']}"
-        for result in stage2_results
-    ])
+    stage2_text = "\n\n".join(
+        [f"Model: {result['model']}\nRanking: {result['ranking']}" for result in stage2_results]
+    )
 
     chairman_prompt = f"""You are the Chairman of an LLM Council. Multiple AI models have provided responses to a user's question, and then ranked each other's responses.
@@ -169,16 +153,54 @@ Provide a clear, well-reasoned final answer that represents the council's collective wisdom:"""
     if response is None:
         # Fallback if chairman fails
         print("STAGE 3 ERROR: Unable to generate final synthesis.")
-        return {
-            "model": CHAIRMAN_MODEL,
-            "response": "Error: Unable to generate final synthesis."
-        }
+        return {"model": CHAIRMAN_MODEL, "response": "Error: Unable to generate final synthesis."}
 
     print("STAGE 3 COMPLETE: Final answer synthesized.")
-    return {
-        "model": CHAIRMAN_MODEL,
-        "response": response.get('content', '')
-    }
+    return {"model": CHAIRMAN_MODEL, "response": response.get("content", "")}
+
+
+async def stage3_synthesize_final_stream(
+    user_query: str, stage1_results: List[Dict[str, Any]], stage2_results: List[Dict[str, Any]]
+):
+    """
+    Stage 3: Chairman synthesizes final response (Streaming).
+    Yields chunks of text.
+    """
+    print("STAGE 3: Chairman is synthesizing the final answer (Streaming)...")
+
+    # Build comprehensive context for chairman
+    stage1_text = "\n\n".join(
+        [f"Model: {result['model']}\nResponse: {result['response']}" for result in stage1_results]
+    )
+
+    stage2_text = "\n\n".join(
+        [f"Model: {result['model']}\nRanking: {result['ranking']}" for result in stage2_results]
+    )
+
+    chairman_prompt = f"""You are the Chairman of an LLM Council. Multiple AI models have provided responses to a user's question, and then ranked each other's responses.
+
+Original Question: {user_query}
+
+STAGE 1 - Individual Responses:
+{stage1_text}
+
+STAGE 2 - Peer Rankings:
+{stage2_text}
+
+Your task as Chairman is to synthesize all of this information into a single, comprehensive, accurate answer to the user's original question. Consider:
+- The individual responses and their insights
+- The peer rankings and what they reveal about response quality
+- Any patterns of agreement or disagreement
+
+Provide a clear, well-reasoned final answer that represents the council's collective wisdom:"""
+
+    messages = [{"role": "user", "content": chairman_prompt}]
+
+    # Stream the chairman model
+    async for chunk in query_model_stream(CHAIRMAN_MODEL, messages):
+        yield chunk
+
+    print("STAGE 3 COMPLETE: Final answer stream finished.")
 
 
 def parse_ranking_from_text(ranking_text: str) -> List[str]:
@@ -201,23 +223,22 @@ def parse_ranking_from_text(ranking_text: str) -> List[str]:
         ranking_section = parts[1]
         # Try to extract numbered list format (e.g., "1. Response A")
         # This pattern looks for: number, period, optional space, "Response X"
-        numbered_matches = re.findall(r'\d+\.\s*Response [A-Z]', ranking_section)
+        numbered_matches = re.findall(r"\d+\.\s*Response [A-Z]", ranking_section)
         if numbered_matches:
             # Extract just the "Response X" part
-            return [re.search(r'Response [A-Z]', m).group() for m in numbered_matches]
+            return [re.search(r"Response [A-Z]", m).group() for m in numbered_matches]
 
         # Fallback: Extract all "Response X" patterns in order
-        matches = re.findall(r'Response [A-Z]', ranking_section)
+        matches = re.findall(r"Response [A-Z]", ranking_section)
         return matches
 
     # Fallback: try to find any "Response X" patterns in order
-    matches = re.findall(r'Response [A-Z]', ranking_text)
+    matches = re.findall(r"Response [A-Z]", ranking_text)
     return matches
 
 
 def calculate_aggregate_rankings(
-    stage2_results: List[Dict[str, Any]],
-    label_to_model: Dict[str, str]
+    stage2_results: List[Dict[str, Any]], label_to_model: Dict[str, str]
 ) -> List[Dict[str, Any]]:
     """
     Calculate aggregate rankings across all models.
@@ -235,7 +256,7 @@ def calculate_aggregate_rankings(
     model_positions = defaultdict(list)
 
     for ranking in stage2_results:
-        ranking_text = ranking['ranking']
+        ranking_text = ranking["ranking"]
 
         # Parse the ranking from the structured format
         parsed_ranking = parse_ranking_from_text(ranking_text)
@@ -250,14 +271,12 @@ def calculate_aggregate_rankings(
     for model, positions in model_positions.items():
         if positions:
             avg_rank = sum(positions) / len(positions)
-            aggregate.append({
-                "model": model,
-                "average_rank": round(avg_rank, 2),
-                "rankings_count": len(positions)
-            })
+            aggregate.append(
+                {"model": model, "average_rank": round(avg_rank, 2), "rankings_count": len(positions)}
+            )
 
     # Sort by average rank (lower is better)
-    aggregate.sort(key=lambda x: x['average_rank'])
+    aggregate.sort(key=lambda x: x["average_rank"])
 
     return aggregate
@@ -288,10 +307,10 @@ Title:"""
         # Fallback to a generic title
         return "New Conversation"
 
-    title = response.get('content', 'New Conversation').strip()
+    title = response.get("content", "New Conversation").strip()
 
     # Clean up the title - remove quotes, limit length
-    title = title.strip('"\'')
+    title = title.strip("\"'")
 
     # Truncate if too long
     if len(title) > 50:
@@ -315,28 +334,18 @@ async def run_full_council(user_query: str) -> Tuple[List, List, Dict, Dict]:
 
     # If no models responded successfully, return error
     if not stage1_results:
-        return [], [], {
-            "model": "error",
-            "response": "All models failed to respond. Please try again."
-        }, {}
+        return [], [], {"model": "error", "response": "All models failed to respond. Please try again."}, {}
 
     # Stage 2: Collect rankings
     stage2_results, label_to_model = await stage2_collect_rankings(user_query, stage1_results)
 
     # Calculate aggregate rankings
     aggregate_rankings = calculate_aggregate_rankings(stage2_results, label_to_model)
 
     # Stage 3: Synthesize final answer
-    stage3_result = await stage3_synthesize_final(
-        user_query,
-        stage1_results,
-        stage2_results
-    )
+    stage3_result = await stage3_synthesize_final(user_query, stage1_results, stage2_results)
 
     # Prepare metadata
-    metadata = {
-        "label_to_model": label_to_model,
-        "aggregate_rankings": aggregate_rankings
-    }
+    metadata = {"label_to_model": label_to_model, "aggregate_rankings": aggregate_rankings}
 
     return stage1_results, stage2_results, stage3_result, metadata
```
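Aside from the new streaming entry point, this file mostly reformats to double quotes and single-line literals without changing behavior. The two regex paths in `parse_ranking_from_text` can be hard to picture from the diff alone, so here is a small standalone illustration of what each one matches; the sample ranking text is invented for demonstration.

```python
# Standalone illustration of the two regex paths in parse_ranking_from_text
# (the sample text below is invented for demonstration).
import re

ranking_section = """I found Response B most accurate.
1. Response B
2. Response A
3. Response C"""

# Primary path: numbered list entries like "1. Response B"
numbered_matches = re.findall(r"\d+\.\s*Response [A-Z]", ranking_section)
print(numbered_matches)  # ['1. Response B', '2. Response A', '3. Response C']
print([re.search(r"Response [A-Z]", m).group() for m in numbered_matches])
# ['Response B', 'Response A', 'Response C']

# Fallback path: any "Response X" mention, in order of appearance
# (note it also picks up the "Response B" in the prose sentence above).
print(re.findall(r"Response [A-Z]", ranking_section))
# ['Response B', 'Response B', 'Response A', 'Response C']
```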
backend/openrouter.py CHANGED

```diff
@@ -6,9 +6,7 @@ from .config import OPENROUTER_API_KEY, OPENROUTER_API_URL
 
 
 async def query_model(
-    model: str,
-    messages: List[Dict[str, str]],
-    timeout: float = 120.0
+    model: str, messages: List[Dict[str, str]], timeout: float = 120.0
 ) -> Optional[Dict[str, Any]]:
     """
     Query a single model via OpenRouter API.
@@ -33,29 +31,57 @@ async def query_model(
 
     try:
         async with httpx.AsyncClient(timeout=timeout) as client:
-            response = await client.post(
-                OPENROUTER_API_URL,
-                headers=headers,
-                json=payload
-            )
+            response = await client.post(OPENROUTER_API_URL, headers=headers, json=payload)
             response.raise_for_status()
 
             data = response.json()
-            message = data['choices'][0]['message']
+            message = data["choices"][0]["message"]
 
-            return {
-                'content': message.get('content'),
-                'reasoning_details': message.get('reasoning_details')
-            }
+            return {"content": message.get("content"), "reasoning_details": message.get("reasoning_details")}
 
     except Exception as e:
         print(f"Error querying model {model}: {e}")
         return None
 
 
+async def query_model_stream(model: str, messages: List[Dict[str, str]], timeout: float = 120.0):
+    """
+    Query a model via OpenRouter API and stream the response.
+    Yields content chunks as they arrive.
+    """
+    headers = {
+        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
+        "Content-Type": "application/json",
+    }
+
+    payload = {"model": model, "messages": messages, "stream": True}
+
+    import json
+
+    try:
+        async with httpx.AsyncClient(timeout=timeout) as client:
+            async with client.stream("POST", OPENROUTER_API_URL, headers=headers, json=payload) as response:
+                response.raise_for_status()
+                async for line in response.aiter_lines():
+                    if line.startswith("data: "):
+                        data_str = line[6:]
+                        if data_str.strip() == "[DONE]":
+                            break
+                        try:
+                            data = json.loads(data_str)
+                            delta = data["choices"][0]["delta"]
+                            content = delta.get("content")
+                            if content:
+                                yield content
+                        except json.JSONDecodeError:
+                            pass
+    except Exception as e:
+        print(f"Error streaming model {model}: {e}")
+        yield f"[Error: {str(e)}]"
+
+
 async def query_models_parallel(
-    models: List[str],
-    messages: List[Dict[str, str]]
+    models: List[str], messages: List[Dict[str, str]]
 ) -> Dict[str, Optional[Dict[str, Any]]]:
     """
     Query multiple models in parallel.
```
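The new `query_model_stream` parses Server-Sent Events: the router sends `data: {json}` lines and terminates the stream with `data: [DONE]`, with each JSON payload carrying an OpenAI-style `choices[0].delta`. A self-contained illustration of that parsing loop, using fabricated sample lines:

```python
# Offline illustration of the SSE parsing done in query_model_stream;
# the sample lines below are fabricated in the OpenAI-style streaming format.
import json

sample_lines = [
    'data: {"choices": [{"delta": {"content": "Hel"}}]}',
    'data: {"choices": [{"delta": {"content": "lo"}}]}',
    "data: [DONE]",
]

chunks = []
for line in sample_lines:
    if line.startswith("data: "):
        data_str = line[6:]          # strip the "data: " prefix
        if data_str.strip() == "[DONE]":
            break                    # end-of-stream sentinel
        data = json.loads(data_str)
        content = data["choices"][0]["delta"].get("content")
        if content:
            chunks.append(content)

print("".join(chunks))  # Hello
```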