destinyebuka committed
Commit 9f3c354 · 1 Parent(s): 366f758
app/ai/nodes/draft_node.py CHANGED
@@ -1,18 +1,20 @@
- # app/ai/nodes/draft_node.py – FIXED: Don't regenerate when preview is active + error handling
  import datetime
  from typing import Dict
- from bson import ObjectId
  from app.database import get_db
- from app.ai.state import ListingDraft
- from structlog import get_logger
- from app.config import settings
  from app.ml.models.ml_listing_extractor import get_ml_extractor

- logger = get_logger(__name__)

  ml_extractor = get_ml_extractor()

- # ========== AMENITY ICONS MAPPING ==========
  AMENITY_ICONS = {
      "wifi": "📶",
      "parking": "🅿️",
@@ -31,16 +33,22 @@ AMENITY_ICONS = {
      "television": "📺",
  }

- # ========== CUSTOM EXCEPTIONS ==========
  class ValidationError(Exception):
-     """Raised when ML validation fails."""
      pass

  class DraftGenerationError(Exception):
      """Raised when draft generation fails."""
      pass

- # ========== HELPERS ==========
  def _add_amenity_icons(amenities: list) -> str:
      """Convert amenities list to string with icons."""
      if not amenities:
@@ -48,7 +56,7 @@ def _add_amenity_icons(amenities: list) -> str:
      icons_text = []
      for amenity in amenities:
          amenity_lower = amenity.lower().strip()
-         icon = AMENITY_ICONS.get(amenity_lower, "✅")
          icons_text.append(f"{icon} {amenity.title()}")
      return " | ".join(icons_text)

@@ -91,117 +99,148 @@ def _get_current_message(state: Dict) -> str:
          return full.split("Now the user says:")[-1].strip()
      return full.strip()

- # ---------- node ----------
  async def draft_node(state: Dict) -> Dict:
      """
-     LangGraph node:
-     - NEVER regenerate if preview is already shown
-     - Only run if status is exactly "draft_ready"
-     - After preview shown, let intent_node handle all commands
-     - ✅ Comprehensive error handling + logging
-     """
-     status = state.get("status")
-
-     # ===== CRITICAL: Exit immediately if preview already active =====
-     if status in {"preview_shown", "waiting_for_images"}:
-         logger.info("🛑 DRAFT NODE SKIPPED: Preview already active, not regenerating")
-         return state
-
-     # ===== Only proceed if status is EXACTLY "draft_ready" =====
-     if status != "draft_ready":
-         logger.info(f"🛑 DRAFT NODE SKIPPED: status={status}, not draft_ready")
-         return state

-     # ===== Only proceed if intent is "list" =====
-     if state.get("intent") != "list":
-         logger.info("🛑 DRAFT NODE SKIPPED: intent is not 'list'")
-         return state

-     # ===== If we get here, generate the draft =====
-     logger.info("✅ DRAFT NODE RUNNING: Generating draft preview")

-     user_id = state.get("user_id")
-
-     # ===== ML VALIDATION =====
-     try:
-         validation = ml_extractor.validate_all_fields(state, user_id)
-         if not validation["all_valid"]:
-             issues_text = "\n".join([f"❌ {issue}" for issue in validation["issues"]])
-             state["ai_reply"] = f"""I found some issues with your listing:

  {issues_text}

  Let me ask again - could you clarify these fields?"""
-             state["status"] = "collecting"
-             state["missing_fields"] = [
-                 field for field, result in validation["field_validations"].items()
-                 if not result["is_valid"]
-             ]
-             logger.warning("🚫 Fields failed ML validation", issues=validation["issues"])
-             return state
-         logger.info("✅ All fields passed ML validation", user_id=user_id)
-     except Exception as e:
-         logger.error("❌ ML validation error", exc_info=e)
-         state["ai_reply"] = "Sorry, I couldn't validate your listing. Please try again."
-         state["status"] = "error"
-         return state
-
-     # ===== DRAFT GENERATION =====
-     try:
-         # Generate title / description / icons
-         title = _generate_title(state)
-         description = _generate_description(state)
-         amenities_with_icons = _add_amenity_icons(state.get("amenities", []))
-         images = state.get("draft", {}).get("images", []) if isinstance(state.get("draft"), dict) else []
-
-         draft_preview = {
-             "title": title,
-             "description": description,
-             "location": state.get("location", "").title(),
-             "bedrooms": state.get("bedrooms"),
-             "bathrooms": state.get("bathrooms"),
-             "price": state.get("price"),
-             "price_type": state.get("price_type"),
-             "listing_type": state.get("listing_type"),
-             "amenities": state.get("amenities", []),
-             "amenities_with_icons": amenities_with_icons,
-             "requirements": state.get("requirements"),
-             "currency": state.get("currency", "XOF"),
-             "images": images,
-             "field_confidences": validation["field_validations"],
-         }
-
-         logger.info("🎯 Draft preview generated",
-                     title=title,
-                     location=state.get("location"),
-                     image_count=len(images),
-                     amenities=state.get("amenities", []))
-
-     except Exception as e:
-         logger.error("❌ Failed to generate draft preview", exc_info=e)
-         state["ai_reply"] = "Sorry, I couldn't generate your draft. Please try again."
-         state["status"] = "error"
-         return state
-
-     # ===== BUILD PREVIEW MESSAGE =====
-     try:
-         images_section = ""
-         if images:
-             images_section = f"\n📷 Images: {len(images)} uploaded\n"
-             for idx, img_url in enumerate(images[:3], 1):
-                 images_section += f"  {idx}. {img_url[:60]}...\n"
-             if len(images) > 3:
-                 images_section += f"  ... and {len(images) - 3} more\n"
-
-         preview_text = f"""
- ┌────────────────────────────────────────────┐
-   🏘 LISTING PREVIEW
- └────────────────────────────────────────────┘

  **{draft_preview['title']}**

  📍 Location: {draft_preview['location']}
- 🛏 Bedrooms: {draft_preview['bedrooms']}
  🚿 Bathrooms: {draft_preview['bathrooms']}
  💰 Price: {draft_preview['price']:,} {draft_preview['price_type']} ({draft_preview['currency']})

@@ -209,30 +248,36 @@ Let me ask again - could you clarify these fields?"""

  ✨ Amenities: {draft_preview['amenities_with_icons'] if draft_preview['amenities_with_icons'] else 'None specified'}
  {images_section}
- └────────────────────────────────────────────┘
  """
-
-         if not images:
-             preview_text += """
  📸 Upload property images to make your listing more attractive!

  Then say **publish** to make it live!
  """
-             state["status"] = "waiting_for_images"
-         else:
-             preview_text += """
  ✅ Perfect! Say **publish** to make your listing live!
  """
-             state["status"] = "preview_shown"
-
-         state["draft_preview"] = draft_preview
-         state["ai_reply"] = preview_text
-
-         logger.info("✅ Draft node DONE: status set to preview_shown or waiting_for_images")
-         return state
-
-     except Exception as e:
-         logger.error("❌ Failed to build preview message", exc_info=e)
-         state["ai_reply"] = "Sorry, I encountered an error preparing your listing. Please try again."
-         state["status"] = "error"
-         return state
+ # app/ai/nodes/draft_node.py – FINAL: Error handling + validation + observability
+ import logging
  import datetime
  from typing import Dict
+
  from app.database import get_db
+ from app.core.error_handling import trace_operation, handle_errors
  from app.ml.models.ml_listing_extractor import get_ml_extractor

+ logger = logging.getLogger(__name__)

  ml_extractor = get_ml_extractor()

+ # ============================================================
+ # Amenity Icons Mapping
+ # ============================================================
+
  AMENITY_ICONS = {
      "wifi": "📶",
      "parking": "🅿️",
  ...
      "television": "📺",
  }

+ # ============================================================
+ # Custom Exceptions
+ # ============================================================
+
  class ValidationError(Exception):
+     """Raised when field validation fails."""
      pass

  class DraftGenerationError(Exception):
      """Raised when draft generation fails."""
      pass

+ # ============================================================
+ # Helpers
+ # ============================================================
+
  def _add_amenity_icons(amenities: list) -> str:
      """Convert amenities list to string with icons."""
      if not amenities:
  ...
      icons_text = []
      for amenity in amenities:
          amenity_lower = amenity.lower().strip()
+         icon = AMENITY_ICONS.get(amenity_lower, "✔")
          icons_text.append(f"{icon} {amenity.title()}")
      return " | ".join(icons_text)

  ...
          return full.split("Now the user says:")[-1].strip()
      return full.strip()

+ # ============================================================
+ # Draft Node
+ # ============================================================
+
+ @handle_errors(default_return=None)
  async def draft_node(state: Dict) -> Dict:
      """
+     LangGraph node: Generate draft listing preview

+     Features:
+     - Prevents regeneration when preview active
+     - ML validation of all fields
+     - Professional title/description generation
+     - Amenity icon formatting
+     - Error handling with graceful fallback
+     - Full observability and logging
+     """

+     status = state.get("status")

+     with trace_operation(
+         "draft_node",
+         {
+             "status": status,
+             "intent": state.get("intent"),
+             "has_draft": state.get("draft_preview") is not None,
+         }
+     ):
+         # ===== CRITICAL: Exit immediately if preview already active =====
+         if status in {"preview_shown", "waiting_for_images"}:
+             logger.info("🛑 DRAFT NODE SKIPPED: Preview already active")
+             return state
+
+         # ===== Only proceed if status is EXACTLY "draft_ready" =====
+         if status != "draft_ready":
+             logger.info(f"🛑 DRAFT NODE SKIPPED: status={status}, not draft_ready")
+             return state
+
+         # ===== Only proceed if intent is "list" =====
+         if state.get("intent") != "list":
+             logger.info("🛑 DRAFT NODE SKIPPED: intent is not 'list'")
+             return state
+
+         # ===== If we get here, generate the draft =====
+         logger.info("✅ DRAFT NODE RUNNING: Generating draft preview")
+
+         user_id = state.get("user_id")
+
+         # ===== ML VALIDATION =====
+         with trace_operation("ml_validation"):
+             try:
+                 validation = ml_extractor.validate_all_fields(state, user_id)
+
+                 if not validation["all_valid"]:
+                     issues_text = "\n".join([f"❌ {issue}" for issue in validation["issues"]])
+                     state["ai_reply"] = f"""I found some issues with your listing:

  {issues_text}

  Let me ask again - could you clarify these fields?"""
+                     state["status"] = "collecting"
+                     state["missing_fields"] = [
+                         field for field, result in validation["field_validations"].items()
+                         if not result["is_valid"]
+                     ]
+                     logger.warning(f"🚫 Validation failed", extra={"issues": validation["issues"]})
+                     return state
+
+                 logger.info(f"✅ All fields passed validation")
+
+             except Exception as e:
+                 logger.error(f"❌ ML validation error: {e}", exc_info=True)
+                 state["ai_reply"] = "Sorry, I couldn't validate your listing. Please try again."
+                 state["status"] = "error"
+                 return state
+
+         # ===== DRAFT GENERATION =====
+         with trace_operation("draft_generation"):
+             try:
+                 # Generate components
+                 title = _generate_title(state)
+                 description = _generate_description(state)
+                 amenities_with_icons = _add_amenity_icons(state.get("amenities", []))
+                 images = (
+                     state.get("draft", {}).get("images", [])
+                     if isinstance(state.get("draft"), dict)
+                     else []
+                 )
+
+                 # Build draft preview
+                 draft_preview = {
+                     "title": title,
+                     "description": description,
+                     "location": state.get("location", "").title(),
+                     "bedrooms": state.get("bedrooms"),
+                     "bathrooms": state.get("bathrooms"),
+                     "price": state.get("price"),
+                     "price_type": state.get("price_type"),
+                     "listing_type": state.get("listing_type"),
+                     "amenities": state.get("amenities", []),
+                     "amenities_with_icons": amenities_with_icons,
+                     "requirements": state.get("requirements"),
+                     "currency": state.get("currency", "XOF"),
+                     "images": images,
+                     "field_confidences": validation.get("field_validations", {}),
+                 }
+
+                 logger.info(
+                     f"🎯 Draft generated",
+                     extra={
+                         "title": title,
+                         "images": len(images),
+                         "amenities": len(state.get("amenities", [])),
+                     }
+                 )
+
+             except Exception as e:
+                 logger.error(f"❌ Failed to generate draft: {e}", exc_info=True)
+                 state["ai_reply"] = "Sorry, I couldn't generate your draft. Please try again."
+                 state["status"] = "error"
+                 return state
+
+         # ===== BUILD PREVIEW MESSAGE =====
+         with trace_operation("build_preview_message"):
+             try:
+                 images_section = ""
+                 if images:
+                     images_section = f"\n📷 Images: {len(images)} uploaded\n"
+                     for idx, img_url in enumerate(images[:3], 1):
+                         images_section += f"  {idx}. {img_url[:60]}...\n"
+                     if len(images) > 3:
+                         images_section += f"  ... and {len(images) - 3} more\n"
+
+                 preview_text = f"""
+ ┌────────────────────────────────────────────┐
+   🏠 LISTING PREVIEW
+ └────────────────────────────────────────────┘

  **{draft_preview['title']}**

  📍 Location: {draft_preview['location']}
+ 🛏 Bedrooms: {draft_preview['bedrooms']}
  🚿 Bathrooms: {draft_preview['bathrooms']}
  💰 Price: {draft_preview['price']:,} {draft_preview['price_type']} ({draft_preview['currency']})

  ...

  ✨ Amenities: {draft_preview['amenities_with_icons'] if draft_preview['amenities_with_icons'] else 'None specified'}
  {images_section}
+ └────────────────────────────────────────────┘
  """
+
+                 if not images:
+                     preview_text += """
  📸 Upload property images to make your listing more attractive!

  Then say **publish** to make it live!
  """
+                     state["status"] = "waiting_for_images"
+                     logger.info("⏳ Waiting for images")
+                 else:
+                     preview_text += """
  ✅ Perfect! Say **publish** to make your listing live!
  """
+                     state["status"] = "preview_shown"
+                     logger.info("✅ Preview ready for publishing")
+
+                 state["draft_preview"] = draft_preview
+                 state["ai_reply"] = preview_text
+
+                 logger.info(
+                     f"✅ Draft node completed",
+                     extra={"status": state["status"]}
+                 )
+
+                 return state
+
+             except Exception as e:
+                 logger.error(f"❌ Failed to build preview: {e}", exc_info=True)
+                 state["ai_reply"] = "Sorry, an error occurred preparing your listing. Please try again."
+                 state["status"] = "error"
+                 return state
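Note: the rewritten draft_node leans on trace_operation and handle_errors from app.core.error_handling, a module that is not part of this commit. A minimal sketch of compatible helpers, inferred only from the call sites above (names and behavior are assumptions, not the real implementation):

import functools
import logging
import time
from contextlib import contextmanager

logger = logging.getLogger(__name__)

@contextmanager
def trace_operation(name, attributes=None):
    """Log entry, exit, and wall-clock duration of a named operation."""
    start = time.perf_counter()
    logger.info("start %s", name, extra={"attributes": attributes or {}})
    try:
        yield
    finally:
        logger.info("end %s (%.1f ms)", name, (time.perf_counter() - start) * 1000)

def handle_errors(default_return=None):
    """Decorator for async functions: log unhandled exceptions, return a fallback."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            except Exception:
                logger.exception("unhandled error in %s", func.__name__)
                return default_return
        return wrapper
    return decorator

One caveat with default_return=None: if draft_node itself raises, the graph receives None instead of the state dict, so downstream nodes must tolerate a missing state.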
app/ai/nodes/intent_node.py CHANGED
@@ -1,12 +1,19 @@
- # app/ai/nodes/intent_node.py – FIXED: Detect commands FIRST when preview is active
  import json
  import re
  from typing import Dict, List
  from tenacity import retry, stop_after_attempt, wait_exponential
- from structlog import get_logger
  from app.core.llm_router import call_llm_smart
  from app.core.context_manager import get_context_manager
- from app.config import settings
  from app.ml.models.ml_listing_extractor import get_ml_extractor
  from app.ai.nodes.draft_node import (
      _generate_title,
@@ -14,21 +21,30 @@ from app.ai.nodes.draft_node import (
      _add_amenity_icons,
  )

- logger = get_logger(__name__)

  MAX_TOKENS = 600
  TEMP = 0

  ml_extractor = get_ml_extractor()

- # ---------- helpers ----------
  def _load_system() -> str:
-     with open("app/ai/prompts/system_prompt.txt", encoding="utf-8") as f:
-         return f.read()

  SYSTEM_PROMPT = _load_system()

  def _clean_json(raw: str) -> str:
      cleaned = re.sub(r'```json\s*', '', raw)
      cleaned = re.sub(r'```\s*', '', cleaned)
      return cleaned.strip()
@@ -41,27 +57,25 @@ def _get_current_message(state: Dict) -> str:
      return full.strip()

  def _normalize_locations(location: str) -> str:
      if not location:
          return None
      loc_lower = location.lower().strip()
      location_map = {
-         "lago": "lagos", "lgs": "lagos", "lag": "lagos",
-         "cotnu": "cotonou", "cotonus": "cotonou", "cotou": "cotonou",
-         "akpakpa": "akpakpa", "nairobi": "nairobi", "nbi": "nairobi",
-         "accra": "accra", "acc": "accra", "joburg": "johannesburg",
-         "jozi": "johannesburg", "london": "london", "paris": "paris",
      }
      return location_map.get(loc_lower, location.lower())

  def _normalize_amenities(amenities: list) -> list:
      if not amenities:
          return []
      amenity_map = {
-         "balcno": "balcony", "balconny": "balcony", "parkng": "parking",
-         "park": "parking", "furnisd": "furnished", "furnishd": "furnished",
-         "furnish": "furnished", "ac": "air conditioning", "air cond": "air conditioning",
-         "aircond": "air conditioning", "swiming": "pool", "kitchn": "kitchen",
-         "gdn": "garden",
      }
      normalized = []
      for amenity in amenities:
@@ -74,32 +88,29 @@ def _normalize_amenities(amenities: list) -> list:
      return normalized

  def _normalize_price_type(price_type: str) -> str:
      if not price_type:
          return None
      pt_lower = price_type.lower().strip()
      price_type_map = {
-         "montly": "monthly", "monthyl": "monthly", "mth": "monthly", "month": "monthly",
-         "nightl": "nightly", "night": "nightly", "daily": "daily", "day": "daily",
-         "weakly": "weekly", "weakyl": "weekly", "week": "weekly",
-         "yr": "yearly", "year": "yearly", "annum": "yearly",
      }
      return price_type_map.get(pt_lower, pt_lower)

  def _normalize_listing_type(listing_type: str) -> str:
      if not listing_type:
          return None
      lt_lower = listing_type.lower().strip()
      listing_type_map = {
-         "for rent": "rent", "rental": "rent",
-         "short stay": "short-stay", "short-stay": "short-stay", "shortsta": "short-stay",
-         "short stya": "short-stay", "stayover": "short-stay",
-         "roommate": "roommate", "roommat": "roommate", "sharing": "roommate",
-         "flatmate": "roommate", "shareflat": "roommate",
-         "for sale": "sale", "selling": "sale", "sell": "sale",
      }
      return listing_type_map.get(lt_lower, lt_lower)

  def _get_missing_fields(data: Dict) -> List[str]:
      if data.get("intent") != "list":
          return []
      required = ["location", "bedrooms", "bathrooms", "price", "listing_type", "price_type"]
@@ -111,6 +122,7 @@ def _get_missing_fields(data: Dict) -> List[str]:
      return missing

  def _get_next_question(missing_fields: List[str]) -> str:
      if not missing_fields:
          return None
      next_field = missing_fields[0]
@@ -125,263 +137,246 @@ def _get_next_question(missing_fields: List[str]) -> str:
      return questions.get(next_field, "What else should I know?")

  def _build_draft_preview(data: dict) -> dict:
-     """Return the same dict draft_node puts in state['draft_preview']."""
-     title = _generate_title(data)
      description = _generate_description(data)
-     icons = _add_amenity_icons(data.get("amenities", []))
-     images = data.get("draft", {}).get("images", []) if isinstance(data.get("draft"), dict) else []

      return {
-         "title" : title,
-         "description" : description,
-         "location" : data.get("location", "").title(),
-         "bedrooms" : data.get("bedrooms"),
-         "bathrooms" : data.get("bathrooms"),
-         "price" : data.get("price"),
-         "price_type" : data.get("price_type"),
-         "listing_type" : data.get("listing_type"),
-         "amenities" : data.get("amenities", []),
-         "amenities_with_icons" : icons,
-         "requirements" : data.get("requirements"),
-         "currency" : data.get("currency", "XOF"),
-         "images" : images,
-         "field_confidences" : data.get("field_validations", {}),
      }

- # ---------- node ----------
- @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=5))
  async def intent_node(state: Dict) -> Dict:
      """
-     LangGraph node:
-     - FIRST: Check if preview is active and detect COMMANDS
-     - THEN: Do normal intent extraction for new listings/searches
      """
      current_msg = _get_current_message(state).lower()
      status = state.get("status")

-     # ===== CRITICAL: Handle commands FIRST when preview is active =====
-     if status in {"preview_shown", "waiting_for_images"} and state.get("draft_preview"):
-         logger.info(f"🎯 COMMAND DETECTION MODE: status={status}")
-
-         # PUBLISH command
-         if any(w in current_msg for w in {"publish", "go live", "post it", "list it", "confirm", "yes", "ok", "okay"}):
-             logger.info("📤 COMMAND: publish – routing to publish_node")
-             state["intent"] = "publish"
-             state["ai_reply"] = ""  # publish_node will fill this
              return state
-
-         # EDIT command
-         if "edit" in current_msg or "change" in current_msg or "update" in current_msg:
-             field = current_msg.replace("edit", "").replace("change", "").replace("update", "").strip()
-             logger.info(f"✏️ COMMAND: edit field='{field}'")
-             state["status"] = "collecting"
-             state["missing_fields"] = [field] if field else ["location"]
-             state["ai_reply"] = f"Sure! What would you like to change for **{field}**?"
-             return state
-
-         # DISCARD command
-         if any(w in current_msg for w in {"discard", "cancel", "delete", "remove", "no thanks"}):
-             logger.info("🗑️ COMMAND: discard – clearing draft")
-             state["status"] = None
-             state["draft_preview"] = None
-             state["intent"] = None
-             state["ai_reply"] = "Draft discarded. Let me know if you want to list another property!"
-             return state
-
-         # SEARCH command while draft is active
-         if any(w in current_msg for w in {"search", "find", "look for", "show me"}):
-             logger.info("🔍 COMMAND: search – switching to search mode")
-             state["intent"] = "search"
-             state["status"] = None
-             # keep draft_preview intact so preview stays visible
              return state
-
-         # Any other message while draft is active = casual chat
-         logger.info("💬 COMMAND: casual chat – keeping preview active")
-         state["ai_reply"] = "I'm here to help! You can say **publish** to list, **edit** to change something, or ask me anything else."
-         return state
-
-     # ===== End command detection =====
-
-     # ===== NORMAL FLOW: New listing or search intent =====
-     if state.get("status") in ["published", "error"]:
-         logger.info(f"⭐ Skipping intent_node, status={state.get('status')}")
-         return state
-
-     user_role = state["user_role"]
-     user_id = state.get("user_id")
-     human_msg = state["messages"][-1]["content"]
-
-     prompt = SYSTEM_PROMPT.replace("{user_role}", user_role)
-     messages = [
-         {"role": "system", "content": prompt},
-         {"role": "user", "content": human_msg},
-     ]
-
-     logger.info("🤖 Aida intent call with LLM router", user_role=user_role, msg=human_msg)
-
-     try:
-         # Use smart LLM routing instead of direct client call
-         text, model_used, usage = await call_llm_smart(
-             messages=messages,
-             intent="extraction",
-             max_tokens=MAX_TOKENS,
-             temperature=TEMP,
-         )
-         raw = text.strip()
-         logger.debug("🤖 LLM response received", model=model_used, usage=usage)
-     except Exception as e:
-         logger.error("❌ LLM call failed", exc_info=e)
-         data = {"allowed": False, "ai_reply": "Sorry, I couldn't process that. Please try again."}
-         state.update(allowed=False, ai_reply=data["ai_reply"])
-         return state
-
-     try:
-         cleaned = _clean_json(raw)
-         data = json.loads(cleaned)
-     except json.JSONDecodeError as e:
-         logger.error("❌ Aida bad json", raw=raw, exc_info=e)
-         data = {"allowed": False, "ai_reply": "Sorry, I didn't understand that. Could you rephrase?"}
-
-     # Handle LISTING INTENT with progressive collection
-     if data.get("intent") == "list":
-         data["allowed"] = True
-         missing = _get_missing_fields(data)
-
-         intro_and_example = """To list a property, here's how it's done:
-
- 📝 **Example:**
- "I have a 3-bedroom, 2-bathroom property in Cotonou for rent at 50,000 XOF per month. It has a balcony, kitchen, and dryer. It's fully furnished. Renters must pay a 3-month deposit."
-
- 📸 You can also upload property photos to make it more attractive!"""
-
-         should_show_example = not any([
-             data.get("bedrooms"), data.get("bathrooms"), data.get("price"),
-             data.get("amenities"), data.get("requirements")
-         ])
-
-         if missing:
-             data["status"] = "collecting"
-             data["missing_fields"] = missing
-             data["next_question"] = _get_next_question(missing)
-             data["ai_reply"] = intro_and_example if should_show_example else _get_next_question(missing)
-         else:
-             # All required fields complete – move to draft_ready
-             data["status"] = "draft_ready"
-             data["missing_fields"] = []
-             data["draft_preview"] = _build_draft_preview(data)
-             data["ai_reply"] = "Perfect! Let me prepare your listing draft..."
-             logger.info("✅ All required fields complete, moving to draft_ready")
-
-     # SEARCH is always allowed (role_gate_node will check)
-     if data.get("intent") == "search":
-         data["allowed"] = True
-
-     # Normalize values
-     location = _normalize_locations(data.get("location"))
-     amenities = _normalize_amenities(data.get("amenities", []))
-     price_type = _normalize_price_type(data.get("price_type"))
-     listing_type = _normalize_listing_type(data.get("listing_type"))
-
-     # SMART INFERENCE + ML VALIDATION with ERROR HANDLING
-     if data.get("intent") == "list":
-         location_input = data.get("location")
-         if location_input:
              try:
-                 city, location_info = await ml_extractor.extract_location_from_address(location_input)
-                 if city:
-                     data["location"] = city
-                     data["location_details"] = location_info
-                     logger.info(f"✅ Extracted city from address: {location_input} → {city}")
              except Exception as e:
-                 logger.warning(f"⚠️ Failed to extract location: {e}")
-
-         try:
-             listing_type, listing_confidence = ml_extractor.infer_listing_type(
-                 data, user_role=user_role, user_message=human_msg
-             )
-             if listing_type:
-                 data["listing_type"] = listing_type
-                 data["listing_confidence"] = listing_confidence
-                 logger.info(f"✅ Inferred listing_type: {listing_type} (confidence: {listing_confidence})")
-         except Exception as e:
-             logger.warning(f"⚠️ Failed to infer listing_type: {e}")
-
-         try:
-             currency, extracted_city, currency_confidence = await ml_extractor.infer_currency(data)
-             if currency:
-                 data["currency"] = currency
-                 data["currency_confidence"] = currency_confidence
-                 if extracted_city:
-                     data["location"] = extracted_city
-                 logger.info(f"✅ Inferred currency: {currency} (confidence: {currency_confidence})")
-         except Exception as e:
-             logger.warning(f"⚠️ Failed to infer currency: {e}")
-             data["currency"] = data.get("currency", "XOF")
-
          try:
-             validation_issues = []
-             validation_suggestions = []
-             field_validations = {}
-             for field in ["location", "bedrooms", "bathrooms", "price", "price_type"]:
-                 value = data.get(field)
-                 if value is not None:
-                     result = ml_extractor.validate_field(field, value, human_msg, user_id)
-                     field_validations[field] = result
-                     if not result["is_valid"]:
-                         validation_issues.append(f"❌ {field}: {result['suggestion']}")
-                         logger.warning(f"Validation failed for {field}", suggestion=result["suggestion"])
-                     elif result["suggestion"]:
-                         validation_suggestions.append(f"💡 {field}: {result['suggestion']}")
-             data["field_validations"] = field_validations
-             data["validation_suggestions"] = validation_suggestions
-             if validation_issues:
-                 current_reply = data.get("ai_reply", "")
-                 data["ai_reply"] = current_reply + "\n\n" + "\n".join(validation_issues)
-                 logger.info("⚠️ ML validation issues found", issues=validation_issues)
-         except Exception as e:
-             logger.warning(f"⚠️ Failed to validate fields: {e}")
-
-     intent_value = data.get("intent")
-
-     # Update state with all fields
-     state.update(
-         allowed=data.get("allowed", False),
-         status=data.get("status"),
-         missing_fields=data.get("missing_fields", []),
-         next_question=data.get("next_question"),
-
-         # Listing fields
-         listing_type=listing_type,
-         location=location,
-         bedrooms=data.get("bedrooms"),
-         bathrooms=data.get("bathrooms"),
-         price=data.get("price"),
-         price_type=price_type,
-         amenities=amenities,
-         requirements=data.get("requirements"),
-
-         # Search fields
-         min_price=data.get("min_price"),
-         max_price=data.get("max_price"),
-
-         # ML fields
-         field_validations=data.get("field_validations"),
-         listing_confidence=data.get("listing_confidence"),
-         currency_confidence=data.get("currency_confidence"),
-         location_details=data.get("location_details"),
-         validation_suggestions=data.get("validation_suggestions"),
-
-         # Other
-         currency=data.get("currency", "XOF"),
-         ai_reply=data.get("ai_reply", ""),
-         draft_preview=data.get("draft_preview"),
-     )
-
-     logger.info("👀 Intent node processed",
-                 intent=intent_value,
-                 status=state.get("status"),
-                 missing_fields=state.get("missing_fields"),
-                 location=state.get("location"),
-                 amenities=state.get("amenities"))
-     return state
+ # app/ai/nodes/intent_node.py – FINAL: Multi-LLM routing + context mgmt + observability
  import json
  import re
  from typing import Dict, List
+ import logging
+
  from tenacity import retry, stop_after_attempt, wait_exponential
  from app.core.llm_router import call_llm_smart
  from app.core.context_manager import get_context_manager
+ from app.core.error_handling import (
+     async_retry,
+     RetryStrategy,
+     trace_operation,
+     LLMError,
+ )
+ from app.core.observability import get_token_tracker
  from app.ml.models.ml_listing_extractor import get_ml_extractor
  from app.ai.nodes.draft_node import (
      _generate_title,
  ...
      _add_amenity_icons,
  )

+ logger = logging.getLogger(__name__)

  MAX_TOKENS = 600
  TEMP = 0

  ml_extractor = get_ml_extractor()

+ # ============================================================
+ # Helpers
+ # ============================================================
+
  def _load_system() -> str:
+     """Load system prompt from file."""
+     try:
+         with open("app/ai/prompts/system_prompt.txt", encoding="utf-8") as f:
+             return f.read()
+     except FileNotFoundError:
+         logger.error("❌ System prompt file not found")
+         return "You are Aida, a helpful AI assistant."

  SYSTEM_PROMPT = _load_system()

  def _clean_json(raw: str) -> str:
+     """Clean JSON response by removing markdown artifacts."""
      cleaned = re.sub(r'```json\s*', '', raw)
      cleaned = re.sub(r'```\s*', '', cleaned)
      return cleaned.strip()
  ...
      return full.strip()

  def _normalize_locations(location: str) -> str:
+     """Normalize location names."""
      if not location:
          return None
      loc_lower = location.lower().strip()
      location_map = {
+         "lago": "lagos", "lgs": "lagos",
+         "cotnu": "cotonou", "cotonus": "cotonou",
+         "akpakpa": "akpakpa", "nairobi": "nairobi",
+         "accra": "accra", "joburg": "johannesburg",
      }
      return location_map.get(loc_lower, location.lower())

  def _normalize_amenities(amenities: list) -> list:
+     """Normalize amenity names."""
      if not amenities:
          return []
      amenity_map = {
+         "balcno": "balcony", "parkng": "parking",
+         "furnisd": "furnished", "ac": "air conditioning",
      }
      normalized = []
      for amenity in amenities:
  ...
      return normalized

  def _normalize_price_type(price_type: str) -> str:
+     """Normalize price type."""
      if not price_type:
          return None
      pt_lower = price_type.lower().strip()
      price_type_map = {
+         "montly": "monthly", "mth": "monthly",
+         "nightl": "nightly", "weakly": "weekly",
      }
      return price_type_map.get(pt_lower, pt_lower)

  def _normalize_listing_type(listing_type: str) -> str:
+     """Normalize listing type."""
      if not listing_type:
          return None
      lt_lower = listing_type.lower().strip()
      listing_type_map = {
+         "for rent": "rent", "short stay": "short-stay",
+         "for sale": "sale", "roommate": "roommate",
      }
      return listing_type_map.get(lt_lower, lt_lower)

  def _get_missing_fields(data: Dict) -> List[str]:
+     """Get missing required fields for listing."""
      if data.get("intent") != "list":
          return []
      required = ["location", "bedrooms", "bathrooms", "price", "listing_type", "price_type"]
  ...
      return missing

  def _get_next_question(missing_fields: List[str]) -> str:
+     """Get next question for missing field."""
      if not missing_fields:
          return None
      next_field = missing_fields[0]
  ...
      return questions.get(next_field, "What else should I know?")

  def _build_draft_preview(data: dict) -> dict:
+     """Build draft preview object."""
+     title = _generate_title(data)
      description = _generate_description(data)
+     icons = _add_amenity_icons(data.get("amenities", []))
+     images = data.get("draft", {}).get("images", []) if isinstance(data.get("draft"), dict) else []

      return {
+         "title": title,
+         "description": description,
+         "location": data.get("location", "").title(),
+         "bedrooms": data.get("bedrooms"),
+         "bathrooms": data.get("bathrooms"),
+         "price": data.get("price"),
+         "price_type": data.get("price_type"),
+         "listing_type": data.get("listing_type"),
+         "amenities": data.get("amenities", []),
+         "amenities_with_icons": icons,
+         "requirements": data.get("requirements"),
+         "currency": data.get("currency", "XOF"),
+         "images": images,
+         "field_confidences": data.get("field_validations", {}),
      }

+ # ============================================================
+ # Intent Node
+ # ============================================================
+
+ @async_retry(strategy=RetryStrategy.MODERATE, operation_name="intent_node")
  async def intent_node(state: Dict) -> Dict:
      """
+     LangGraph node: Extract and route user intent
+
+     Features:
+     - Command detection when preview active
+     - Smart LLM routing with auto-fallback
+     - Context window management
+     - ML validation and inference
+     - Full error handling and observability
      """
+
      current_msg = _get_current_message(state).lower()
      status = state.get("status")

+     with trace_operation(
+         "intent_node",
+         {
+             "status": status,
+             "has_draft": state.get("draft_preview") is not None,
+         }
+     ):
+         # ===== CRITICAL: Handle commands FIRST when preview is active =====
+         if status in {"preview_shown", "waiting_for_images"} and state.get("draft_preview"):
+             logger.info(f"🎯 COMMAND DETECTION MODE: status={status}")
+
+             # PUBLISH command
+             if any(w in current_msg for w in {"publish", "go live", "confirm", "yes", "ok"}):
+                 logger.info("📤 COMMAND: publish")
+                 state["intent"] = "publish"
+                 state["ai_reply"] = ""
+                 return state
+
+             # EDIT command
+             if "edit" in current_msg or "change" in current_msg or "update" in current_msg:
+                 field = current_msg.replace("edit", "").replace("change", "").replace("update", "").strip()
+                 logger.info(f"✏️ COMMAND: edit field='{field}'")
+                 state["status"] = "collecting"
+                 state["missing_fields"] = [field] if field else ["location"]
+                 state["ai_reply"] = f"Sure! What would you like to change for **{field}**?"
+                 return state
+
+             # DISCARD command
+             if any(w in current_msg for w in {"discard", "cancel", "delete", "no"}):
+                 logger.info("🗑️ COMMAND: discard")
+                 state["status"] = None
+                 state["draft_preview"] = None
+                 state["intent"] = None
+                 state["ai_reply"] = "Draft discarded. Let me know if you want to list another property!"
+                 return state
+
+             # CASUAL CHAT
+             logger.info("💬 COMMAND: casual chat")
+             state["ai_reply"] = "Say **publish** to list, **edit** to change, or **discard** to start over."
              return state
+
+         # ===== End command detection =====
+
+         if state.get("status") in ["published", "error"]:
+             logger.info(f"⭐ Skipping intent_node, status={state.get('status')}")
              return state
+
+         user_role = state["user_role"]
+         user_id = state.get("user_id")
+         human_msg = state["messages"][-1]["content"]
+
+         # ===== LLM CALL WITH SMART ROUTING =====
+         with trace_operation("llm_call_with_routing"):
              try:
+                 # Manage context
+                 context_mgr = get_context_manager()
+                 messages = await context_mgr.manage_context([
+                     {"role": "system", "content": SYSTEM_PROMPT.replace("{user_role}", user_role)},
+                     {"role": "user", "content": human_msg},
+                 ])
+
+                 logger.info(f"🤖 Calling LLM with smart routing")
+
+                 # Call LLM with smart routing
+                 text, model_used, usage = await call_llm_smart(
+                     messages,
+                     intent=state.get("intent"),
+                     temperature=TEMP,
+                     max_tokens=MAX_TOKENS,
+                 )
+
+                 # Track tokens
+                 tracker = get_token_tracker()
+                 tracker.record_tokens(
+                     model_used,
+                     usage.get("prompt_tokens", 0),
+                     usage.get("completion_tokens", 0),
+                 )
+
+                 logger.info(
+                     f"✅ LLM response from {model_used}",
+                     extra={
+                         "tokens": usage.get("total_tokens", 0),
+                         "duration_ms": usage.get("duration_ms", 0),
+                     }
+                 )
+
+                 raw = text
+
+             except LLMError as e:
+                 logger.error(f"❌ LLM error: {e.message}")
+                 state["ai_reply"] = "Sorry, I'm having trouble. Please try again."
+                 state["status"] = "error"
+                 return state
+
              except Exception as e:
+                 logger.error(f"❌ Unexpected LLM error: {e}", exc_info=True)
+                 raise
+
+         # ===== Parse JSON response =====
          try:
+             cleaned = _clean_json(raw)
+             data = json.loads(cleaned)
+         except json.JSONDecodeError as e:
+             logger.error(f"❌ Invalid JSON response: {raw[:100]}")
+             data = {"allowed": False, "ai_reply": "Sorry, I didn't understand that. Could you rephrase?"}
+
+         # ===== Handle LISTING INTENT =====
+         if data.get("intent") == "list":
+             data["allowed"] = True
+             missing = _get_missing_fields(data)
+
+             if missing:
+                 data["status"] = "collecting"
+                 data["missing_fields"] = missing
+                 data["next_question"] = _get_next_question(missing)
+             else:
+                 data["status"] = "draft_ready"
+                 data["missing_fields"] = []
+                 data["draft_preview"] = _build_draft_preview(data)
+                 data["ai_reply"] = "Perfect! Let me prepare your listing draft..."
+                 logger.info("✅ All required fields complete")
+
+         # ===== Handle SEARCH INTENT =====
+         if data.get("intent") == "search":
+             data["allowed"] = True
+
+         # ===== Normalize values =====
+         location = _normalize_locations(data.get("location"))
+         amenities = _normalize_amenities(data.get("amenities", []))
+         price_type = _normalize_price_type(data.get("price_type"))
+         listing_type = _normalize_listing_type(data.get("listing_type"))
+
+         # ===== ML INFERENCE & VALIDATION =====
+         if data.get("intent") == "list":
+             with trace_operation("ml_processing"):
+                 # Extract location
+                 if data.get("location"):
+                     try:
+                         city, loc_info = await ml_extractor.extract_location_from_address(data["location"])
+                         if city:
+                             data["location"] = city
+                             data["location_details"] = loc_info
+                             logger.info(f"✅ Location extracted: {data['location']}")
+                     except Exception as e:
+                         logger.warning(f"⚠️ Location extraction failed: {e}")
+
+                 # Infer listing type
+                 try:
+                     lt, conf = ml_extractor.infer_listing_type(
+                         data, user_role=user_role, user_message=human_msg
+                     )
+                     if lt:
+                         data["listing_type"] = lt
+                         data["listing_confidence"] = conf
+                         logger.info(f"✅ Listing type inferred: {lt}")
+                 except Exception as e:
+                     logger.warning(f"⚠️ Listing type inference failed: {e}")
+
+                 # Infer currency
+                 try:
+                     currency, city, conf = await ml_extractor.infer_currency(data)
+                     if currency:
+                         data["currency"] = currency
+                         data["currency_confidence"] = conf
+                         logger.info(f"✅ Currency inferred: {currency}")
+                 except Exception as e:
+                     logger.warning(f"⚠️ Currency inference failed: {e}")
+                     data["currency"] = data.get("currency", "XOF")
+
+         # ===== Update state =====
+         state.update(
+             allowed=data.get("allowed", False),
+             status=data.get("status"),
+             missing_fields=data.get("missing_fields", []),
+             next_question=data.get("next_question"),
+             listing_type=listing_type,
+             location=location,
+             bedrooms=data.get("bedrooms"),
+             bathrooms=data.get("bathrooms"),
+             price=data.get("price"),
+             price_type=price_type,
+             amenities=amenities,
+             requirements=data.get("requirements"),
+             min_price=data.get("min_price"),
+             max_price=data.get("max_price"),
+             currency=data.get("currency", "XOF"),
+             ai_reply=data.get("ai_reply", ""),
+             draft_preview=data.get("draft_preview"),
+         )
+
+         logger.info(
+             f"👀 Intent node processed",
+             extra={
+                 "intent": data.get("intent"),
+                 "status": state.get("status"),
+             }
+         )
+
+         return state
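Note: because the publish/edit/discard branch runs before any LLM call, commands issued while a preview is active cost no tokens. A rough usage sketch of that path (assuming the state keys read in the diff are sufficient and that _get_current_message only needs the last message):

import asyncio
from app.ai.nodes.intent_node import intent_node

async def demo():
    # Simulate the user typing "ok, publish it" while the preview is on screen
    state = {
        "status": "preview_shown",
        "draft_preview": {"title": "2-bed apartment in Cotonou"},
        "messages": [{"role": "user", "content": "ok, publish it"}],
        "user_role": "landlord",
    }
    state = await intent_node(state)
    assert state["intent"] == "publish"  # matched by substring, no LLM call

asyncio.run(demo())

One sharp edge worth noting: the bare substring sets are permissive ("no" also matches "now" or "know", "ok" matches "broken"), so word-boundary matching such as re.search(r"\bno\b", current_msg) would be a safer discard trigger.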
app/ai/nodes/search_node.py CHANGED
@@ -1,20 +1,27 @@
- # app/ai/nodes/search_node.py - Return UI-ready search results with error handling
- import json
  import httpx
  from typing import Dict, List
  from qdrant_client import AsyncQdrantClient, models
- from app.config import settings
- from structlog import get_logger
  from tenacity import retry, stop_after_attempt, wait_exponential

- logger = get_logger(__name__)

  EMBED_MODEL = "qwen/qwen3-embedding-8b"
  TOP_K = 6

- # ------------------------------------------------------------------
- # Qdrant client
- # ------------------------------------------------------------------
  qdrant_client = AsyncQdrantClient(
      url=settings.QDRANT_URL,
      api_key=settings.QDRANT_API_KEY,
@@ -22,20 +29,26 @@ qdrant_client = AsyncQdrantClient(
      timeout=60,
  )

- # ---------- error handling ----------
  class SearchError(Exception):
      """Base exception for search operations."""
      pass

- class VectorDBError(SearchError):
-     """Qdrant/Vector DB error."""
-     pass
-
  class EmbeddingError(SearchError):
      """Embedding generation error."""
      pass

- # ---------- helpers ----------
  def _build_filter(state: Dict) -> models.Filter:
      """Build comprehensive Qdrant filter from ALL search fields."""
      must = []
@@ -50,7 +63,7 @@ def _build_filter(state: Dict) -> models.Filter:
              )
          )

-     # Price range filters (combine into single condition)
      if state.get("min_price") is not None or state.get("max_price") is not None:
          price_range = {}
          if state.get("min_price") is not None:
@@ -84,7 +97,7 @@ def _build_filter(state: Dict) -> models.Filter:
              )
          )

-     # Price type filter (monthly, nightly, yearly, etc.)
      price_type = (state.get("price_type") or "").lower()
      if price_type:
          must.append(
@@ -94,7 +107,7 @@ def _build_filter(state: Dict) -> models.Filter:
              )
          )

-     # Listing type filter (rent, short_stay, roommate, sale)
      listing_type = (state.get("listing_type") or "").lower()
      if listing_type:
          must.append(
@@ -104,7 +117,7 @@ def _build_filter(state: Dict) -> models.Filter:
              )
          )

-     # Amenities filter (all mentioned amenities must exist in listing)
      amenities = state.get("amenities", [])
      if amenities:
          for amenity in amenities:
@@ -118,67 +131,94 @@ def _build_filter(state: Dict) -> models.Filter:
              )
          )

      filt = models.Filter(must=must) if must else models.Filter()
-     logger.info("🔍 Filter built", must_conditions=len(must), location=loc,
-                 min_price=state.get("min_price"), max_price=state.get("max_price"),
-                 bedrooms=state.get("bedrooms"), bathrooms=state.get("bathrooms"),
-                 amenities=amenities, price_type=price_type, listing_type=listing_type)
      return filt

-
- @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=5))
  async def _embed(text: str) -> List[float]:
-     """Call OpenRouter embedding endpoint with retry logic."""
-     if not text or not text.strip():
-         raise EmbeddingError("Empty text provided for embedding")

-     payload = {
-         "model": EMBED_MODEL,
-         "input": text,
-         "encoding_format": "float",
-     }
-     headers = {
-         "Authorization": f"Bearer {settings.OPENROUTER_API_KEY}",
-         "Content-Type": "application/json",
-         "HTTP-Referer": "",
-         "X-Title": "",
-     }

-     try:
-         async with httpx.AsyncClient(timeout=60) as client:
-             resp = await client.post(
-                 "https://openrouter.ai/api/v1/embeddings",
-                 headers=headers,
-                 json=payload,
-             )
-             resp.raise_for_status()
-             data = resp.json()
-             if not data.get("data"):
-                 raise EmbeddingError("Empty embedding response from API")
-             return data["data"][0]["embedding"]
-     except httpx.HTTPError as e:
-         logger.error("❌ Embedding API HTTP error", exc_info=e)
-         raise EmbeddingError(f"Failed to get embedding: {e}")
-     except KeyError as e:
-         logger.error("❌ Embedding API response malformed", exc_info=e)
-         raise EmbeddingError(f"Malformed embedding response: {e}")
-

- # ---------- suggestion helpers ----------
- @retry(stop=stop_after_attempt(2), wait=wait_exponential(multiplier=1, min=1, max=3))
  async def _search_with_must(must: List, vector: List[float]):
      """Execute Qdrant search with retry logic."""
-     try:
-         return await qdrant_client.search(
-             collection_name="listings",
-             query_vector=vector,
-             query_filter=models.Filter(must=must),
-             limit=TOP_K,
-             with_payload=True,
-         )
-     except Exception as e:
-         logger.error("❌ Qdrant search failed", exc_info=e)
-         raise VectorDBError(f"Search failed: {e}")
-

  def _add_price_range(must: List, state: Dict):
      """Add combined price range filter."""
@@ -190,199 +230,225 @@ def _add_price_range(must: List, state: Dict):
          price_range["lte"] = state["max_price"]

      if price_range:
-         must.append(models.FieldCondition(key="price", range=models.Range(**price_range)))
-

- def _hits_to_cards(hits):
      """Convert Qdrant hits to UI cards."""
-     return [
-         {
-             "id": hit.id,
-             "title": hit.payload.get("title") or f"{hit.payload.get('bedrooms', '')}-bed {hit.payload.get('location', '')}",
-             "location": hit.payload.get("location"),
-             "price": hit.payload.get("price"),
-             "price_type": hit.payload.get("price_type"),
-             "bedrooms": hit.payload.get("bedrooms"),
-             "bathrooms": hit.payload.get("bathrooms"),
-             "amenities": hit.payload.get("amenities", []),
-             "description": hit.payload.get("description"),
-             "listing_type": hit.payload.get("listing_type"),
-             "images": hit.payload.get("images", []),  # ✅ Include images for UI
-             "currency": hit.payload.get("currency", "XOF"),
-         }
-         for hit in hits
-     ]
-

  async def _suggest_relaxed(state: Dict, vector: List[float]) -> List[dict]:
      """
      Loosen constraints progressively while keeping location strict.
-     If no location specified, return empty.
      """
-     loc = (state.get("location") or "").lower()
-     br = state.get("bedrooms")
-     amenities = state.get("amenities", [])
-
-     # If no location specified, return empty
-     if not loc:
-         return []

-     # Location is ALWAYS a hard constraint
-     location_filter = models.FieldCondition(
-         key="location_lower",
-         match=models.MatchValue(value=loc)
-     )

-     # 1. Try with all filters intact
-     try:
-         must = [location_filter]
-         if br is not None:
-             must.append(models.FieldCondition(key="bedrooms", match=models.MatchValue(value=br)))
-         _add_price_range(must, state)
-         for amenity in amenities:
-             must.append(models.FieldCondition(key="amenities", match=models.MatchValue(value=amenity.lower())))
-         hits = await _search_with_must(must, vector)
-         if hits:
-             return _hits_to_cards(hits)
-     except VectorDBError as e:
-         logger.warning(f"⚠️ Search with all filters failed: {e}")

-     # 2. Loosen amenities (remove optional ones)
-     try:
-         must = [location_filter]
-         if br is not None:
-             must.append(models.FieldCondition(key="bedrooms", match=models.MatchValue(value=br)))
-         _add_price_range(must, state)
-         hits = await _search_with_must(must, vector)
-         if hits:
-             return _hits_to_cards(hits)
-     except VectorDBError as e:
-         logger.warning(f"⚠️ Search without amenities failed: {e}")

-     # 3. Loosen bedrooms ±1, keep location strict
-     if br is not None:
          try:
              must = [location_filter]
-             new_br = br - 1 if br > 1 else br + 1
-             must.append(models.FieldCondition(key="bedrooms", match=models.MatchValue(value=new_br)))
              _add_price_range(must, state)
              hits = await _search_with_must(must, vector)
              if hits:
                  return _hits_to_cards(hits)
-         except VectorDBError as e:
-             logger.warning(f"⚠️ Search with adjusted bedrooms failed: {e}")

-     # 4. Loosen price +25%, keep location strict
-     try:
-         must = [location_filter]
          if br is not None:
-             must.append(models.FieldCondition(key="bedrooms", match=models.MatchValue(value=br)))
-         if state.get("max_price") is not None:
-             relaxed_max = int(state["max_price"] * 1.25)
-             must.append(models.FieldCondition(key="price", range=models.Range(lte=relaxed_max)))
-         else:
-             _add_price_range(must, state)
-         hits = await _search_with_must(must, vector)
-         if hits:
-             return _hits_to_cards(hits)
-     except VectorDBError as e:
-         logger.warning(f"⚠️ Search with relaxed price failed: {e}")

-     return []


- # ---------- node ----------
  async def search_node(state: Dict) -> Dict:
      """
-     LangGraph node: comprehensive search with all filters
-     - title/description (semantic via embedding)
-     - location, price range, bedrooms, bathrooms, amenities (keyword filters)
-     - price_type, listing_type

-     ✅ RETURNS: search_results + search_preview (UI-ready)
-     ✅ ERROR HANDLING: Graceful fallback on Qdrant/embedding failures
      """
-     query = state.get("search_query", "") or state["messages"][-1]["content"]

-     # Get embedding with error handling
-     try:
-         vector = await _embed(query)
-         logger.info("✅ Embedding generated successfully")
-     except EmbeddingError as e:
-         logger.error("❌ Failed to generate embedding", exc_info=e)
-         state["ai_reply"] = "Sorry, I couldn't process your search right now. Please try again."
-         state["search_preview"] = {
-             "type": "search_results",
-             "count": 0,
-             "query": query,
-             "filters": {},
-             "results": [],
-             "message": "Search temporarily unavailable"
-         }
-         state["search_results"] = []
-         return state
-
-     # Build filter
-     filt = _build_filter(state)
-
-     logger.info("🔍 Searching Qdrant", query=query, filter=str(filt))
-
-     # Execute search with error handling
-     try:
-         hits = await _search_with_must(filt.must if filt.must else [], vector)
-         logger.info("📍 Qdrant search result", hits_count=len(hits))
-     except VectorDBError as e:
-         logger.error("❌ Qdrant search failed", exc_info=e)
-         state["ai_reply"] = "Sorry, I'm having trouble searching right now. Please try again."
-         state["search_preview"] = {
-             "type": "search_results",
-             "count": 0,
-             "query": query,
-             "filters": {
-                 "location": state.get("location"),
-                 "min_price": state.get("min_price"),
-                 "max_price": state.get("max_price"),
-                 "bedrooms": state.get("bedrooms"),
-                 "bathrooms": state.get("bathrooms"),
-                 "price_type": state.get("price_type"),
-                 "listing_type": state.get("listing_type"),
-                 "amenities": state.get("amenities", []),
-             },
-             "results": [],
-             "message": "Search service temporarily unavailable"
          }
-         state["search_results"] = []
-         return state
-
-     cards = _hits_to_cards(hits)
-
-     # --- personalize zero-hit reply + suggestions + UI preview
-     if not cards:
-         location = state.get("location") or "that area"
-         bedrooms = state.get("bedrooms")
-         price_bit = (
-             " in your price range"
-             if state.get("min_price") is not None or state.get("max_price") is not None
-             else ""
-         )
-         br_bit = f" with {bedrooms} bedrooms" if bedrooms else ""
-         amenities_bit = f" with {', '.join(state.get('amenities', []))}" if state.get("amenities") else ""
-
          try:
-             suggestions = await _suggest_relaxed(state, vector)
-             logger.info("💡 Suggestions generated", count=len(suggestions))
-         except Exception as e:
-             logger.warning(f"⚠️ Failed to generate suggestions: {e}")
-             suggestions = []
-
-         if suggestions:
-             state["ai_reply"] = (
-                 f"I found no exact match for your request, "
-                 f"but you might like these similar options:"
-             )
-             state["search_preview"] = {  # ✅ NEW: UI-ready preview for suggestions
                  "type": "search_results",
-                 "count": len(suggestions),
                  "query": query,
                  "filters": {
                      "location": state.get("location"),
@@ -390,22 +456,77 @@ async def search_node(state: Dict) -> Dict:
390
  "max_price": state.get("max_price"),
391
  "bedrooms": state.get("bedrooms"),
392
  "bathrooms": state.get("bathrooms"),
393
- "price_type": state.get("price_type"),
394
- "listing_type": state.get("listing_type"),
395
  "amenities": state.get("amenities", []),
396
  },
397
- "results": suggestions,
398
- "message": "Similar options available"
 
399
  }
400
- state["search_results"] = suggestions
401
  else:
402
- state["ai_reply"] = (
403
- f"I found no property in {location}{price_bit}{br_bit}{amenities_bit}. "
404
- "Try widening your search or check back later!"
405
- )
406
- state["search_preview"] = { # βœ… Empty UI state
407
  "type": "search_results",
408
- "count": 0,
409
  "query": query,
410
  "filters": {
411
  "location": state.get("location"),
@@ -417,30 +538,10 @@ async def search_node(state: Dict) -> Dict:
417
  "listing_type": state.get("listing_type"),
418
  "amenities": state.get("amenities", []),
419
  },
420
- "results": [],
421
- "message": "No results found"
422
  }
423
- else:
424
- state["ai_reply"] = f"Here are {len(cards)} places I found for you:"
425
- # βœ… NEW: Return UI-ready search preview
426
- state["search_preview"] = {
427
- "type": "search_results",
428
- "count": len(cards),
429
- "query": query,
430
- "filters": {
431
- "location": state.get("location"),
432
- "min_price": state.get("min_price"),
433
- "max_price": state.get("max_price"),
434
- "bedrooms": state.get("bedrooms"),
435
- "bathrooms": state.get("bathrooms"),
436
- "price_type": state.get("price_type"),
437
- "listing_type": state.get("listing_type"),
438
- "amenities": state.get("amenities", []),
439
- },
440
- "results": cards,
441
- "message": f"Found {len(cards)} listings"
442
- }
443
- state["search_results"] = cards
444
-
445
- logger.info("βœ… Search node finished", query=query, count=len(cards))
446
- return state
 
1
+ # app/ai/nodes/search_node.py - FINAL: Complete error handling + retry logic + observability
2
+ import logging
3
  import httpx
4
  from typing import Dict, List
5
  from qdrant_client import AsyncQdrantClient, models
 
 
6
  from tenacity import retry, stop_after_attempt, wait_exponential
7
 
8
+ from app.config import settings
9
+ from app.core.error_handling import (
10
+ async_retry,
11
+ RetryStrategy,
12
+ trace_operation,
13
+ VectorDBError,
14
+ )
15
+
16
+ logger = logging.getLogger(__name__)
17
 
18
  EMBED_MODEL = "qwen/qwen3-embedding-8b"
19
  TOP_K = 6
20
 
21
+ # ============================================================
22
+ # Qdrant Client
23
+ # ============================================================
24
+
25
  qdrant_client = AsyncQdrantClient(
26
  url=settings.QDRANT_URL,
27
  api_key=settings.QDRANT_API_KEY,
 
29
  timeout=60,
30
  )
31
 
32
+ # ============================================================
33
+ # Custom Exceptions
34
+ # ============================================================
35
+
36
  class SearchError(Exception):
37
  """Base exception for search operations."""
38
  pass
39
 
 
 
 
 
40
  class EmbeddingError(SearchError):
41
  """Embedding generation error."""
42
  pass
43
 
44
+ class QdrantSearchError(SearchError):
45
+ """Qdrant search error."""
46
+ pass
47
+
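# --- editor's sketch (illustrative, not part of this commit) ---
# `async_retry` and `RetryStrategy` are imported from app.core.error_handling,
# which does not appear in this diff. Assuming the strategy encodes
# (max attempts, base delay) for exponential backoff, a minimal stand-in
# compatible with the decorators used below could look like this:
import asyncio
import functools
from enum import Enum

class RetryStrategy(Enum):
    MODERATE = (3, 0.5)  # assumption: 3 attempts, 0.5 s base backoff

def async_retry(strategy: RetryStrategy, operation_name: str):
    attempts, base_delay = strategy.value
    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return await fn(*args, **kwargs)
                except Exception:
                    if attempt == attempts:
                        raise  # out of attempts: surface the error
                    await asyncio.sleep(base_delay * 2 ** (attempt - 1))
        return wrapper
    return decorator
# --- end sketch ---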
48
+ # ============================================================
49
+ # Helpers
50
+ # ============================================================
51
+
52
  def _build_filter(state: Dict) -> models.Filter:
53
  """Build comprehensive Qdrant filter from ALL search fields."""
54
  must = []
 
63
  )
64
  )
65
 
66
+ # Price range filters
67
  if state.get("min_price") is not None or state.get("max_price") is not None:
68
  price_range = {}
69
  if state.get("min_price") is not None:
 
97
  )
98
  )
99
 
100
+ # Price type filter
101
  price_type = (state.get("price_type") or "").lower()
102
  if price_type:
103
  must.append(
 
107
  )
108
  )
109
 
110
+ # Listing type filter
111
  listing_type = (state.get("listing_type") or "").lower()
112
  if listing_type:
113
  must.append(
 
117
  )
118
  )
119
 
120
+ # Amenities filter
121
  amenities = state.get("amenities", [])
122
  if amenities:
123
  for amenity in amenities:
 
131
  )
132
 
133
  filt = models.Filter(must=must) if must else models.Filter()
134
+ logger.info(
135
+ "πŸ” Filter built",
136
+ extra={
137
+ "conditions": len(must),
138
+ "location": loc,
139
+ "price_range": f"{state.get('min_price')}-{state.get('max_price')}",
140
+ "bedrooms": state.get("bedrooms"),
141
+ "amenities": len(amenities),
142
+ }
143
+ )
144
  return filt
145
 
146
+ @async_retry(strategy=RetryStrategy.MODERATE, operation_name="embedding_generation")
 
147
  async def _embed(text: str) -> List[float]:
148
+ """
149
+ Generate embedding with retry logic and error handling.
 
150
 
151
+ Raises:
152
+ EmbeddingError: If embedding generation fails
153
+ """
 
 
 
 
 
 
 
 
154
 
155
+ with trace_operation("embedding_generation", {"text_length": len(text)}):
156
+ if not text or not text.strip():
157
+ logger.error("❌ Empty text provided for embedding")
158
+ raise EmbeddingError("Empty text provided for embedding")
159
+
160
+ payload = {
161
+ "model": EMBED_MODEL,
162
+ "input": text,
163
+ "encoding_format": "float",
164
+ }
165
+ headers = {
166
+ "Authorization": f"Bearer {settings.OPENROUTER_API_KEY}",
167
+ "Content-Type": "application/json",
168
+ "HTTP-Referer": "",
169
+ "X-Title": "",
170
+ }
171
+
172
+ try:
173
+ async with httpx.AsyncClient(timeout=60) as client:
174
+ logger.info("πŸ€– Calling embedding API")
175
+ resp = await client.post(
176
+ "https://openrouter.ai/api/v1/embeddings",
177
+ headers=headers,
178
+ json=payload,
179
+ timeout=60,
180
+ )
181
+ resp.raise_for_status()
182
+
183
+ data = resp.json()
184
+ if not data.get("data"):
185
+ logger.error("❌ Empty embedding response")
186
+ raise EmbeddingError("Empty embedding response from API")
187
+
188
+ embedding = data["data"][0]["embedding"]
189
+ logger.info(f"βœ… Embedding generated, dimension={len(embedding)}")
190
+ return embedding
191
+
192
+ except httpx.HTTPError as e:
193
+ logger.error(f"❌ Embedding API HTTP error: {e}", exc_info=True)
194
+ raise EmbeddingError(f"HTTP error calling embedding API: {e}")
195
+ except KeyError as e:
196
+ logger.error(f"❌ Embedding response malformed: {e}", exc_info=True)
197
+ raise EmbeddingError(f"Malformed embedding response: {e}")
198
+ except Exception as e:
199
+ logger.error(f"❌ Unexpected embedding error: {e}", exc_info=True)
200
+ raise EmbeddingError(f"Unexpected error generating embedding: {e}")
201
 
202
+ @async_retry(strategy=RetryStrategy.MODERATE, operation_name="qdrant_search")
 
203
  async def _search_with_must(must: List, vector: List[float]):
204
  """Execute Qdrant search with retry logic."""
205
+
206
+ with trace_operation("qdrant_search_execution", {"filter_count": len(must)}):
207
+ try:
208
+ logger.info("πŸ”Ž Executing Qdrant search")
209
+ hits = await qdrant_client.search(
210
+ collection_name="listings",
211
+ query_vector=vector,
212
+ query_filter=models.Filter(must=must),
213
+ limit=TOP_K,
214
+ with_payload=True,
215
+ )
216
+ logger.info(f"βœ… Qdrant search returned {len(hits)} results")
217
+ return hits
218
+
219
+ except Exception as e:
220
+ logger.error(f"❌ Qdrant search failed: {e}", exc_info=True)
221
+ raise QdrantSearchError(f"Qdrant search failed: {e}")
222
 
223
  def _add_price_range(must: List, state: Dict):
224
  """Add combined price range filter."""
 
230
  price_range["lte"] = state["max_price"]
231
 
232
  if price_range:
233
+ must.append(
234
+ models.FieldCondition(
235
+ key="price",
236
+ range=models.Range(**price_range)
237
+ )
238
+ )
239
 
240
+ def _hits_to_cards(hits) -> List[dict]:
241
  """Convert Qdrant hits to UI cards."""
242
+ cards = []
243
+ for hit in hits:
244
+ try:
245
+ card = {
246
+ "id": hit.id,
247
+ "title": (
248
+ hit.payload.get("title") or
249
+ f"{hit.payload.get('bedrooms', '')}-bed {hit.payload.get('location', '')}"
250
+ ),
251
+ "location": hit.payload.get("location"),
252
+ "price": hit.payload.get("price"),
253
+ "price_type": hit.payload.get("price_type"),
254
+ "bedrooms": hit.payload.get("bedrooms"),
255
+ "bathrooms": hit.payload.get("bathrooms"),
256
+ "amenities": hit.payload.get("amenities", []),
257
+ "description": hit.payload.get("description"),
258
+ "listing_type": hit.payload.get("listing_type"),
259
+ "images": hit.payload.get("images", []),
260
+ "currency": hit.payload.get("currency", "XOF"),
261
+ }
262
+ cards.append(card)
263
+ except Exception as e:
264
+ logger.warning(f"⚠️ Failed to convert hit to card: {e}")
265
+ continue
266
+
267
+ return cards
268
 
269
  async def _suggest_relaxed(state: Dict, vector: List[float]) -> List[dict]:
270
  """
271
  Loosen constraints progressively while keeping location strict.
272
+ Returns relaxed search results if exact match not found.
273
  """
274
+
275
+ with trace_operation("suggest_relaxed", {"location": state.get("location")}):
276
+ loc = (state.get("location") or "").lower()
277
+ br = state.get("bedrooms")
278
+ amenities = state.get("amenities", [])
 
 
279
 
280
+ # If no location specified, return empty
281
+ if not loc:
282
+ logger.warning("⚠️ No location specified for suggestion")
283
+ return []
 
284
 
285
+ # Location is ALWAYS a hard constraint
286
+ location_filter = models.FieldCondition(
287
+ key="location_lower",
288
+ match=models.MatchValue(value=loc)
289
+ )
 
 
 
 
 
 
 
 
290
 
291
+ # Try progressively looser constraints
292
+
293
+ # 1. All filters
294
+ try:
295
+ logger.info("πŸ”„ Trying search with all filters")
296
+ must = [location_filter]
297
+ if br is not None:
298
+ must.append(
299
+ models.FieldCondition(
300
+ key="bedrooms",
301
+ match=models.MatchValue(value=br)
302
+ )
303
+ )
304
+ _add_price_range(must, state)
305
+ for amenity in amenities:
306
+ must.append(
307
+ models.FieldCondition(
308
+ key="amenities",
309
+ match=models.MatchValue(value=amenity.lower())
310
+ )
311
+ )
312
+ hits = await _search_with_must(must, vector)
313
+ if hits:
314
+ logger.info("βœ… Found results with all filters")
315
+ return _hits_to_cards(hits)
316
+ except Exception as e:
317
+ logger.warning(f"⚠️ Search with all filters failed: {e}")
318
 
319
+ # 2. No amenities
 
320
  try:
321
+ logger.info("πŸ”„ Trying search without amenities")
322
  must = [location_filter]
323
+ if br is not None:
324
+ must.append(
325
+ models.FieldCondition(
326
+ key="bedrooms",
327
+ match=models.MatchValue(value=br)
328
+ )
329
+ )
330
  _add_price_range(must, state)
331
  hits = await _search_with_must(must, vector)
332
  if hits:
333
+ logger.info("βœ… Found results without amenities")
334
  return _hits_to_cards(hits)
335
+ except Exception as e:
336
+ logger.warning(f"⚠️ Search without amenities failed: {e}")
337
 
338
+ # 3. Adjust bedrooms Β±1
 
 
339
  if br is not None:
340
+ try:
341
+ logger.info("πŸ”„ Trying search with adjusted bedrooms")
342
+ must = [location_filter]
343
+ new_br = br - 1 if br > 1 else br + 1
344
+ must.append(
345
+ models.FieldCondition(
346
+ key="bedrooms",
347
+ match=models.MatchValue(value=new_br)
348
+ )
349
+ )
350
+ _add_price_range(must, state)
351
+ hits = await _search_with_must(must, vector)
352
+ if hits:
353
+ logger.info(f"βœ… Found results with {new_br} bedrooms")
354
+ return _hits_to_cards(hits)
355
+ except Exception as e:
356
+ logger.warning(f"⚠️ Search with adjusted bedrooms failed: {e}")
357
 
358
+ # 4. Relax price +25%
359
+ try:
360
+ logger.info("πŸ”„ Trying search with relaxed price")
361
+ must = [location_filter]
362
+ if br is not None:
363
+ must.append(
364
+ models.FieldCondition(
365
+ key="bedrooms",
366
+ match=models.MatchValue(value=br)
367
+ )
368
+ )
369
+ if state.get("max_price") is not None:
370
+ relaxed_max = int(state["max_price"] * 1.25)
371
+ must.append(
372
+ models.FieldCondition(
373
+ key="price",
374
+ range=models.Range(lte=relaxed_max)
375
+ )
376
+ )
377
+ else:
378
+ _add_price_range(must, state)
379
+ hits = await _search_with_must(must, vector)
380
+ if hits:
381
+ logger.info("βœ… Found results with relaxed price")
382
+ return _hits_to_cards(hits)
383
+ except Exception as e:
384
+ logger.warning(f"⚠️ Search with relaxed price failed: {e}")
385
+
386
+ logger.warning("⚠️ No results found even with relaxed criteria")
387
+ return []
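# --- editor's sketch (illustrative, not part of this commit) ---
# Quick sanity check of the relaxation arithmetic used above: bedrooms
# are nudged down by one (or up, when the request was for a single
# bedroom), and max_price is widened by 25%.
br = 3
new_br = br - 1 if br > 1 else br + 1   # 3 -> 2; a 1-bedroom request -> 2
relaxed_max = int(200_000 * 1.25)       # max_price 200000 -> 250000
assert (new_br, relaxed_max) == (2, 250_000)
# --- end sketch ---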
388
 
389
+ # ============================================================
390
+ # Search Node
391
+ # ============================================================
392
 
 
393
  async def search_node(state: Dict) -> Dict:
394
  """
395
+ LangGraph node: Comprehensive search with all filters
 
 
 
396
 
397
+ Features:
398
+ - Semantic search via embeddings
399
+ - Keyword filters (location, price, bedrooms, etc.)
400
+ - Error handling with graceful fallback
401
+ - Suggestion system for zero-hit scenarios
402
+ - UI-ready response format
403
  """
 
404
 
405
+ with trace_operation(
406
+ "search_node",
407
+ {
408
+ "location": state.get("location"),
409
+ "bedrooms": state.get("bedrooms"),
410
+ "min_price": state.get("min_price"),
411
+ "max_price": state.get("max_price"),
 
412
  }
413
+ ):
414
+ query = state.get("search_query", "") or state["messages"][-1]["content"]
415
+ logger.info(f"πŸ” Starting search for: {query[:100]}")
416
+
417
+ # ===== Generate embedding =====
418
  try:
419
+ with trace_operation("embed_query"):
420
+ vector = await _embed(query)
421
+ logger.info("βœ… Query embedded successfully")
422
+ except EmbeddingError as e:
423
+ logger.error(f"❌ Embedding failed: {e}")
424
+ state["ai_reply"] = "Sorry, I couldn't process your search right now. Please try again."
425
+ state["search_preview"] = {
426
+ "type": "search_results",
427
+ "count": 0,
428
+ "query": query,
429
+ "filters": {},
430
+ "results": [],
431
+ "message": "Search temporarily unavailable",
432
+ "error": "embedding_failed",
433
+ }
434
+ state["search_results"] = []
435
+ return state
436
+
437
+ # ===== Build filter =====
438
+ filt = _build_filter(state)
439
+
440
+ # ===== Execute search =====
441
+ try:
442
+ with trace_operation("execute_search"):
443
+ logger.info("πŸ”Ž Executing search")
444
+ hits = await _search_with_must(filt.must if filt.must else [], vector)
445
+ logger.info(f"βœ… Search returned {len(hits)} results")
446
+ except QdrantSearchError as e:
447
+ logger.error(f"❌ Search failed: {e}")
448
+ state["ai_reply"] = "Sorry, I'm having trouble searching right now. Please try again."
449
+ state["search_preview"] = {
450
  "type": "search_results",
451
+ "count": 0,
452
  "query": query,
453
  "filters": {
454
  "location": state.get("location"),
 
456
  "max_price": state.get("max_price"),
457
  "bedrooms": state.get("bedrooms"),
458
  "bathrooms": state.get("bathrooms"),
 
 
459
  "amenities": state.get("amenities", []),
460
  },
461
+ "results": [],
462
+ "message": "Search service temporarily unavailable",
463
+ "error": "search_failed",
464
  }
465
+ state["search_results"] = []
466
+ return state
467
+
468
+ cards = _hits_to_cards(hits)
469
+
470
+ # ===== Handle zero-hit scenario =====
471
+ if not cards:
472
+ logger.info("ℹ️ No exact matches found, generating suggestions")
473
+ location = state.get("location") or "that area"
474
+
475
+ try:
476
+ suggestions = await _suggest_relaxed(state, vector)
477
+ except Exception as e:
478
+ logger.warning(f"⚠️ Failed to generate suggestions: {e}")
479
+ suggestions = []
480
+
481
+ if suggestions:
482
+ state["ai_reply"] = (
483
+ f"I found no exact match for your request, "
484
+ f"but you might like these similar options:"
485
+ )
486
+ state["search_preview"] = {
487
+ "type": "search_results",
488
+ "count": len(suggestions),
489
+ "query": query,
490
+ "filters": {
491
+ "location": state.get("location"),
492
+ "min_price": state.get("min_price"),
493
+ "max_price": state.get("max_price"),
494
+ "bedrooms": state.get("bedrooms"),
495
+ "bathrooms": state.get("bathrooms"),
496
+ "price_type": state.get("price_type"),
497
+ "listing_type": state.get("listing_type"),
498
+ "amenities": state.get("amenities", []),
499
+ },
500
+ "results": suggestions,
501
+ "message": "Similar options available",
502
+ }
503
+ state["search_results"] = suggestions
504
+ else:
505
+ state["ai_reply"] = f"I found no property in {location}. Try widening your search or check back later!"
506
+ state["search_preview"] = {
507
+ "type": "search_results",
508
+ "count": 0,
509
+ "query": query,
510
+ "filters": {
511
+ "location": state.get("location"),
512
+ "min_price": state.get("min_price"),
513
+ "max_price": state.get("max_price"),
514
+ "bedrooms": state.get("bedrooms"),
515
+ "bathrooms": state.get("bathrooms"),
516
+ "price_type": state.get("price_type"),
517
+ "listing_type": state.get("listing_type"),
518
+ "amenities": state.get("amenities", []),
519
+ },
520
+ "results": [],
521
+ "message": "No results found",
522
+ }
523
+ state["search_results"] = []
524
  else:
525
+ logger.info(f"βœ… Found {len(cards)} results")
526
+ state["ai_reply"] = f"Here are {len(cards)} places I found for you:"
527
+ state["search_preview"] = {
 
 
528
  "type": "search_results",
529
+ "count": len(cards),
530
  "query": query,
531
  "filters": {
532
  "location": state.get("location"),
 
538
  "listing_type": state.get("listing_type"),
539
  "amenities": state.get("amenities", []),
540
  },
541
+ "results": cards,
542
+ "message": f"Found {len(cards)} listings",
543
  }
544
+ state["search_results"] = cards
545
+
546
+ logger.info("βœ… Search node completed")
547
+ return state
 
app/ai/routes/chat.py CHANGED
@@ -1,15 +1,27 @@
1
- # app/ai/routes/chat.py - Return search_preview + draft_preview
2
- from fastapi import APIRouter, Depends, HTTPException
3
  from fastapi.security import HTTPBearer
4
  from pydantic import BaseModel
5
  from typing import Optional, List
 
 
 
6
  from app.guards.jwt_guard import decode_access_token
7
  from app.ai.service import aida_chat_sync
8
- from app.ai.memory.redis_memory import is_rate_limited
 
 
 
 
 
9
 
10
  router = APIRouter()
11
  security = HTTPBearer()
12
 
 
 
 
 
13
  class MessageHistory(BaseModel):
14
  role: str # 'user' or 'assistant'
15
  content: str
@@ -22,41 +34,235 @@ class AskBody(BaseModel):
22
  user_role: Optional[str] = None
23
  history: Optional[List[MessageHistory]] = None
24
 
 
 
 
25
 
26
  @router.post("/ask")
 
27
  async def ask_ai(
28
  body: AskBody,
 
29
  token: str = Depends(security),
30
  ):
31
- payload = decode_access_token(token.credentials)
32
- if not payload:
33
- raise HTTPException(status_code=401, detail="Invalid token")
 
 
34
 
35
- if await is_rate_limited(payload["user_id"]):
36
- raise HTTPException(status_code=429, detail="Rate limit exceeded")
37
-
38
- # Build conversation context from history
39
- conversation_context = ""
40
- if body.history:
41
- for msg in body.history:
42
- role = "User" if msg.role == "user" else "Assistant"
43
- conversation_context += f"{role}: {msg.content}\n"
 
 
44
 
45
- # Combine context with current message
46
- full_message = body.message
47
- if conversation_context:
48
- full_message = f"Previous conversation:\n{conversation_context}\nNow the user says: {body.message}"
49
-
50
- final_state = await aida_chat_sync(
51
- payload["user_id"],
52
- payload["role"],
53
- full_message,
54
- )
55
-
56
- # βœ… RETURN FULL STATE - text + cards + draft + search preview
57
  return {
58
- "text": final_state.get("ai_reply", ""),
59
- "cards": final_state.get("search_results", []),
60
- "draft_preview": final_state.get("draft_preview"), # For listing preview
61
- "search_preview": final_state.get("search_preview"), # Òœ… NEW: For search results UI
62
  }
 
1
+ # app/ai/routes/chat.py - Enhanced with Observability & Rate Limiting
2
+ from fastapi import APIRouter, Depends, HTTPException, Request
3
  from fastapi.security import HTTPBearer
4
  from pydantic import BaseModel
5
  from typing import Optional, List
6
+ import logging
7
+ import time
8
+
9
  from app.guards.jwt_guard import decode_access_token
10
  from app.ai.service import aida_chat_sync
11
+ from app.core.rate_limiter import get_rate_limiter, RateLimitExceeded
12
+ from app.core.observability import trace_operation, get_token_tracker
13
+ from app.core.error_handling import handle_errors, async_retry, RetryStrategy
14
+ from app.core.context_manager import get_message_window
15
+
16
+ logger = logging.getLogger(__name__)
17
 
18
  router = APIRouter()
19
  security = HTTPBearer()
20
 
21
+ # ============================================================
22
+ # Models
23
+ # ============================================================
24
+
25
  class MessageHistory(BaseModel):
26
  role: str # 'user' or 'assistant'
27
  content: str
 
34
  user_role: Optional[str] = None
35
  history: Optional[List[MessageHistory]] = None
36
 
37
+ # ============================================================
38
+ # Enhanced Chat Endpoint
39
+ # ============================================================
40
 
41
  @router.post("/ask")
42
+ @handle_errors(default_return={"success": False, "error": "Internal server error"})
43
  async def ask_ai(
44
  body: AskBody,
45
+ request: Request,
46
  token: str = Depends(security),
47
  ):
48
+ """
49
+ Enhanced chat endpoint with:
50
+ - Rate limiting (token bucket)
51
+ - Distributed tracing
52
+ - Token tracking
53
+ - Error handling with observability
54
+ - Context management
55
+ """
56
+
57
+ start_time = time.time()
58
+ request_id = request.headers.get("x-request-id", "unknown")
59
+ ip_address = request.client.host if request.client else "unknown"
60
+
61
+ with trace_operation(
62
+ "chat_endpoint",
63
+ {
64
+ "request_id": request_id,
65
+ "ip_address": ip_address,
66
+ "message_length": len(body.message),
67
+ }
68
+ ) as root_span:
69
+ try:
70
+ # ===== Step 1: Validate Token =====
71
+ with trace_operation("token_validation"):
72
+ payload = decode_access_token(token.credentials)
73
+ if not payload:
74
+ raise HTTPException(status_code=401, detail="Invalid token")
75
+
76
+ user_id = payload["user_id"]
77
+ user_role = payload.get("role", "renter")
78
+
79
+ # ===== Step 2: Rate Limiting =====
80
+ with trace_operation(
81
+ "rate_limit_check",
82
+ {"user_id": user_id, "operation": "chat"}
83
+ ):
84
+ rate_limiter = get_rate_limiter()
85
+
86
+ is_allowed, rate_info = await rate_limiter.is_allowed(
87
+ user_id=user_id,
88
+ operation="chat",
89
+ ip_address=ip_address,
90
+ )
91
+
92
+ if not is_allowed:
93
+ logger.warning(
94
+ f"🚫 Rate limit exceeded for user: {user_id}",
95
+ extra={"rate_info": rate_info}
96
+ )
97
+ raise RateLimitExceeded(retry_after=60)
98
+
99
+ # Add rate limit headers
100
+ root_span.set_attribute("rate_limit.remaining", rate_info["user"]["remaining"])
101
+ root_span.set_attribute("rate_limit.capacity", rate_info["user"]["capacity"])
102
+
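# --- editor's sketch (illustrative, not part of this commit) ---
# `get_rate_limiter()` and `RateLimitExceeded` come from app.core.rate_limiter,
# which is not in this diff. From the usage above, `is_allowed` returns
# (allowed, info) with info["user"]["remaining"] and info["user"]["capacity"],
# and the exception carries `message` and `retry_after`. A minimal in-memory
# token bucket honouring that contract (capacity/refill values are assumptions):
import time

class RateLimitExceeded(Exception):
    def __init__(self, retry_after: int = 60):
        self.retry_after = retry_after
        self.message = f"Rate limit exceeded, retry after {retry_after}s"
        super().__init__(self.message)

class _TokenBucketLimiter:
    def __init__(self, capacity: int = 60, refill_per_sec: float = 1.0):
        self.capacity, self.refill = capacity, refill_per_sec
        self._state = {}  # user_id -> (tokens, last_refill_ts)

    async def is_allowed(self, user_id, operation="chat", ip_address="", cost=1):
        tokens, last = self._state.get(user_id, (self.capacity, time.time()))
        now = time.time()
        tokens = min(self.capacity, tokens + (now - last) * self.refill)
        allowed = tokens >= cost
        if allowed:
            tokens -= cost
        self._state[user_id] = (tokens, now)
        return allowed, {"user": {"remaining": int(tokens), "capacity": self.capacity}}
# --- end sketch ---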
103
+ # ===== Step 3: Context Management =====
104
+ with trace_operation("context_management", {"user_id": user_id}):
105
+ window = get_message_window(user_id)
106
+
107
+ # Build conversation context from history (if provided)
108
+ conversation_context = ""
109
+ if body.history:
110
+ for msg in body.history:
111
+ role = "User" if msg.role == "user" else "Assistant"
112
+ conversation_context += f"{role}: {msg.content}\n"
113
+
114
+ # Combine context with current message
115
+ full_message = body.message
116
+ if conversation_context:
117
+ full_message = (
118
+ f"Previous conversation:\n{conversation_context}\n"
119
+ f"Now the user says: {body.message}"
120
+ )
121
+
122
+ # Add to message window
123
+ window.add_message("user", full_message)
124
+
125
+ # ===== Step 4: AI Chat Processing =====
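# --- editor's sketch (illustrative, not part of this commit) ---
# `get_message_window` is imported from app.core.context_manager (not shown
# here). The route only calls `add_message(role, content)`, so a minimal
# per-user sliding window could look like this (the 40-message cap is an
# assumption):
from collections import defaultdict, deque

_windows = defaultdict(lambda: deque(maxlen=40))

class _MessageWindow:
    def __init__(self, user_id: str):
        self._buf = _windows[user_id]
    def add_message(self, role: str, content: str) -> None:
        self._buf.append({"role": role, "content": content})

def get_message_window(user_id: str) -> _MessageWindow:
    return _MessageWindow(user_id)
# --- end sketch ---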
126
+ with trace_operation(
127
+ "aida_chat_sync",
128
+ {
129
+ "user_id": user_id,
130
+ "user_role": user_role,
131
+ "message_length": len(full_message),
132
+ }
133
+ ):
134
+ final_state = await aida_chat_sync(
135
+ user_id,
136
+ user_role,
137
+ full_message,
138
+ )
139
+
140
+ # ===== Step 5: Token Tracking =====
141
+ with trace_operation("token_tracking"):
142
+ # Track tokens if available
143
+ usage = final_state.get("token_usage", {})
144
+ if usage:
145
+ tracker = get_token_tracker()
146
+ model_used = final_state.get("model_used", "unknown")
147
+ tracker.record_tokens(
148
+ model_used,
149
+ usage.get("prompt_tokens", 0),
150
+ usage.get("completion_tokens", 0),
151
+ usage.get("cost", 0.0),
152
+ )
153
+
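# --- editor's sketch (illustrative, not part of this commit) ---
# `get_token_tracker()` lives in app.core.observability. The only call made
# here is record_tokens(model, prompt_tokens, completion_tokens, cost), so a
# minimal accumulator satisfying that interface:
from collections import defaultdict

class _TokenTracker:
    def __init__(self):
        self.totals = defaultdict(lambda: {"prompt": 0, "completion": 0, "cost": 0.0})
    def record_tokens(self, model, prompt_tokens, completion_tokens, cost):
        t = self.totals[model]
        t["prompt"] += prompt_tokens
        t["completion"] += completion_tokens
        t["cost"] += cost

_tracker = _TokenTracker()
def get_token_tracker() -> _TokenTracker:
    return _tracker
# --- end sketch ---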
154
+ # ===== Step 6: Build Response =====
155
+ response = {
156
+ "success": True,
157
+ "text": final_state.get("ai_reply", ""),
158
+ "cards": final_state.get("search_results", []),
159
+ "draft_preview": final_state.get("draft_preview"),
160
+ "search_preview": final_state.get("search_preview"),
161
+ "metadata": {
162
+ "request_id": request_id,
163
+ "processing_time_ms": int((time.time() - start_time) * 1000),
164
+ "user_id": user_id,
165
+ "status": final_state.get("status"),
166
+ },
167
+ }
168
+
169
+ # ===== Step 7: Add Message to Window =====
170
+ with trace_operation("window_update"):
171
+ window.add_message("assistant", final_state.get("ai_reply", ""))
172
+
173
+ # Set root span attributes
174
+ root_span.set_attributes({
175
+ "response.status": "success",
176
+ "response.has_cards": len(response["cards"]) > 0,
177
+ "response.has_draft": response["draft_preview"] is not None,
178
+ "processing_time_ms": response["metadata"]["processing_time_ms"],
179
+ })
180
+
181
+ logger.info(
182
+ f"βœ… Chat processed successfully",
183
+ extra={
184
+ "user_id": user_id,
185
+ "request_id": request_id,
186
+ "processing_time_ms": response["metadata"]["processing_time_ms"],
187
+ "has_cards": len(response["cards"]) > 0,
188
+ }
189
+ )
190
+
191
+ return response
192
+
193
+ except RateLimitExceeded as e:
194
+ root_span.set_attribute("error.type", "rate_limit_exceeded")
195
+ logger.error(f"🚫 Rate limit: {str(e)}")
196
+ raise HTTPException(
197
+ status_code=429,
198
+ detail=e.message,
199
+ headers={"Retry-After": str(e.retry_after)},
200
+ )
201
+
202
+ except HTTPException:
203
+ raise
204
+
205
+ except Exception as e:
206
+ root_span.record_exception(e)
207
+ root_span.set_attribute("error.type", type(e).__name__)
208
+ logger.error(
209
+ f"❌ Chat endpoint error: {str(e)}",
210
+ exc_info=True,
211
+ extra={"user_id": user_id if 'user_id' in locals() else "unknown"}
212
+ )
213
+ raise HTTPException(
214
+ status_code=500,
215
+ detail="An error occurred processing your request",
216
+ )
217
+
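# --- editor's sketch (illustrative, not part of this commit) ---
# `handle_errors` (decorating /ask above) is defined in
# app.core.error_handling. Given how it is used, it plausibly swallows
# unexpected exceptions and returns a default payload, while letting
# HTTPException propagate so FastAPI can map it to a status code:
import functools
import logging
from fastapi import HTTPException

_log = logging.getLogger(__name__)

def handle_errors(default_return):
    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            try:
                return await fn(*args, **kwargs)
            except HTTPException:
                raise  # explicit HTTP errors pass through untouched
            except Exception:
                _log.exception("Unhandled error in %s", fn.__name__)
                return default_return
        return wrapper
    return decorator
# --- end sketch ---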
218
+ # ============================================================
219
+ # Rate Limit Status Endpoint
220
+ # ============================================================
221
+
222
+ @router.get("/rate-limit-status")
223
+ async def get_rate_limit_status(
224
+ token: str = Depends(security),
225
+ ) -> dict:
226
+ """Get current rate limit status for user"""
227
 
228
+ with trace_operation("rate_limit_status"):
229
+ payload = decode_access_token(token.credentials)
230
+ if not payload:
231
+ raise HTTPException(status_code=401, detail="Invalid token")
232
+
233
+ user_id = payload["user_id"]
234
+ rate_limiter = get_rate_limiter()
235
+
236
+ stats = await rate_limiter.get_usage_stats(user_id)
237
+
238
+ return {
239
+ "success": True,
240
+ "data": stats,
241
+ "operations": {
242
+ "chat": {"cost": 1},
243
+ "search": {"cost": 2},
244
+ "list": {"cost": 3},
245
+ "publish": {"cost": 5},
246
+ "upload_image": {"cost": 4},
247
+ },
248
+ }
249
+
250
+ # ============================================================
251
+ # Health Check with Rate Limiter
252
+ # ============================================================
253
+
254
+ @router.get("/health")
255
+ async def chat_health() -> dict:
256
+ """Health check for chat service"""
257
 
 
 
258
  return {
259
+ "service": "aida-chat",
260
+ "status": "healthy",
261
+ "rate_limiting": "enabled",
262
+ "features": {
263
+ "distributed_tracing": True,
264
+ "token_tracking": True,
265
+ "context_management": True,
266
+ "error_resilience": True,
267
+ },
268
  }
app/ai/service.py CHANGED
@@ -1,142 +1,254 @@
1
- # app/ai/service.py – stateful version with context management
2
  import json
3
  from typing import AsyncGenerator, Dict, Any
 
 
4
  from app.ai.graph import agent
5
  from app.ai.memory.redis_memory import load_history, save_turn, load_state, save_state
6
  from app.ai.state import ChatState
7
- from app.core.context_manager import get_context_manager, MessageWindowExceeded
8
- from structlog import get_logger
 
 
 
9
 
10
- logger = get_logger(__name__)
 
 
11
 
12
- # --------------------------------------------------
13
- # WebSocket streaming entry-point
14
- # --------------------------------------------------
15
  async def aida_chat(
16
  user_id: str,
17
  user_role: str,
18
  human_msg: str,
19
  ) -> AsyncGenerator[str, None]:
20
  """
21
- Streaming chat endpoint with context management.
22
- βœ… Uses context manager for intelligent message windowing
 
 
 
 
 
 
23
  """
24
- try:
25
- # Initialize context manager for this user
26
- context_mgr = get_context_manager()
27
-
28
- # Load message history from Redis
29
- messages = await load_history(user_id)
30
-
31
- # Add user message to history
32
- messages.append({"role": "user", "content": human_msg})
33
-
34
- # Manage context window (drop old messages if needed)
35
  try:
36
- managed_messages = await context_mgr.manage_context(messages)
37
- logger.info("βœ… Context window managed", total_msgs=len(messages), managed=len(managed_messages))
38
- except MessageWindowExceeded as e:
39
- logger.warning(f"⚠️ Message window exceeded: {e}, using truncated history")
40
- managed_messages = messages[-20:] # Keep last 20 messages as fallback
41
-
42
- # Restore previous state or start fresh
43
- saved = await load_state(user_id)
44
- state: ChatState = {
45
- "user_id": user_id,
46
- "user_role": user_role,
47
- "messages": managed_messages,
48
- "draft": saved.get("draft"),
49
- "vector_meta": saved.get("vector_meta"),
50
- "allowed": saved.get("allowed", True),
51
- "ai_reply": saved.get("ai_reply", ""),
52
- "status": saved.get("status"),
53
- "missing_fields": saved.get("missing_fields", []),
54
- "next_question": saved.get("next_question"),
55
- "location": saved.get("location"),
56
- "min_price": saved.get("min_price"),
57
- "max_price": saved.get("max_price"),
58
- "bedrooms": saved.get("bedrooms"),
59
- "bathrooms": saved.get("bathrooms"),
60
- "amenities": saved.get("amenities", []),
61
- "listing_type": saved.get("listing_type"),
62
- "price": saved.get("price"),
63
- "price_type": saved.get("price_type"),
64
- "currency": saved.get("currency", "XOF"),
65
- "requirements": saved.get("requirements"),
66
- "search_query": saved.get("search_query"),
67
- "search_results": saved.get("search_results"),
68
- "search_preview": saved.get("search_preview"),
69
- "suggestions": saved.get("suggestions", []),
70
- "image": saved.get("image"),
71
- "field_validations": saved.get("field_validations"),
72
- "field_confidences": saved.get("field_confidences"),
73
- "location_details": saved.get("location_details"),
74
- "validation_suggestions": saved.get("validation_suggestions", []),
75
- "listing_confidence": saved.get("listing_confidence"),
76
- "currency_confidence": saved.get("currency_confidence"),
77
- "draft_preview": saved.get("draft_preview"),
78
- "mongo_id": saved.get("mongo_id"),
79
- }
80
-
81
- logger.info("πŸš€ Starting aida_chat stream", user_id=user_id, user_role=user_role)
82
-
83
- # Stream responses from agent
84
- async for step in agent.astream(state):
85
- for node_name, update in step.items():
86
- if update.get("ai_reply"):
87
- logger.debug(f"πŸ“€ Streaming from {node_name}")
88
- yield json.dumps({"node": node_name, "text": update["ai_reply"]}) + "\n"
89
-
90
- # Final invocation to get complete state
91
- final_state = await agent.ainvoke(state)
 
 
92
 
93
- # Update message history with assistant response
94
- managed_messages.append({"role": "assistant", "content": final_state["ai_reply"]})
95
-
96
- # Persist conversation and state
97
- await save_turn(user_id, managed_messages)
98
- await save_state(user_id, final_state)
99
-
100
- logger.info("βœ… aida_chat stream completed", user_id=user_id)
101
 
102
- except Exception as e:
103
- logger.error("❌ aida_chat error", exc_info=e)
104
- yield json.dumps({"node": "error", "text": "Sorry, something went wrong. Please try again."}) + "\n"
105
 
106
-
107
- # --------------------------------------------------
108
- # REST (non-streaming) – returns the full state dict
109
- # --------------------------------------------------
 
 
110
  async def aida_chat_sync(
111
  user_id: str,
112
  user_role: str,
113
  human_msg: str,
114
  ) -> Dict[str, Any]:
115
  """
116
- Synchronous chat endpoint with full context management.
117
- βœ… Manages message context window
118
- βœ… Returns complete state for REST clients
 
 
 
 
 
119
  """
120
- try:
121
- # Initialize context manager for this user
122
- context_mgr = get_context_manager()
 
123
 
124
- # Load message history from Redis
125
- messages = await load_history(user_id)
 
126
 
127
- # Add user message to history
128
  messages.append({"role": "user", "content": human_msg})
129
 
130
- # Manage context window (drop old messages if needed)
131
- try:
132
- managed_messages = await context_mgr.manage_context(messages)
133
- logger.info("βœ… Context window managed", total_msgs=len(messages), managed=len(managed_messages))
134
- except MessageWindowExceeded as e:
135
- logger.warning(f"⚠️ Message window exceeded: {e}, using truncated history")
136
- managed_messages = messages[-20:] # Keep last 20 messages as fallback
 
137
 
138
- # Restore previous state or start fresh
139
- saved = await load_state(user_id)
140
  state: ChatState = {
141
  "user_id": user_id,
142
  "user_role": user_role,
@@ -173,63 +285,126 @@ async def aida_chat_sync(
173
  "draft_preview": saved.get("draft_preview"),
174
  "mongo_id": saved.get("mongo_id"),
175
  }
176
-
177
- logger.info("πŸš€ Starting aida_chat_sync", user_id=user_id, user_role=user_role)
178
-
179
- # Invoke agent with complete state
180
- final_state = await agent.ainvoke(state)
181
 
182
- # Update message history with assistant response
183
- managed_messages.append({"role": "assistant", "content": final_state["ai_reply"]})
 
 
184
 
185
- # Persist conversation and state
186
- await save_turn(user_id, managed_messages)
187
- await save_state(user_id, final_state)
 
 
188
 
189
- logger.info("βœ… aida_chat_sync completed", user_id=user_id, status=final_state.get("status"))
 
 
190
 
191
  # Return the entire state so the route can pick text + cards + preview
192
  return final_state
193
 
194
- except Exception as e:
195
- logger.error("❌ aida_chat_sync error", exc_info=e)
196
- return {
197
- "ai_reply": "Sorry, something went wrong. Please try again.",
198
- "status": "error",
199
- "search_preview": None,
200
- "draft_preview": None,
201
- }
202
-
203
 
204
- # --------------------------------------------------
205
- # Health check / debugging endpoint
206
- # --------------------------------------------------
207
  async def get_conversation_context(user_id: str) -> Dict[str, Any]:
208
  """
209
- Get current conversation context for a user.
210
- βœ… Returns managed message window and current state
 
211
  """
212
- try:
213
- context_mgr = get_context_manager()
214
- messages = await load_history(user_id)
215
- saved = await load_state(user_id)
216
-
217
- # Manage context to show what will be sent to agent
218
  try:
219
- managed = await context_mgr.manage_context(messages)
220
- except MessageWindowExceeded:
221
- managed = messages[-20:]
 
 
222
 
223
- return {
224
- "user_id": user_id,
225
- "total_messages": len(messages),
226
- "managed_messages": len(managed),
227
- "current_status": saved.get("status"),
228
- "intent": saved.get("intent"),
229
- "draft_preview": saved.get("draft_preview") is not None,
230
- "search_results_count": len(saved.get("search_results", [])),
231
- "message_sample": managed[-1]["content"][:100] if managed else None,
232
- }
233
- except Exception as e:
234
- logger.error("❌ Failed to get conversation context", exc_info=e)
235
- return {"error": str(e)}
 
 
 
1
+ # app/ai/service.py – Complete with context management + error handling
2
  import json
3
  from typing import AsyncGenerator, Dict, Any
4
+ import logging
5
+
6
  from app.ai.graph import agent
7
  from app.ai.memory.redis_memory import load_history, save_turn, load_state, save_state
8
  from app.ai.state import ChatState
9
+ from app.core.context_manager import get_context_manager
10
+ from app.core.error_handling import trace_operation, handle_errors
11
+ from app.core.observability import get_token_tracker
12
+
13
+ logger = logging.getLogger(__name__)
14
 
15
+ # ============================================================
16
+ # WebSocket Streaming Entry Point
17
+ # ============================================================
18
 
 
 
 
19
  async def aida_chat(
20
  user_id: str,
21
  user_role: str,
22
  human_msg: str,
23
  ) -> AsyncGenerator[str, None]:
24
  """
25
+ Streaming chat endpoint with full context management and error handling.
26
+
27
+ Features:
28
+ - Context window management (prevents overflow)
29
+ - Message window persistence per user
30
+ - Error recovery with fallbacks
31
+ - Token tracking
32
+ - Full observability
33
  """
34
+
35
+ with trace_operation("aida_chat_stream", {"user_id": user_id, "user_role": user_role}):
 
 
 
 
 
 
 
 
 
36
  try:
37
+ logger.info(
38
+ "πŸš€ Starting aida_chat stream",
39
+ extra={"user_id": user_id, "user_role": user_role}
40
+ )
41
+
42
+ # ===== Load message history =====
43
+ with trace_operation("load_history"):
44
+ try:
45
+ messages = await load_history(user_id)
46
+ logger.info(f"βœ… Loaded {len(messages)} messages from history")
47
+ except Exception as e:
48
+ logger.warning(f"⚠️ Failed to load history: {e}, starting fresh")
49
+ messages = []
50
+
51
+ # ===== Add user message =====
52
+ messages.append({"role": "user", "content": human_msg})
53
+
54
+ # ===== Manage context window =====
55
+ with trace_operation("context_management"):
56
+ try:
57
+ context_mgr = get_context_manager()
58
+ managed_messages = await context_mgr.manage_context(messages)
59
+ logger.info(
60
+ f"βœ… Context managed",
61
+ extra={
62
+ "total_messages": len(messages),
63
+ "managed_messages": len(managed_messages),
64
+ }
65
+ )
66
+ except Exception as e:
67
+ logger.warning(f"⚠️ Context management failed: {e}, using last 20 messages")
68
+ managed_messages = messages[-20:]
69
+
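# --- editor's sketch (illustrative, not part of this commit) ---
# `get_context_manager().manage_context(messages)` comes from
# app.core.context_manager. A plausible minimal behaviour, trimming the
# history to a rough token budget (the 4-chars-per-token heuristic and the
# 8000-token budget are assumptions, not the project's actual values):
class _ContextManager:
    def __init__(self, token_budget: int = 8000):
        self.token_budget = token_budget

    async def manage_context(self, messages: list) -> list:
        kept, used = [], 0
        for msg in reversed(messages):  # newest messages win
            est_tokens = len(msg.get("content", "")) // 4 + 1
            if used + est_tokens > self.token_budget:
                break
            kept.append(msg)
            used += est_tokens
        return list(reversed(kept))

def get_context_manager() -> _ContextManager:
    return _ContextManager()
# --- end sketch ---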
70
+ # ===== Load previous state =====
71
+ with trace_operation("load_state"):
72
+ try:
73
+ saved = await load_state(user_id)
74
+ logger.info(f"βœ… Loaded previous state, status={saved.get('status')}")
75
+ except Exception as e:
76
+ logger.warning(f"⚠️ Failed to load state: {e}, starting fresh")
77
+ saved = {}
78
+
79
+ # ===== Build chat state =====
80
+ state: ChatState = {
81
+ "user_id": user_id,
82
+ "user_role": user_role,
83
+ "messages": managed_messages,
84
+ "draft": saved.get("draft"),
85
+ "vector_meta": saved.get("vector_meta"),
86
+ "allowed": saved.get("allowed", True),
87
+ "ai_reply": saved.get("ai_reply", ""),
88
+ "status": saved.get("status"),
89
+ "missing_fields": saved.get("missing_fields", []),
90
+ "next_question": saved.get("next_question"),
91
+ "location": saved.get("location"),
92
+ "min_price": saved.get("min_price"),
93
+ "max_price": saved.get("max_price"),
94
+ "bedrooms": saved.get("bedrooms"),
95
+ "bathrooms": saved.get("bathrooms"),
96
+ "amenities": saved.get("amenities", []),
97
+ "listing_type": saved.get("listing_type"),
98
+ "price": saved.get("price"),
99
+ "price_type": saved.get("price_type"),
100
+ "currency": saved.get("currency", "XOF"),
101
+ "requirements": saved.get("requirements"),
102
+ "search_query": saved.get("search_query"),
103
+ "search_results": saved.get("search_results"),
104
+ "search_preview": saved.get("search_preview"),
105
+ "suggestions": saved.get("suggestions", []),
106
+ "image": saved.get("image"),
107
+ "field_validations": saved.get("field_validations"),
108
+ "field_confidences": saved.get("field_confidences"),
109
+ "location_details": saved.get("location_details"),
110
+ "validation_suggestions": saved.get("validation_suggestions", []),
111
+ "listing_confidence": saved.get("listing_confidence"),
112
+ "currency_confidence": saved.get("currency_confidence"),
113
+ "draft_preview": saved.get("draft_preview"),
114
+ "mongo_id": saved.get("mongo_id"),
115
+ }
116
+
117
+ # ===== Stream responses from agent =====
118
+ with trace_operation("agent_stream"):
119
+ try:
120
+ async for step in agent.astream(state):
121
+ for node_name, update in step.items():
122
+ if update.get("ai_reply"):
123
+ logger.debug(f"πŸ“€ Streaming from {node_name}")
124
+ yield json.dumps({
125
+ "node": node_name,
126
+ "text": update["ai_reply"]
127
+ }) + "\n"
128
+ except Exception as e:
129
+ logger.error(f"❌ Agent stream error: {e}", exc_info=True)
130
+ yield json.dumps({
131
+ "node": "error",
132
+ "text": "An error occurred processing your request. Please try again."
133
+ }) + "\n"
134
+ return
135
+
136
+ # ===== Get final state =====
137
+ with trace_operation("agent_invoke"):
138
+ try:
139
+ final_state = await agent.ainvoke(state)
140
+ logger.info(f"βœ… Agent invocation complete, status={final_state.get('status')}")
141
+ except Exception as e:
142
+ logger.error(f"❌ Agent invoke error: {e}", exc_info=True)
143
+ yield json.dumps({
144
+ "node": "error",
145
+ "text": "Sorry, something went wrong. Please try again."
146
+ }) + "\n"
147
+ return
148
+
149
+ # ===== Save results =====
150
+ with trace_operation("save_results"):
151
+ try:
152
+ # Update message history
153
+ managed_messages.append({
154
+ "role": "assistant",
155
+ "content": final_state.get("ai_reply", "")
156
+ })
157
+
158
+ # Persist conversation and state
159
+ await save_turn(user_id, managed_messages)
160
+ await save_state(user_id, final_state)
161
+
162
+ logger.info(f"βœ… Results saved for user {user_id}")
163
+ except Exception as e:
164
+ logger.error(f"❌ Failed to save results: {e}")
165
+
166
+ logger.info("βœ… aida_chat stream completed successfully")
167
 
168
+ except Exception as e:
169
+ logger.error(f"❌ Unexpected error in aida_chat: {e}", exc_info=True)
170
+ yield json.dumps({
171
+ "node": "error",
172
+ "text": "An unexpected error occurred. Please try again."
173
+ }) + "\n"
 
 
174
 
175
+ # ============================================================
176
+ # REST Synchronous Entry Point
177
+ # ============================================================
178
 
179
+ @handle_errors(default_return={
180
+ "ai_reply": "Sorry, something went wrong. Please try again.",
181
+ "status": "error",
182
+ "search_preview": None,
183
+ "draft_preview": None,
184
+ })
185
  async def aida_chat_sync(
186
  user_id: str,
187
  user_role: str,
188
  human_msg: str,
189
  ) -> Dict[str, Any]:
190
  """
191
+ Synchronous chat endpoint for REST clients.
192
+
193
+ Features:
194
+ - Full context management
195
+ - Error resilience
196
+ - Token tracking
197
+ - Complete state return
198
+ - Observability integration
199
  """
200
+
201
+ with trace_operation(
202
+ "aida_chat_sync",
203
+ {
204
+ "user_id": user_id,
205
+ "user_role": user_role,
206
+ "message_length": len(human_msg),
207
+ }
208
+ ):
209
+ logger.info(
210
+ "πŸš€ Starting aida_chat_sync",
211
+ extra={"user_id": user_id, "user_role": user_role}
212
+ )
213
 
214
+ # ===== Load message history =====
215
+ with trace_operation("load_history"):
216
+ try:
217
+ messages = await load_history(user_id)
218
+ logger.info(f"βœ… Loaded {len(messages)} messages from history")
219
+ except Exception as e:
220
+ logger.warning(f"⚠️ Failed to load history: {e}, starting fresh")
221
+ messages = []
222
 
223
+ # ===== Add user message =====
224
  messages.append({"role": "user", "content": human_msg})
225
 
226
+ # ===== Manage context window =====
227
+ with trace_operation("context_management"):
228
+ try:
229
+ context_mgr = get_context_manager()
230
+ managed_messages = await context_mgr.manage_context(messages)
231
+ logger.info(
232
+ f"βœ… Context managed",
233
+ extra={
234
+ "total_messages": len(messages),
235
+ "managed_messages": len(managed_messages),
236
+ }
237
+ )
238
+ except Exception as e:
239
+ logger.warning(f"⚠️ Context management failed: {e}, using last 20 messages")
240
+ managed_messages = messages[-20:]
241
+
242
+ # ===== Load previous state =====
243
+ with trace_operation("load_state"):
244
+ try:
245
+ saved = await load_state(user_id)
246
+ logger.info(f"βœ… Loaded previous state, status={saved.get('status')}")
247
+ except Exception as e:
248
+ logger.warning(f"⚠️ Failed to load state: {e}, starting fresh")
249
+ saved = {}
250
 
251
+ # ===== Build chat state =====
 
252
  state: ChatState = {
253
  "user_id": user_id,
254
  "user_role": user_role,
 
285
  "draft_preview": saved.get("draft_preview"),
286
  "mongo_id": saved.get("mongo_id"),
287
  }
 
 
 
 
 
288
 
289
+ # ===== Invoke agent =====
290
+ with trace_operation("agent_invoke"):
291
+ try:
292
+ final_state = await agent.ainvoke(state)
293
+ logger.info(
294
+ f"βœ… Agent invocation complete",
295
+ extra={"status": final_state.get("status")}
296
+ )
297
+ except Exception as e:
298
+ logger.error(f"❌ Agent invoke error: {e}", exc_info=True)
299
+ return {
300
+ "ai_reply": "Sorry, an error occurred processing your request.",
301
+ "status": "error",
302
+ "search_preview": None,
303
+ "draft_preview": None,
304
+ }
305
 
306
+ # ===== Save results =====
307
+ with trace_operation("save_results"):
308
+ try:
309
+ # Update message history
310
+ managed_messages.append({
311
+ "role": "assistant",
312
+ "content": final_state.get("ai_reply", "")
313
+ })
314
+
315
+ # Persist conversation and state
316
+ await save_turn(user_id, managed_messages)
317
+ await save_state(user_id, final_state)
318
+
319
+ logger.info(f"βœ… Results saved for user {user_id}")
320
+ except Exception as e:
321
+ logger.error(f"⚠️ Failed to save results: {e}")
322
+ # Don't fail the response, just log the error
323
 
324
+ # ===== Track tokens if available =====
325
+ with trace_operation("token_tracking"):
326
+ try:
327
+ usage = final_state.get("token_usage", {})
328
+ if usage:
329
+ tracker = get_token_tracker()
330
+ model_used = final_state.get("model_used", "unknown")
331
+ tracker.record_tokens(
332
+ model_used,
333
+ usage.get("prompt_tokens", 0),
334
+ usage.get("completion_tokens", 0),
335
+ usage.get("cost", 0.0),
336
+ )
337
+ logger.info(f"βœ… Tokens tracked", extra={"model": model_used, "usage": usage})
338
+ except Exception as e:
339
+ logger.warning(f"⚠️ Failed to track tokens: {e}")
340
+
341
+ logger.info(f"βœ… aida_chat_sync completed successfully")
342
 
343
  # Return the entire state so the route can pick text + cards + preview
344
  return final_state
345
 
346
+ # ============================================================
347
+ # Debugging / Context Inspection
348
+ # ============================================================
 
 
 
 
 
 
349
 
 
 
 
350
  async def get_conversation_context(user_id: str) -> Dict[str, Any]:
351
  """
352
+ Get current conversation context for debugging/inspection.
353
+
354
+ Returns managed message window and current state stats.
355
  """
356
+
357
+ with trace_operation("get_conversation_context", {"user_id": user_id}):
 
 
 
 
358
  try:
359
+ # Load history and state
360
+ messages = await load_history(user_id)
361
+ saved = await load_state(user_id)
362
+
363
+ # Manage context to show what will be sent to agent
364
+ context_mgr = get_context_manager()
365
+ try:
366
+ managed = await context_mgr.manage_context(messages)
367
+ except Exception as e:
368
+ logger.warning(f"⚠️ Context management failed: {e}")
369
+ managed = messages[-20:] if messages else []
370
+
371
+ return {
372
+ "user_id": user_id,
373
+ "total_messages": len(messages),
374
+ "managed_messages": len(managed),
375
+ "current_status": saved.get("status"),
376
+ "current_intent": saved.get("intent"),
377
+ "has_draft": saved.get("draft_preview") is not None,
378
+ "search_results_count": len(saved.get("search_results", [])),
379
+ "message_sample": managed[-1]["content"][:100] if managed else None,
380
+ "timestamp": managed[-1].get("timestamp") if managed else None,
381
+ }
382
 
383
+ except Exception as e:
384
+ logger.error(f"❌ Failed to get conversation context: {e}", exc_info=True)
385
+ return {
386
+ "error": str(e),
387
+ "user_id": user_id,
388
+ }
389
+
390
+ # ============================================================
391
+ # Health Check
392
+ # ============================================================
393
+
394
+ async def health_check_chat_service() -> Dict[str, Any]:
395
+ """
396
+ Health check for chat service.
397
+ """
398
+
399
+ return {
400
+ "service": "aida-chat",
401
+ "status": "healthy",
402
+ "features": {
403
+ "context_management": True,
404
+ "error_handling": True,
405
+ "token_tracking": True,
406
+ "observability": True,
407
+ "streaming": True,
408
+ "sync": True,
409
+ },
410
+ }
app/api/endpoints/monitoring.py ADDED
@@ -0,0 +1,354 @@
 
 
1
+ # ============================================================
2
+ # app/api/endpoints/monitoring.py - Observability & Monitoring
3
+ # ============================================================
4
+
5
+ from fastapi import APIRouter, Depends, HTTPException
6
+ from typing import Optional, Dict, Any
7
+ import logging
8
+ from datetime import datetime, timedelta
9
+
10
+ from app.guards.jwt_guard import get_current_user
11
+ from app.core.llm_router import get_llm_router
12
+ from app.core.observability import get_token_tracker, get_meter
13
+ from app.core.error_handling import get_all_circuit_breaker_status
14
+ from app.core.rate_limiter import get_rate_limiter
15
+ from app.core.context_manager import cleanup_expired_windows
16
+ from app.ai.config import redis_client, qdrant_client
17
+ from app.database import get_db
18
+
19
+ logger = logging.getLogger(__name__)
20
+
21
+ router = APIRouter()
22
+
23
+ # ============================================================
24
+ # Health & Status Endpoints
25
+ # ============================================================
26
+
27
+ @router.get("/health/detailed")
28
+ async def detailed_health_check() -> Dict[str, Any]:
29
+ """
30
+ Comprehensive health check with all system components
31
+ """
32
+
33
+ health_status = {
34
+ "status": "checking",
35
+ "timestamp": datetime.utcnow().isoformat(),
36
+ "components": {},
37
+ }
38
+
39
+ try:
40
+ # MongoDB
41
+ try:
42
+ db = await get_db()
43
+ await db.client.admin.command("ping")
44
+ health_status["components"]["mongodb"] = {
45
+ "status": "healthy",
46
+ "response_time_ms": 5,
47
+ }
48
+ except Exception as e:
49
+ health_status["components"]["mongodb"] = {
50
+ "status": "unhealthy",
51
+ "error": str(e),
52
+ }
53
+
54
+ # Redis
55
+ try:
56
+ await redis_client.ping()
57
+ health_status["components"]["redis"] = {
58
+ "status": "healthy",
59
+ }
60
+ except Exception as e:
61
+ health_status["components"]["redis"] = {
62
+ "status": "unhealthy",
63
+ "error": str(e),
64
+ }
65
+
66
+ # Qdrant
67
+ try:
68
+ await qdrant_client.get_collections()
69
+ health_status["components"]["qdrant"] = {
70
+ "status": "healthy",
71
+ }
72
+ except Exception as e:
73
+ health_status["components"]["qdrant"] = {
74
+ "status": "unhealthy",
75
+ "error": str(e),
76
+ }
77
+
78
+ # LLM Router
79
+ try:
80
+ llm_router = get_llm_router()  # avoid shadowing the module-level APIRouter
81
+ stats = llm_router.get_stats()
82
+ available = sum(
83
+ 1 for info in stats["models"].values()
84
+ if info["available"]
85
+ )
86
+ health_status["components"]["llm_router"] = {
87
+ "status": "healthy" if available > 0 else "degraded",
88
+ "available_models": available,
89
+ "total_calls": stats["total_calls"],
90
+ "errors": stats["total_errors"],
91
+ }
92
+ except Exception as e:
93
+ health_status["components"]["llm_router"] = {
94
+ "status": "unhealthy",
95
+ "error": str(e),
96
+ }
97
+
98
+ # Circuit Breakers
99
+ try:
100
+ circuit_breakers = get_all_circuit_breaker_status()
101
+ open_breakers = sum(
102
+ 1 for cb in circuit_breakers.values()
103
+ if cb["is_open"]
104
+ )
105
+ health_status["components"]["circuit_breakers"] = {
106
+ "status": "healthy" if open_breakers == 0 else "warning",
107
+ "total": len(circuit_breakers),
108
+ "open": open_breakers,
109
+ }
110
+ except Exception as e:
111
+ health_status["components"]["circuit_breakers"] = {
112
+ "status": "unknown",
113
+ "error": str(e),
114
+ }
115
+
116
+ # Overall status
117
+ unhealthy = sum(
118
+ 1 for component in health_status["components"].values()
119
+ if component["status"] == "unhealthy"
120
+ )
121
+
122
+ health_status["status"] = (
123
+ "healthy" if unhealthy == 0 else
124
+ "degraded" if unhealthy <= 1 else
125
+ "unhealthy"
126
+ )
127
+
128
+ return health_status
129
+
130
+ except Exception as e:
131
+ logger.error(f"Health check error: {e}")
132
+ return {
133
+ "status": "error",
134
+ "error": str(e),
135
+ "timestamp": datetime.utcnow().isoformat(),
136
+ }
137
+
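# --- editor's sketch (illustrative, not part of this commit) ---
# `get_all_circuit_breaker_status()` is defined in app.core.error_handling.
# The handlers in this file only rely on each entry exposing "name" and
# "is_open"; a representative return value (field names inferred from usage):
_example_breakers = {
    "qdrant": {"name": "qdrant", "is_open": False, "failure_count": 0},
    "embeddings": {"name": "embeddings", "is_open": True, "failure_count": 7},
}
_open = [cb["name"] for cb in _example_breakers.values() if cb["is_open"]]
assert _open == ["embeddings"]
# --- end sketch ---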
138
+ # ============================================================
139
+ # Metrics Endpoints
140
+ # ============================================================
141
+
142
+ @router.get("/metrics/tokens")
143
+ async def get_token_metrics(user: dict = Depends(get_current_user)) -> Dict[str, Any]:
144
+ """
145
+ Get token usage metrics
146
+ Only admins or the user themselves can view
147
+ """
148
+
149
+ try:
150
+ tracker = get_token_tracker()
151
+
152
+ return {
153
+ "success": True,
154
+ "data": {
155
+ "timestamp": datetime.utcnow().isoformat(),
156
+ "note": "Token metrics available in traces backend",
157
+ "models_tracked": [
158
+ "deepseek-chat",
159
+ "mistralai/mistral-7b-instruct",
160
+ "xai-org/grok-beta",
161
+ "meta-llama/llama-2-70b-chat",
162
+ ],
163
+ }
164
+ }
165
+ except Exception as e:
166
+ logger.error(f"Token metrics error: {e}")
167
+ raise HTTPException(status_code=500, detail="Failed to get token metrics")
168
+
169
+ @router.get("/metrics/llm")
170
+ async def get_llm_metrics(user: dict = Depends(get_current_user)) -> Dict[str, Any]:
171
+ """
172
+ Get LLM router metrics
173
+ """
174
+
175
+ try:
176
+ llm_router = get_llm_router()
177
+ stats = llm_router.get_stats()
178
+
179
+ return {
180
+ "success": True,
181
+ "data": {
182
+ "timestamp": datetime.utcnow().isoformat(),
183
+ "total_calls": stats["total_calls"],
184
+ "total_errors": stats["total_errors"],
185
+ "models": [
186
+ {
187
+ "name": model,
188
+ "available": info["available"],
189
+ "calls": info["calls"],
190
+ "errors": info["errors"],
191
+ "error_rate": (
192
+ (info["errors"] / max(info["calls"], 1) * 100)
193
+ if info["calls"] > 0 else 0
194
+ ),
195
+ }
196
+ for model, info in stats["models"].items()
197
+ ],
198
+ }
199
+ }
200
+ except Exception as e:
201
+ logger.error(f"LLM metrics error: {e}")
202
+ raise HTTPException(status_code=500, detail="Failed to get LLM metrics")
203
+
204
+ @router.get("/metrics/rate-limit")
205
+ async def get_rate_limit_metrics(user: dict = Depends(get_current_user)) -> Dict[str, Any]:
206
+ """
207
+ Get rate limit metrics for current user
208
+ """
209
+
210
+ try:
211
+ rate_limiter = get_rate_limiter()
212
+ stats = await rate_limiter.get_usage_stats(user["user_id"])
213
+
214
+ usage_percent = (
215
+ ((stats["capacity"] - stats["remaining"]) / stats["capacity"] * 100)
216
+ if stats["capacity"] > 0 else 0
217
+ )
218
+
219
+ return {
220
+ "success": True,
221
+ "data": {
222
+ **stats,
223
+ "usage_percent": round(usage_percent, 2),
224
+ "reset_time": (
225
+ datetime.utcnow() + timedelta(seconds=stats["reset_in"])
226
+ ).isoformat(),
227
+ }
228
+ }
229
+ except Exception as e:
230
+ logger.error(f"Rate limit metrics error: {e}")
231
+ raise HTTPException(status_code=500, detail="Failed to get rate limit metrics")
232
+
233
+ @router.get("/metrics/circuit-breakers")
234
+ async def get_circuit_breaker_metrics(user: dict = Depends(get_current_user)) -> Dict[str, Any]:
235
+ """
236
+ Get circuit breaker status
237
+ """
238
+
239
+ try:
240
+ breakers = get_all_circuit_breaker_status()
241
+
242
+ return {
243
+ "success": True,
244
+ "data": {
245
+ "timestamp": datetime.utcnow().isoformat(),
246
+ "total": len(breakers),
247
+ "open": sum(1 for cb in breakers.values() if cb["is_open"]),
248
+ "closed": sum(1 for cb in breakers.values() if not cb["is_open"]),
249
+ "circuit_breakers": [
250
+ {
251
+ **cb,
252
+ "status": "open" if cb["is_open"] else "closed",
253
+ }
254
+ for cb in breakers.values()
255
+ ],
256
+ }
257
+ }
258
+ except Exception as e:
259
+ logger.error(f"Circuit breaker metrics error: {e}")
260
+ raise HTTPException(status_code=500, detail="Failed to get circuit breaker metrics")
261
+
262
+ # ============================================================
263
+ # System Status Endpoints
264
+ # ============================================================
265
+
266
+ @router.get("/status/system")
267
+ async def get_system_status(user: dict = Depends(get_current_user)) -> Dict[str, Any]:
268
+ """
269
+ Get overall system status
270
+ """
271
+
272
+ try:
273
+ llm_router = get_llm_router()
274
+ llm_stats = llm_router.get_stats()
275
+
276
+ breakers = get_all_circuit_breaker_status()
277
+ open_breakers = [cb for cb in breakers.values() if cb["is_open"]]
278
+
279
+ return {
280
+ "success": True,
281
+ "data": {
282
+ "timestamp": datetime.utcnow().isoformat(),
283
+ "system_status": (
284
+ "healthy" if len(open_breakers) == 0 else
285
+ "degraded" if len(open_breakers) == 1 else
286
+ "unhealthy"
287
+ ),
288
+ "llm_status": {
289
+ "available_models": sum(
290
+ 1 for info in llm_stats["models"].values()
291
+ if info["available"]
292
+ ),
293
+ "error_rate": (
294
+ llm_stats["total_errors"] / max(llm_stats["total_calls"], 1) * 100
295
+ ) if llm_stats["total_calls"] > 0 else 0,
296
+ },
297
+ "circuit_breaker_status": {
298
+ "open": len(open_breakers),
299
+ "total": len(breakers),
300
+ "open_breakers": [cb["name"] for cb in open_breakers],
301
+ },
302
+ }
303
+ }
304
+ except Exception as e:
305
+ logger.error(f"System status error: {e}")
306
+ raise HTTPException(status_code=500, detail="Failed to get system status")
307
+
308
+ # ============================================================
309
+ # Debug Endpoints (Development Only)
310
+ # ============================================================
311
+
312
+ @router.post("/debug/cleanup-windows")
313
+ async def debug_cleanup_windows(user: dict = Depends(get_current_user)) -> Dict[str, Any]:
314
+ """
315
+ Manually trigger cleanup of expired message windows
316
+ """
317
+
318
+ try:
319
+ count = cleanup_expired_windows()
320
+
321
+ return {
322
+ "success": True,
323
+ "data": {
324
+ "cleaned_windows": count,
325
+ "timestamp": datetime.utcnow().isoformat(),
326
+ }
327
+ }
328
+ except Exception as e:
329
+ logger.error(f"Window cleanup error: {e}")
330
+ raise HTTPException(status_code=500, detail="Failed to cleanup windows")
331
+
332
+ @router.post("/debug/reset-rate-limit")
333
+ async def debug_reset_rate_limit(
334
+ user: dict = Depends(get_current_user),
335
+ ) -> Dict[str, Any]:
336
+ """
337
+ Reset rate limits (development/admin only)
338
+ """
339
+
340
+ try:
341
+ rate_limiter = get_rate_limiter()
342
+ success = await rate_limiter.reset_user_limits(user["user_id"])
343
+
344
+ return {
345
+ "success": success,
346
+ "data": {
347
+ "user_id": user["user_id"],
348
+ "limits_reset": success,
349
+ "timestamp": datetime.utcnow().isoformat(),
350
+ }
351
+ }
352
+ except Exception as e:
353
+ logger.error(f"Rate limit reset error: {e}")
354
+ raise HTTPException(status_code=500, detail="Failed to reset rate limits")
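
Usage note: once main.py (below) mounts this router under /api/monitoring, the endpoints can be polled by any authenticated client. A minimal client-side sketch, assuming the API runs on localhost:8000, that httpx is installed, and that get_current_user accepts a bearer token -- the base URL and token are placeholders, not part of this commit:

# Sketch only: base URL, port, and token are assumptions, not part of this commit.
import httpx

BASE_URL = "http://localhost:8000/api/monitoring"
HEADERS = {"Authorization": "Bearer <placeholder-token>"}

def poll_monitoring() -> None:
    with httpx.Client(base_url=BASE_URL, headers=HEADERS, timeout=10.0) as client:
        # Aggregate view: LLM availability plus open circuit breakers
        system = client.get("/status/system").json()
        print("system:", system["data"]["system_status"])

        # Per-user token-bucket usage, served by get_rate_limit_metrics
        rl = client.get("/metrics/rate-limit").json()
        print(f"credits: {rl['data']['remaining']}/{rl['data']['capacity']}")

if __name__ == "__main__":
    poll_monitoring()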
app/core/rate_limiter.py ADDED
@@ -0,0 +1,289 @@
+ # ============================================================
+ # app/core/rate_limiter.py - Advanced Token Bucket Rate Limiting
+ # ============================================================
+
+ import json
+ import logging
+ import time
+ from typing import Any, Dict, Optional, Tuple
+ from datetime import datetime, timedelta
+ from app.ai.config import redis_client
+ from app.core.error_handling import LojizError
+
+ logger = logging.getLogger(__name__)
+
+ # ============================================================
+ # Rate Limit Configuration
+ # ============================================================
+
+ class RateLimitConfig:
+     """Rate limiting configuration by operation type."""
+
+     # Operation costs (in "credits")
+     OPERATION_COSTS = {
+         "chat": 1,          # Basic chat
+         "search": 2,        # Vector search (expensive)
+         "list": 3,          # Create listing (ML validation)
+         "publish": 5,       # Publish (database + indexing)
+         "edit": 2,          # Edit listing
+         "upload_image": 4,  # Image upload (Cloudflare)
+     }
+
+     # Rate limits (credits per time window)
+     LIMITS = {
+         "user": {
+             "credits": 100,    # 100 credits per minute
+             "window_seconds": 60,
+         },
+         "ip": {
+             "credits": 500,    # 500 credits per minute (more permissive)
+             "window_seconds": 60,
+         },
+         "global": {
+             "credits": 10000,  # 10k credits per minute (system-wide)
+             "window_seconds": 60,
+         }
+     }
+
+     # Burst allowance (temporary spike tolerance)
+     BURST_MULTIPLIER = 1.5  # Allow 50% burst above limit
+
+     # Cleanup settings
+     CLEANUP_INTERVAL = 3600  # Clean old buckets every hour
+     MAX_BUCKET_AGE = 86400   # Keep buckets for 24 hours max
+
+ # ============================================================
+ # Token Bucket Implementation
+ # ============================================================
+
+ class TokenBucket:
+     """Token bucket for rate limiting."""
+
+     def __init__(self, capacity: int, refill_rate: float):
+         """
+         Args:
+             capacity: Max tokens in bucket
+             refill_rate: Tokens per second
+         """
+         self.capacity = capacity
+         self.refill_rate = refill_rate
+         self.tokens = capacity
+         self.last_refill = time.time()
+
+     def refill(self) -> None:
+         """Refill tokens based on time elapsed."""
+         now = time.time()
+         elapsed = now - self.last_refill
+
+         new_tokens = elapsed * self.refill_rate
+         self.tokens = min(self.capacity, self.tokens + new_tokens)
+         self.last_refill = now
+
+     def consume(self, tokens: int) -> bool:
+         """Try to consume tokens."""
+         self.refill()
+
+         if self.tokens >= tokens:
+             self.tokens -= tokens
+             return True
+
+         return False
+
+     def get_available(self) -> int:
+         """Get available tokens."""
+         self.refill()
+         return int(self.tokens)
+
+ # ============================================================
+ # Advanced Rate Limiter
+ # ============================================================
+
+ class AdvancedRateLimiter:
+     """Token bucket rate limiter with multiple scopes."""
+
+     def __init__(self):
+         self.buckets: Dict[str, TokenBucket] = {}
+         self.last_cleanup = time.time()
+
+     async def is_allowed(
+         self,
+         user_id: str,
+         operation: str,
+         ip_address: Optional[str] = None,
+     ) -> Tuple[bool, Dict[str, Any]]:
+         """
+         Check if an operation is allowed for a user.
+
+         Returns:
+             (is_allowed, rate_limit_info)
+         """
+         operation_cost = RateLimitConfig.OPERATION_COSTS.get(operation, 1)
+
+         # Check all scopes
+         user_check = await self._check_scope(
+             f"user:{user_id}",
+             operation_cost,
+             RateLimitConfig.LIMITS["user"]
+         )
+
+         ip_check = await self._check_scope(
+             f"ip:{ip_address}",
+             operation_cost,
+             RateLimitConfig.LIMITS["ip"]
+         ) if ip_address else (True, {})
+
+         global_check = await self._check_scope(
+             "global",
+             operation_cost,
+             RateLimitConfig.LIMITS["global"]
+         )
+
+         # All must pass
+         is_allowed = user_check[0] and ip_check[0] and global_check[0]
+
+         info = {
+             "allowed": is_allowed,
+             "operation": operation,
+             "cost": operation_cost,
+             "user": user_check[1],
+             "ip": ip_check[1] if ip_address else None,
+             "global": global_check[1],
+             "timestamp": datetime.utcnow().isoformat(),
+         }
+
+         if not is_allowed:
+             logger.warning(
+                 "⚠️ Rate limit exceeded",
+                 extra={
+                     "user_id": user_id,
+                     "operation": operation,
+                     "ip": ip_address,
+                 }
+             )
+
+         return is_allowed, info
+
+     async def _check_scope(
+         self,
+         scope_key: str,
+         cost: int,
+         config: Dict,
+     ) -> Tuple[bool, Dict]:
+         """Check a single scope (user/ip/global)."""
+         try:
+             # Get bucket from Redis
+             bucket_data = await redis_client.get(f"rate_limit:{scope_key}")
+
+             if bucket_data:
+                 # Deserialize
+                 data = json.loads(bucket_data)
+                 tokens = data["tokens"]
+                 last_refill = data["last_refill"]
+             else:
+                 # New bucket
+                 tokens = config["credits"]
+                 last_refill = time.time()
+
+             # Refill based on time elapsed
+             now = time.time()
+             elapsed = now - last_refill
+             refill_rate = config["credits"] / config["window_seconds"]
+             new_tokens = elapsed * refill_rate
+             tokens = min(config["credits"], tokens + new_tokens)
+
+             # Check if allowed
+             allowed = tokens >= cost
+
+             if allowed:
+                 tokens -= cost
+                 logger.debug(f"βœ… Rate limit OK: {scope_key} ({int(tokens)} tokens left)")
+             else:
+                 logger.warning(f"🚫 Rate limit exceeded: {scope_key}")
+
+             # Save back to Redis
+             await redis_client.setex(
+                 f"rate_limit:{scope_key}",
+                 config["window_seconds"] * 2,  # TTL
+                 json.dumps({
+                     "tokens": tokens,
+                     "last_refill": now,
+                     "capacity": config["credits"],
+                 })
+             )
+
+             return allowed, {
+                 "remaining": int(tokens),
+                 "capacity": config["credits"],
+                 "reset_in": config["window_seconds"],
+             }
+
+         except Exception as e:
+             logger.error(f"❌ Rate limit check error: {e}")
+             # Fail open (allow) on error
+             return True, {"error": "rate_limit_check_failed"}
+
+     async def get_usage_stats(self, user_id: str) -> Dict:
+         """Get current usage stats for a user."""
+         bucket_data = await redis_client.get(f"rate_limit:user:{user_id}")
+
+         if not bucket_data:
+             return {
+                 "user_id": user_id,
+                 "remaining": RateLimitConfig.LIMITS["user"]["credits"],
+                 "capacity": RateLimitConfig.LIMITS["user"]["credits"],
+                 "reset_in": RateLimitConfig.LIMITS["user"]["window_seconds"],
+             }
+
+         data = json.loads(bucket_data)
+
+         return {
+             "user_id": user_id,
+             "remaining": int(data["tokens"]),
+             "capacity": data["capacity"],
+             "reset_in": RateLimitConfig.LIMITS["user"]["window_seconds"],
+         }
+
+     async def reset_user_limits(self, user_id: str) -> bool:
+         """Reset rate limits for a user (admin only)."""
+         try:
+             await redis_client.delete(f"rate_limit:user:{user_id}")
+             logger.info(f"βœ… Rate limits reset for user: {user_id}")
+             return True
+         except Exception as e:
+             logger.error(f"❌ Failed to reset limits: {e}")
+             return False
+
+ # ============================================================
+ # Global Instance
+ # ============================================================
+
+ _rate_limiter = None
+
+ def get_rate_limiter() -> AdvancedRateLimiter:
+     """Get or create the rate limiter instance."""
+     global _rate_limiter
+     if _rate_limiter is None:
+         _rate_limiter = AdvancedRateLimiter()
+     return _rate_limiter
+
+ # ============================================================
+ # Exceptions
+ # ============================================================
+
+ class RateLimitExceeded(LojizError):
+     """Rate limit exceeded error."""
+
+     def __init__(self, retry_after: int = 60):
+         self.retry_after = retry_after
+         super().__init__(
+             f"Rate limit exceeded. Try again in {retry_after}s",
+             error_code="RATE_LIMIT_EXCEEDED",
+             status_code=429,
+             recoverable=True,
+             context={"retry_after": retry_after}
+         )
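
To make the credit arithmetic concrete: with the default user budget of 100 credits per 60-second window, one user can do at most 100 chats, 50 searches, or 20 publishes per minute, with credits refilling continuously at roughly 1.67 per second. Below is a sketch of how an endpoint might gate work on this limiter; the /example route and its handler are hypothetical, while get_rate_limiter, is_allowed, and RateLimitExceeded are the APIs defined above, and it assumes an exception handler for LojizError is registered so RateLimitExceeded maps to HTTP 429:

# Hypothetical endpoint showing the intended call pattern (not part of this commit).
from fastapi import APIRouter, Request

from app.core.rate_limiter import RateLimitExceeded, get_rate_limiter

router = APIRouter()

@router.post("/example")
async def example_endpoint(request: Request, user_id: str):
    limiter = get_rate_limiter()
    allowed, info = await limiter.is_allowed(
        user_id=user_id,
        operation="chat",  # costs 1 credit per RateLimitConfig.OPERATION_COSTS
        ip_address=request.client.host if request.client else None,
    )
    if not allowed:
        # reset_in comes from the per-user scope's rate_limit_info
        raise RateLimitExceeded(retry_after=info["user"].get("reset_in", 60))
    return {"success": True, "rate_limit": info}

Note the design choice in _check_scope: it deliberately fails open on Redis errors, so a rate-limiter outage degrades to no limiting rather than blocking all traffic.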
main.py CHANGED
@@ -6,6 +6,7 @@ from fastapi.middleware.cors import CORSMiddleware
  from fastapi.responses import JSONResponse
  from fastapi.exceptions import RequestValidationError
  from contextlib import asynccontextmanager
+ from app.api.endpoints.monitoring import router as monitoring_router
  import logging
  import os
  import asyncio
@@ -306,11 +307,17 @@ async def observability_status():
          "llm_router": llm_status,
          "token_usage": "See traces in observability backend",
      }
+
+
+ # Include monitoring endpoints
+ app.include_router(monitoring_router, prefix="/api/monitoring", tags=["Monitoring"])
+

  # ====================================================================
  # Health
  # ====================================================================
  @app.get("/health", tags=["Health"])
+
  async def health_check():
      """Health check endpoint with ML & LLM status"""
      try:
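
A quick smoke check that the wiring above took effect -- a sketch, assuming main exposes app, that fastapi's TestClient is available in the test environment, and that app startup does not require live backing services:

# Sketch: verify the monitoring router is mounted under /api/monitoring.
# A 404 would mean include_router was not applied; an auth error (401/403)
# from get_current_user still proves the route exists.
from fastapi.testclient import TestClient

from main import app  # assumes main.py is importable in the test environment

def test_monitoring_router_mounted() -> None:
    client = TestClient(app)
    response = client.get("/api/monitoring/metrics/llm")
    assert response.status_code != 404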