srivatsavdamaraju committed on
Commit 75c39de · verified · 1 Parent(s): 2287980

Create r8.py

Files changed (1)
  1. s3/r8.py +593 -0
s3/r8.py ADDED
@@ -0,0 +1,593 @@
+ # With chunking and proper file metadata upload, including file_hash
+ from fastapi import FastAPI, UploadFile, File, HTTPException, Query, APIRouter, Form
+ from fastapi.responses import JSONResponse
+ import pandas as pd
+ from autoviz.AutoViz_Class import AutoViz_Class
+ import io, os, boto3, tempfile, glob, matplotlib, json, hashlib, shutil
+ matplotlib.use('Agg')
+ import matplotlib.pyplot as plt
+ import sys
+ from pathlib import Path
+ from typing import List, Optional
+ import httpx
+ import datetime
+ import numpy as np
+
+ # --- Project Root Setup ---
+ PROJECT_ROOT = Path(__file__).resolve().parents[1]
+ if str(PROJECT_ROOT) not in sys.path:
+     sys.path.insert(0, str(PROJECT_ROOT))
+
+ from retrieve_secret import *
+ from s3.meta_data_creation_from_s3 import create_file_metadata_from_df
+ from s3.create_dataset_graphs import create_data_set_graphs_dict
+
+ # --- File Validation ---
+ MAX_FILE_SIZE_BYTES = 100 * 1024 * 1024  # 100 MiB
+ MAX_ROWS_ALLOWED = 1_000_000
+ ALLOWED_EXTENSIONS = {".csv", ".xlsx", ".xls", ".ods"}
+ ALLOWED_METADATA_EXTENSIONS = {".json", ".csv", ".xlsx", ".xls"}
+
+ # --- AWS S3 Config ---
+ print("AWS S3 config:", AWS_S3_CREDS_KEY_ID, AWS_S3_CREDS_SECRET_KEY, BUCKET_NAME)
+ ACCESS_KEY = AWS_S3_CREDS_KEY_ID
+ SECRET_KEY = AWS_S3_CREDS_SECRET_KEY
+ BUCKET_NAME = BUCKET_NAME
+ REGION_NAME = "us-east-1"
+
+ s3 = boto3.client(
+     "s3",
+     aws_access_key_id=ACCESS_KEY,
+     aws_secret_access_key=SECRET_KEY,
+     region_name=REGION_NAME
+ )
+
+ ENDPOINT_URL = f"https://s3.{REGION_NAME}.amazonaws.com"
+
+ # --- FastAPI Router ---
+ s3_bucket_router1 = APIRouter(prefix="/s3/v3", tags=["s3_v3"])
+
+
+ # --- Helper: S3 Key ---
+ def make_key(path: str, filename: str) -> str:
+     return f"{path.strip('/')}/{filename}" if path else filename
+
+
+ # --- Sanitize for JSON ---
+ def sanitize_for_json(obj):
+     """Recursively sanitize objects to be JSON serializable"""
+     if isinstance(obj, dict):
+         return {k: sanitize_for_json(v) for k, v in obj.items()}
+     elif isinstance(obj, list):
+         return [sanitize_for_json(item) for item in obj]
+     elif isinstance(obj, tuple):
+         return [sanitize_for_json(item) for item in obj]
+     elif isinstance(obj, (pd.Timestamp, pd.DatetimeTZDtype, datetime.datetime, datetime.date, datetime.time)):
+         return obj.isoformat()
+     elif isinstance(obj, (np.integer, np.int64, np.int32, np.int16, np.int8)):
+         return int(obj)
+     elif isinstance(obj, (np.floating, np.float64, np.float32, np.float16)):
+         return float(obj)
+     elif isinstance(obj, (np.bool_, bool)):
+         return bool(obj)
+     elif isinstance(obj, np.ndarray):
+         return sanitize_for_json(obj.tolist())
+     elif pd.isna(obj):
+         return None
+     elif isinstance(obj, (int, float, str, bool, type(None))):
+         return obj
+     else:
+         return str(obj)
+
+
+ # --- Vector DB Placeholders ---
+ def check_vdb(user_id: str):
+     print(f"Checking VDB for user: {user_id}")
+
+ async def add_metadata_only(collection_name: str, metadata: dict):
+     print(f"Adding metadata to collection: {collection_name}")
+     return {"status": "success", "collection": collection_name}
+
+
+ # --- Convert to Parquet ---
+ import pyarrow as pa
+ import pyarrow.parquet as pq
+
+ def convert_df_to_parquet(df: pd.DataFrame) -> io.BytesIO:
+     """
+     Robust DataFrame → Parquet conversion with automated dtype correction.
+     Solves:
+     - Mixed-type object columns
+     - Non-JSON-serializable datetime values
+     - NaN / NaT issues
+     """
+
+     df_copy = df.copy()
+
+     for col in df_copy.columns:
+         # Skip if already correct dtype
+         if pd.api.types.is_numeric_dtype(df_copy[col]) or \
+            pd.api.types.is_datetime64_any_dtype(df_copy[col]) or \
+            pd.api.types.is_bool_dtype(df_copy[col]):
+             continue
+
+         # Clean object columns
+         if df_copy[col].dtype == "object":
+             sample = df_copy[col].dropna()
+
+             if len(sample) == 0:
+                 df_copy[col] = None
+                 continue
+
+             # Detect if column has datetime-like values
+             sample_values = sample.head(50)
+
+             has_datetime = any(
+                 isinstance(x, (pd.Timestamp, datetime.date, datetime.time)) or
+                 (isinstance(x, str) and any(k in x.lower() for k in ["-", "/", ":"]))
+                 for x in sample_values
+             )
+
+             if has_datetime:
+                 try:
+                     df_copy[col] = pd.to_datetime(df_copy[col], errors="coerce")
+                     # Convert to ISO string for Parquet
+                     df_copy[col] = df_copy[col].dt.strftime("%Y-%m-%d %H:%M:%S")
+                     continue
+                 except Exception:
+                     pass
+
+             # Try numeric conversion
+             try:
+                 numeric_conv = pd.to_numeric(df_copy[col], errors="coerce")
+                 if numeric_conv.notna().sum() / sample.count() > 0.70:
+                     df_copy[col] = numeric_conv
+                     continue
+             except Exception:
+                 pass
+
+             # Final fallback → string
+             df_copy[col] = df_copy[col].astype(str)
+
+     # Replace remaining bad values (for Parquet safety)
+     df_copy = df_copy.replace({"nan": None, "NaT": None, "None": None})
+
+     # Ensure all timezone-aware datetime columns convert safely
+     for col in df_copy.columns:
+         if pd.api.types.is_datetime64_any_dtype(df_copy[col]):
+             df_copy[col] = df_copy[col].dt.tz_localize(None).astype(str)
+
+     # Convert to Parquet binary buffer
+     buffer = io.BytesIO()
+     table = pa.Table.from_pandas(df_copy)
+     pq.write_table(table, buffer, compression="snappy")
+     buffer.seek(0)
+
+     print(f"📦 Parquet conversion OK — {len(buffer.getvalue()):,} bytes")
+     return buffer
+
+
+ # --- Check File Hash Exists ---
+ async def check_file_hash_exists(user_id: str, file_hash: str) -> dict:
+     """Check if a file hash already exists for a user."""
+     url = f"https://mr-mvp-api-dev.dev.ingenspark.com/auth/UserMetadata/{user_id}/check_file_hash?file_hash={file_hash}"
+     headers = {"accept": "application/json"}
+     async with httpx.AsyncClient(timeout=10.0) as client:
+         try:
+             r = await client.get(url, headers=headers)
+             r.raise_for_status()
+             result = r.json()
+
+             print(f"Hash check API response: {result}")
+
+             exists = result.get("exists", None)
+
+             if exists is None and "message" in result:
+                 message_lower = result["message"].lower()
+                 exists = "already existed" in message_lower or "already exists" in message_lower or "duplicate" in message_lower
+
+             if exists is None:
+                 exists = False
+
+             return {
+                 "success": True,
+                 "exists": exists,
+                 "data": result
+             }
+         except httpx.HTTPStatusError as e:
+             print(f"Hash check HTTP error: {e.response.status_code} - {e.response.text}")
+             return {
+                 "success": False,
+                 "exists": False,
+                 "message": f"HTTP {e.response.status_code}",
+                 "error_detail": e.response.text
+             }
+         except Exception as e:
+             print(f"Hash check exception: {str(e)}")
+             return {
+                 "success": False,
+                 "exists": False,
+                 "message": f"Request failed: {str(e)}"
+             }
+
+
+ # --- PostgreSQL Metadata Upload ---
+ async def user_metadata_upload_pg(
+     user_id: str,
+     user_metadata: str,
+     path: str,
+     url: str,
+     filename: str,
+     file_type: str,
+     file_size_bytes: int,
+     file_hash: str,
+     timeout: float = 10.0,
+     data_sets_preview_graph: str = None,
+     is_metadata_file: bool = False,
+     metadata_file_path: str = None
+ ):
+     payload = {
+         "user_id": user_id,
+         "user_metadata": user_metadata,
+         "path": path,
+         "url": url,
+         "filename": filename,
+         "file_type": file_type,
+         "file_size_bytes": file_size_bytes,
+         "file_hash": file_hash,
+         "data_sets_preview_graph": data_sets_preview_graph,
+         "is_metadata_file": is_metadata_file,
+         "metadata_file_path": metadata_file_path
+     }
+     print(f"PostgreSQL payload file_hash: {payload['file_hash']}")
+     async with httpx.AsyncClient() as client:
+         try:
+             r = await client.post(
+                 "https://mr-mvp-api-dev.dev.ingenspark.com/auth/UserMetadataCreate",
+                 json=payload,
+                 timeout=timeout
+             )
+             r.raise_for_status()
+             result = r.json()
+             result["file_hash"] = file_hash
+             return {"success": True, "data": result}
+         except httpx.HTTPStatusError as e:
+             return {
+                 "success": False,
+                 "error": "HTTP error",
+                 "status_code": e.response.status_code,
+                 "detail": e.response.text,
+                 "file_hash": file_hash
+             }
+         except Exception as e:
+             return {
+                 "success": False,
+                 "error": "Request failed",
+                 "detail": str(e),
+                 "file_hash": file_hash
+             }
+
+
+ # --- DEBUG ENDPOINT ---
+ @s3_bucket_router1.get("/debug/check_hash/{user_id}/{file_hash}")
+ async def debug_check_hash(user_id: str, file_hash: str):
+     """Debug endpoint to test hash checking"""
+     result = await check_file_hash_exists(user_id, file_hash)
+     return {
+         "raw_result": result,
+         "exists_value": result.get("exists"),
+         "exists_type": type(result.get("exists")).__name__,
+         "success_value": result.get("success"),
+         "interpretation": "File exists" if result.get("exists") is True else "File does not exist or check failed"
+     }
+
+
+ # --- MAIN ENDPOINT WITH OPTIONAL METADATA FILE ---
+ @s3_bucket_router1.post("/upload_datasets_v3/")
+ async def upload_file(
+     file: UploadFile = File(..., description="Main data file"),
+     user_id: str = Query(..., description="User ID"),
+     path: str = Query("", description="Optional subpath"),
+     is_metadata_file: bool = Form(False, description="Toggle for separate metadata file upload"),
+     metadata_file: Optional[UploadFile] = File(None, description="Optional separate metadata file")
+ ):
+     """
+     Upload dataset with optional separate metadata file.
+
+     - If is_metadata_file=False (default): Single file upload, metadata generated from data
+     - If is_metadata_file=True: Main file + separate metadata file required
+     """
+     html_tmp_dir = None
+     bokeh_tmp_dir = None
+     file_content = None
+     metadata_content = None
+
+     try:
+         # 1. Validate main file extension
+         file_ext = os.path.splitext(file.filename)[1].lower()
+         if file_ext not in ALLOWED_EXTENSIONS:
+             raise HTTPException(status_code=400, detail=f"Unsupported file type: {file_ext}")
+
+         # 2. Handle metadata file toggle
+         if is_metadata_file:
+             if not metadata_file:
+                 raise HTTPException(
+                     status_code=400,
+                     detail="Metadata file is required when is_metadata_file=True"
+                 )
+
+             # Validate metadata file extension
+             metadata_ext = os.path.splitext(metadata_file.filename)[1].lower()
+             if metadata_ext not in ALLOWED_METADATA_EXTENSIONS:
+                 raise HTTPException(
+                     status_code=400,
+                     detail=f"Unsupported metadata file type: {metadata_ext}"
+                 )
+
+             # Read metadata file
+             metadata_content = await metadata_file.read()
+             if not metadata_content:
+                 raise HTTPException(status_code=400, detail="Empty metadata file")
+
+             print(f"📋 Separate metadata file provided: {metadata_file.filename}")
+
+         # 3. Read main file
+         file_content = await file.read()
+         if not file_content:
+             raise HTTPException(status_code=400, detail="Empty file")
+         if len(file_content) > MAX_FILE_SIZE_BYTES:
+             raise HTTPException(status_code=413, detail="File exceeds 100 MiB limit")
+
+         # 4. Generate hash
+         file_hash = hashlib.sha256(file_content).hexdigest()
+         print(f"Generated file hash: {file_hash}")
+
+         # 5. Check hash via API
+         hash_result = await check_file_hash_exists(user_id, file_hash)
+         print(f"Hash check result: {hash_result}")
+
+         if not hash_result.get("success", False):
+             print(f"⚠️ Warning: Hash check API failed: {hash_result.get('message')}")
+
+         if hash_result.get("exists") is True:
+             print(f"🚫 Duplicate file detected: {file_hash}")
+             return JSONResponse(
+                 status_code=409,
+                 content={
+                     "message": "File already uploaded.",
+                     "reason": "Duplicate file detected via SHA-256 hash.",
+                     "file_hash": file_hash,
+                     "user_id": user_id,
+                     "filename": file.filename,
+                     "action": "skipped",
+                     "existing_file_info": hash_result.get("data")
+                 }
+             )
+
+         print("✅ Hash check passed. New file - proceeding with upload.")
+
+         # 6. Load DataFrame
+         try:
+             if file_ext == ".csv":
+                 df = pd.read_csv(io.BytesIO(file_content))
+             elif file_ext in {".xlsx", ".xls"}:
+                 engine = 'openpyxl' if file_ext == ".xlsx" else 'xlrd'
+                 df = pd.read_excel(io.BytesIO(file_content), engine=engine)
+             elif file_ext == ".ods":
+                 df = pd.read_excel(io.BytesIO(file_content), engine='odf')
+         except Exception as e:
+             raise HTTPException(status_code=400, detail=f"Failed to parse file: {str(e)}")
+
+         if len(df) > MAX_ROWS_ALLOWED:
+             raise HTTPException(status_code=413, detail=f"Too many rows: {len(df):,} > {MAX_ROWS_ALLOWED:,}")
+
+         # 7. Convert to Parquet
+         parquet_buffer = convert_df_to_parquet(df)
+         parquet_size = parquet_buffer.getbuffer().nbytes
+
+         # 8. Upload Parquet to S3
+         base_filename = os.path.splitext(file.filename)[0]
+         parquet_filename = f"{base_filename}.parquet"
+         file_key = f"{user_id}/files/datasets/{parquet_filename}"
+         file_url = f"{ENDPOINT_URL}/{BUCKET_NAME}/{file_key}"
+
+         s3.upload_fileobj(parquet_buffer, BUCKET_NAME, file_key,
+                           ExtraArgs={'ContentType': 'application/octet-stream'})
+         print(f"Uploaded Parquet: {file_url}")
+
+         # 9. Handle metadata file upload to S3 (if separate metadata file provided)
+         metadata_file_s3_url = None
+         metadata_file_s3_key = None
+         if is_metadata_file and metadata_content:
+             # Upload metadata file directly to S3 without conversion
+             print("📋 Uploading separate metadata file to S3...")
+             try:
+                 metadata_filename = metadata_file.filename
+                 metadata_ext = os.path.splitext(metadata_filename)[1].lower()
+
+                 # Determine content type
+                 content_type_map = {
+                     '.json': 'application/json',
+                     '.csv': 'text/csv',
+                     '.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
+                     '.xls': 'application/vnd.ms-excel'
+                 }
+                 content_type = content_type_map.get(metadata_ext, 'application/octet-stream')
+
+                 # Upload to S3 in metadata subfolder
+                 metadata_file_s3_key = f"{user_id}/files/datasets/{base_filename}/metadata/{metadata_filename}"
+                 metadata_file_s3_url = f"{ENDPOINT_URL}/{BUCKET_NAME}/{metadata_file_s3_key}"
+
+                 s3.put_object(
+                     Bucket=BUCKET_NAME,
+                     Key=metadata_file_s3_key,
+                     Body=metadata_content,
+                     ContentType=content_type
+                 )
+                 print(f"✅ Uploaded metadata file to S3: {metadata_file_s3_url}")
+
+             except Exception as e:
+                 print(f"⚠️ Failed to upload metadata file to S3: {e}")
+                 raise HTTPException(
+                     status_code=500,
+                     detail=f"Failed to upload metadata file: {str(e)}"
+                 )
+
+         # 10. Generate auto metadata from data
+         print("🔄 Auto-generating metadata from data...")
+         try:
+             metadata = create_file_metadata_from_df(df, parquet_filename, file_key)
+             print("✅ Raw metadata generated")
+         except Exception as e:
+             print(f"⚠️ Error generating metadata: {e}")
+             import traceback
+             traceback.print_exc()
+             # Fallback minimal metadata
+             metadata = {
+                 "filename": parquet_filename,
+                 "s3_path": file_key,
+                 "rows": len(df),
+                 "columns": len(df.columns),
+                 "column_names": list(df.columns),
+                 "error": f"Metadata generation failed: {str(e)}"
+             }
+
+         # ✅ CRITICAL FIX: Sanitize metadata immediately after creation
+         print("🔄 Sanitizing metadata for JSON compatibility...")
+         metadata = sanitize_for_json(metadata)
+         print("✅ Metadata sanitized successfully")
+
+         # Add metadata file info if provided
+         if is_metadata_file and metadata_file_s3_url:
+             metadata["metadata_source"] = "separate_file"
+             metadata["metadata_file"] = {
+                 "filename": metadata_file.filename,
+                 "s3_path": metadata_file_s3_key,
+                 "s3_url": metadata_file_s3_url,
+                 "size_bytes": len(metadata_content)
+             }
+         else:
+             metadata["metadata_source"] = "auto_generated"
+
+         # Add common metadata fields
+         metadata.update({
+             "user_id": user_id,
+             "s3_path": file_key,
+             "s3_url": file_url,
+             "source_file": file.filename,
+             "source_file_type": file_ext[1:],
+             "file_type": "parquet",
+             "original_file_size_bytes": len(file_content),
+             "parquet_file_size_bytes": parquet_size,
+             "compression_ratio": f"{(1 - parquet_size/len(file_content))*100:.1f}%",
+             "file_hash": file_hash,
+             "has_separate_metadata": is_metadata_file
+         })
+
+         # 11. Generate dataset preview graphs
+         print("Generating dataset preview graphs...")
+         dataset_graphs = None
+         try:
+             dataset_graphs = create_data_set_graphs_dict(df, max_rows=200)
+             # ✅ Sanitize graphs immediately
+             dataset_graphs = sanitize_for_json(dataset_graphs)
+             print(f"✅ Generated graphs for {len(dataset_graphs.get('columnSummaries', []))} columns")
+         except Exception as e:
+             print(f"⚠️ Failed to generate dataset graphs: {e}")
+             import traceback
+             traceback.print_exc()
+             dataset_graphs = {
+                 "error": str(e),
+                 "columnSummaries": []
+             }
+
+         # Ensure dataset_graphs is sanitized
+         if dataset_graphs is None:
+             dataset_graphs = {"columnSummaries": []}
+
+         # 12. Vector DB
+         check_vdb(user_id)
+         try:
+             vdb_res = await add_metadata_only("sri_1_files_&_files_metadata", metadata)
+             vdb_success = vdb_res.get("status") == "success"
+             print(f"VDB upload success: {vdb_success}")
+         except Exception as e:
+             print(f"⚠️ VDB upload failed: {e}")
+             vdb_success = False
+
+         # 13. PostgreSQL Metadata
+         try:
+             # Convert metadata to JSON string
+             metadata_json_str = json.dumps(metadata)
+             dataset_graphs_json = dataset_graphs  # Already sanitized, pass as dict
+
+             pg_result = await user_metadata_upload_pg(
+                 user_id=user_id,
+                 user_metadata=metadata_json_str,
+                 path=file_key,
+                 url=file_url,
+                 filename=parquet_filename,
+                 file_type="parquet",
+                 file_size_bytes=parquet_size,
+                 file_hash=file_hash,
+                 data_sets_preview_graph=dataset_graphs_json,
+                 is_metadata_file=is_metadata_file,
+                 metadata_file_path=metadata_file_s3_key
+             )
+             print(f"PostgreSQL upload result: {pg_result}")
+             pg_success = pg_result.get("success", False)
+         except Exception as e:
+             print(f"⚠️ PostgreSQL upload failed: {e}")
+             import traceback
+             traceback.print_exc()
+             pg_success = False
+             pg_result = {"success": False, "error": str(e)}
+
+         graphs_count = len(dataset_graphs.get("columnSummaries", []))
+
+         # 14. Return success
+         response_data = {
+             "message": "Upload successful.",
+             "filename": parquet_filename,
+             "original_filename": file.filename,
+             "user_id": user_id,
+             "file_path": file_key,
+             "file_url": file_url,
+             "file_hash": file_hash,
+             "source_file_type": file_ext[1:],
+             "file_type": "parquet",
+             "original_file_size_bytes": len(file_content),
+             "parquet_file_size_bytes": parquet_size,
+             "compression_ratio": f"{(1 - parquet_size/len(file_content))*100:.1f}%",
+             "rows": len(df),
+             "columns": len(df.columns),
+             "has_separate_metadata": is_metadata_file,
+             "metadata": metadata,
+             "upload_dataset_vdb": vdb_success,
+             "upload_dataset_pg": pg_success,
+             "pg_details": pg_result,
+             "graphs_generated": graphs_count
+         }
+
+         if is_metadata_file and metadata_file_s3_url:
+             response_data["metadata_file_uploaded"] = {
+                 "filename": metadata_file.filename,
+                 "s3_path": metadata_file_s3_key,
+                 "s3_url": metadata_file_s3_url,
+                 "size_bytes": len(metadata_content)
+             }
+
+         return response_data
+
+     except HTTPException:
+         raise
+     except Exception as e:
+         print(f"Unexpected error: {e}")
+         import traceback
+         traceback.print_exc()
+         raise HTTPException(status_code=500, detail=str(e))
+     finally:
+         # Clean up temp directories
+         for d in (html_tmp_dir, bokeh_tmp_dir):
+             if d and os.path.exists(d):
+                 shutil.rmtree(d, ignore_errors=True)
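
For reference, a minimal client-side sketch of how the endpoint added in this commit could be exercised once s3_bucket_router1 is mounted on a FastAPI app. The base URL, file paths, and the upload_dataset helper below are illustrative assumptions, not part of r8.py; user_id and path travel as query parameters, is_metadata_file as a form field, and file / metadata_file as multipart uploads. A 409 response means the SHA-256 file_hash duplicate check rejected the upload.

# Hypothetical usage sketch (not part of r8.py). Assumes the router is mounted
# on an app served at http://localhost:8000; adjust the base URL as needed.
from typing import Optional
import httpx

def upload_dataset(csv_path: str, user_id: str, metadata_path: Optional[str] = None) -> dict:
    params = {"user_id": user_id, "path": ""}                          # query parameters
    data = {"is_metadata_file": "true" if metadata_path else "false"}  # form field toggle
    files = {"file": (csv_path, open(csv_path, "rb"), "text/csv")}
    if metadata_path:
        files["metadata_file"] = (metadata_path, open(metadata_path, "rb"), "application/json")
    r = httpx.post(
        "http://localhost:8000/s3/v3/upload_datasets_v3/",
        params=params,
        data=data,
        files=files,
        timeout=120.0,
    )
    # 409 indicates a duplicate detected via the file_hash check; 200 returns the metadata payload.
    print(r.status_code, r.json().get("message"))
    return r.json()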