destinyebuka commited on
Commit
6038791
·
1 Parent(s): e82058c
Files changed (2) hide show
  1. app/routes/websocket_listings.py +160 -0
  2. main.py +3 -1
app/routes/websocket_listings.py ADDED
@@ -0,0 +1,160 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # app/routes/websocket_listings.py
2
+ from fastapi import APIRouter, WebSocket, Query, WebSocketDisconnect
3
+ from typing import Set, Optional
4
+ import json
5
+ import logging
6
+ from datetime import datetime
7
+ from app.database import get_db
8
+ from app.guards.jwt_guard import verify_token
9
+ from app.models.listing import Listing
10
+
11
+ logger = logging.getLogger(__name__)
12
+
13
+ router = APIRouter(tags=["WebSocket"])
14
+
15
+
16
+ class ConnectionManager:
17
+ """Manage WebSocket connections grouped by filter type"""
18
+ def __init__(self):
19
+ self.active_connections: dict[str, Set[WebSocket]] = {
20
+ "all": set(),
21
+ "rent": set(),
22
+ "roommate": set(),
23
+ "short-stay": set(),
24
+ "sale": set(),
25
+ }
26
+
27
+ async def connect(self, websocket: WebSocket, filter_type: str = "all"):
28
+ await websocket.accept()
29
+ if filter_type not in self.active_connections:
30
+ filter_type = "all"
31
+ self.active_connections[filter_type].add(websocket)
32
+ logger.info(f"[WS] Client connected: {filter_type} | Total: {len(self.active_connections[filter_type])}")
33
+
34
+ def disconnect(self, websocket: WebSocket, filter_type: str = "all"):
35
+ if filter_type in self.active_connections:
36
+ self.active_connections[filter_type].discard(websocket)
37
+ logger.info(f"[WS] Client disconnected: {filter_type}")
38
+
39
+ async def broadcast_to_filter(self, filter_type: str, message: dict):
40
+ """Send message to all clients subscribed to this filter"""
41
+ if filter_type not in self.active_connections:
42
+ return
43
+
44
+ disconnected = set()
45
+ for connection in self.active_connections[filter_type]:
46
+ try:
47
+ await connection.send_json(message)
48
+ except Exception as e:
49
+ logger.warning(f"[WS] Error sending message: {e}")
50
+ disconnected.add(connection)
51
+
52
+ # Clean up disconnected clients
53
+ for conn in disconnected:
54
+ self.active_connections[filter_type].discard(conn)
55
+
56
+ async def broadcast_to_all(self, message: dict):
57
+ """Send message to all connected clients"""
58
+ for filter_type in self.active_connections:
59
+ await self.broadcast_to_filter(filter_type, message)
60
+
61
+
62
+ manager = ConnectionManager()
63
+
64
+
65
+ @router.websocket("/ws/listings")
66
+ async def websocket_listings_endpoint(websocket: WebSocket, token: str = Query(...)):
67
+ """
68
+ WebSocket endpoint for real-time listings updates
69
+ Connect with: ws://localhost:8000/ws/listings?token=YOUR_JWT_TOKEN
70
+ """
71
+
72
+ # Verify JWT token
73
+ try:
74
+ user = verify_token(token)
75
+ if not user:
76
+ await websocket.close(code=4001, reason="Unauthorized")
77
+ return
78
+ logger.info(f"[WS] User authenticated: {user.get('email', 'unknown')}")
79
+ except Exception as e:
80
+ logger.error(f"[WS] Token verification failed: {e}")
81
+ await websocket.close(code=4001, reason="Unauthorized")
82
+ return
83
+
84
+ filter_type = "all"
85
+ await manager.connect(websocket, filter_type)
86
+
87
+ try:
88
+ # Send initial listings on connect
89
+ db = await get_db()
90
+ query = {"status": "active"}
91
+ cursor = db.listings.find(query).sort("created_at", -1)
92
+ listings = []
93
+
94
+ async for doc in cursor:
95
+ if "_id" in doc:
96
+ doc["_id"] = str(doc["_id"])
97
+ try:
98
+ listing = Listing(**doc)
99
+ listings.append(listing.dict(by_alias=True))
100
+ except Exception as e:
101
+ logger.warning(f"[WS] Error parsing listing: {e}")
102
+ continue
103
+
104
+ # Send initial data
105
+ await websocket.send_json({
106
+ "type": "initial_data",
107
+ "data": listings,
108
+ "count": len(listings),
109
+ "timestamp": datetime.utcnow().isoformat()
110
+ })
111
+
112
+ while True:
113
+ # Receive messages from client (filter changes, ping)
114
+ data = await websocket.receive_text()
115
+ message = json.loads(data)
116
+
117
+ if message.get("type") == "filter_change":
118
+ new_filter = message.get("listing_type", "all")
119
+ if new_filter in manager.active_connections:
120
+ # Move connection to new filter group
121
+ manager.disconnect(websocket, filter_type)
122
+ filter_type = new_filter
123
+ await manager.connect(websocket, filter_type)
124
+
125
+ # Send listings for new filter
126
+ db = await get_db()
127
+ query = {"status": "active"}
128
+ if new_filter != "all":
129
+ query["listing_type"] = new_filter
130
+
131
+ cursor = db.listings.find(query).sort("created_at", -1)
132
+ listings = []
133
+
134
+ async for doc in cursor:
135
+ if "_id" in doc:
136
+ doc["_id"] = str(doc["_id"])
137
+ try:
138
+ listing = Listing(**doc)
139
+ listings.append(listing.dict(by_alias=True))
140
+ except Exception as e:
141
+ logger.warning(f"[WS] Error parsing listing: {e}")
142
+ continue
143
+
144
+ await websocket.send_json({
145
+ "type": "listings_update",
146
+ "data": listings,
147
+ "count": len(listings),
148
+ "timestamp": datetime.utcnow().isoformat()
149
+ })
150
+ logger.info(f"[WS] Filter changed to: {filter_type}")
151
+
152
+ elif message.get("type") == "ping":
153
+ # Respond to keep-alive ping
154
+ await websocket.send_json({"type": "pong"})
155
+
156
+ except WebSocketDisconnect:
157
+ manager.disconnect(websocket, filter_type)
158
+ except Exception as e:
159
+ logger.error(f"[WS] Error: {e}")
160
+ manager.disconnect(websocket, filter_type)
main.py CHANGED
@@ -204,12 +204,14 @@ logger.info("Registering routers...")
204
  app.include_router(auth.router, prefix="/api/auth", tags=["Authentication"])
205
  app.include_router(ai_chat_router, prefix="/ai", tags=["Aida AI Chat"])
206
 
207
- # ---------- NEW ROUTERS ----------
208
  from app.routes.listing import router as listing_router
209
  from app.routes.user_public import router as user_public_router
 
210
 
211
  app.include_router(listing_router, prefix="/api/listings", tags=["Listings"])
212
  app.include_router(user_public_router, prefix="/api/users", tags=["Users"])
 
213
 
214
  logger.info("All routers registered")
215
 
 
204
  app.include_router(auth.router, prefix="/api/auth", tags=["Authentication"])
205
  app.include_router(ai_chat_router, prefix="/ai", tags=["Aida AI Chat"])
206
 
207
+ # ---------- LISTING ROUTERS ----------
208
  from app.routes.listing import router as listing_router
209
  from app.routes.user_public import router as user_public_router
210
+ from app.routes.websocket_listings import router as ws_router
211
 
212
  app.include_router(listing_router, prefix="/api/listings", tags=["Listings"])
213
  app.include_router(user_public_router, prefix="/api/users", tags=["Users"])
214
+ app.include_router(ws_router, tags=["WebSocket"])
215
 
216
  logger.info("All routers registered")
217