"""PostgreSQL storage helpers for conversation records.

Each record in the "stored_convoId_data" table holds a user query, the
response to it, optional file metadata and an ``is_saved`` flag. Every public
function opens its own connection and closes it before returning. Database
errors are reported with ``print`` and signalled through the return value
instead of being raised.
"""

import json
import os
from typing import Any, Dict, List, Optional, Sequence

import psycopg2
from psycopg2 import sql

# PostgreSQL credentials.
# Each setting may be overridden with the environment variable of the same
# name; the literals are kept only as backward-compatible fallbacks.
# NOTE(security): the fallback password is committed to source control. It
# should be rotated and the fallback removed once deployments set PGPASSWORD.
PGHOST = os.environ.get(
    'PGHOST', 'ep-steep-dream-adqtvjel-pooler.c-2.us-east-1.aws.neon.tech'
)
PGDATABASE = os.environ.get('PGDATABASE', 'neondb')
PGUSER = os.environ.get('PGUSER', 'neondb_owner')
PGPASSWORD = os.environ.get('PGPASSWORD', 'npg_Qq0B1uWRXavx')
PGSSLMODE = os.environ.get('PGSSLMODE', 'require')


def connect_to_db():
    """Open a new connection using the module-level PG* settings.

    Returns:
        A psycopg2 connection, or ``None`` if connecting failed (the error is
        printed, not raised).
    """
    try:
        return psycopg2.connect(
            host=PGHOST,
            database=PGDATABASE,
            user=PGUSER,
            password=PGPASSWORD,
            sslmode=PGSSLMODE,
        )
    except Exception as e:
        print(f"Error connecting to PostgreSQL database: {e}")
        return None


def delete_table_by_name(table_name: str) -> None:
    """Drop the table called *table_name* if it exists.

    The name is quoted through ``sql.Identifier`` rather than interpolated
    into the statement text.
    """
    # Compose the statement before connecting so that an invalid table_name
    # cannot leave an open connection behind.
    drop_table_query = sql.SQL("DROP TABLE IF EXISTS {}").format(
        sql.Identifier(table_name)
    )

    conn = connect_to_db()
    if not conn:
        print("Failed to connect to the database")
        return

    try:
        with conn.cursor() as cursor:
            cursor.execute(drop_table_query)
        conn.commit()
        print(f"Table '{table_name}' deleted successfully.")
    except Exception as e:
        conn.rollback()
        print(f"Error deleting table '{table_name}': {e}")
    finally:
        conn.close()


def create_table() -> bool:
    """Create the "stored_convoId_data" table and its index if missing.

    Returns:
        ``True`` when both statements were committed, ``False`` when the
        connection or either statement failed.
    """
    conn = connect_to_db()
    if not conn:
        return False

    create_table_query = """
        CREATE TABLE IF NOT EXISTS "stored_convoId_data" (
            id SERIAL PRIMARY KEY,
            convo_id VARCHAR(255) NOT NULL,
            user_query JSONB NOT NULL,
            response JSONB NOT NULL,
            file_metadata JSONB,
            is_saved BOOLEAN DEFAULT FALSE,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """

    # Index on the two columns the lookup queries filter by.
    create_index_query = """
        CREATE INDEX IF NOT EXISTS idx_convo_id_is_saved
        ON "stored_convoId_data" (convo_id, is_saved);
    """

    try:
        with conn.cursor() as cursor:
            cursor.execute(create_table_query)
            cursor.execute(create_index_query)
        conn.commit()
        print("Table 'stored_convoId_data' created successfully or already exists.")
        return True
    except Exception as e:
        print(f"Error creating table: {e}")
        conn.rollback()
        return False
    finally:
        conn.close()


def insert_convo_data(convo_id, data, is_saved=False):
    """Insert one conversation record.

    Args:
        convo_id: Value for the ``convo_id`` column.
        data: Mapping with the keys ``"user_query"`` and ``"response"``. When
            the response is a mapping with an ``"artifacts"`` entry, that
            entry is stored in ``file_metadata``; otherwise the column is
            left NULL.
        is_saved: Initial value of the ``is_saved`` flag.

    Raises:
        KeyError: If *data* lacks ``"user_query"`` or ``"response"``.
        TypeError: If a payload is not JSON-serializable.
    """
    # Serialize before connecting: a malformed payload must not leak an open
    # connection.
    response_payload = data["response"]
    user_query_json = json.dumps(data["user_query"])
    response_json = json.dumps(response_payload)
    artifacts = (
        response_payload.get("artifacts")
        if isinstance(response_payload, dict)
        else None
    )
    # file_metadata is nullable, so missing artifacts are stored as SQL NULL.
    file_metadata_json = None if artifacts is None else json.dumps(artifacts)

    conn = connect_to_db()
    if not conn:
        return

    insert_query = """
        INSERT INTO "stored_convoId_data"
            (convo_id, user_query, response, file_metadata, is_saved)
        VALUES (%s, %s, %s, %s, %s)
    """

    try:
        with conn.cursor() as cursor:
            cursor.execute(
                insert_query,
                (convo_id, user_query_json, response_json, file_metadata_json, is_saved),
            )
        conn.commit()
        print(f"Data for convo_id {convo_id} inserted successfully with is_saved={is_saved}.")
    except Exception as e:
        print(f"Error inserting data: {e}")
        conn.rollback()
    finally:
        conn.close()


def update_saved_status(convo_id, is_saved=True):
    """Set ``is_saved`` (and refresh ``updated_at``) on every row of *convo_id*.

    Returns:
        ``True`` if at least one row was updated, ``False`` if no row matched
        or the connection/statement failed.
    """
    conn = connect_to_db()
    if not conn:
        return False

    update_query = """
        UPDATE "stored_convoId_data"
        SET is_saved = %s, updated_at = CURRENT_TIMESTAMP
        WHERE convo_id = %s
    """

    try:
        with conn.cursor() as cursor:
            cursor.execute(update_query, (is_saved, convo_id))
            updated_rows = cursor.rowcount
        conn.commit()
        if updated_rows > 0:
            print(f"Updated {updated_rows} record(s) for convo_id {convo_id} to is_saved={is_saved}.")
            return True
        print(f"No records found for convo_id {convo_id}.")
        return False
    except Exception as e:
        print(f"Error updating saved status: {e}")
        conn.rollback()
        return False
    finally:
        conn.close()


def _decode_json(value: Any) -> Any:
    """Return a JSONB column value as Python data.

    Values that are not text (dict, list, numbers, ``None``) are returned
    unchanged. Text is parsed as JSON; text that is not valid JSON is
    returned as-is.
    """
    if isinstance(value, (str, bytes, bytearray)):
        try:
            return json.loads(value)
        except ValueError:
            return value
    return value


def _row_to_conversation(row: Sequence[Any]) -> Dict[str, Any]:
    """Convert one SELECT row (in the column order used below) to a dict."""
    (
        stored_convo_id,
        user_query,
        response,
        file_metadata,
        is_saved,
        created_at,
        updated_at,
    ) = row
    return {
        "convo_id": stored_convo_id,
        "user_query": _decode_json(user_query),
        "response": _decode_json(response),
        "file_metadata": _decode_json(file_metadata),
        "is_saved": is_saved,
        # Timestamps are stringified so the result can be passed to json.dumps.
        "created_at": str(created_at),
        "updated_at": str(updated_at),
    }


def _fetch_conversations(
    select_query: str,
    params: Optional[Sequence[Any]],
    not_found_message: str,
    error_prefix: str,
) -> Optional[List[Dict[str, Any]]]:
    """Run *select_query* and return its rows as conversation dicts.

    Returns ``None`` when the connection fails, the query fails (printed as
    ``"<error_prefix>: <error>"``) or no rows match (*not_found_message* is
    printed).
    """
    conn = connect_to_db()
    if not conn:
        return None

    try:
        with conn.cursor() as cursor:
            cursor.execute(select_query, params)
            rows = cursor.fetchall()
        if not rows:
            print(not_found_message)
            return None
        return [_row_to_conversation(row) for row in rows]
    except Exception as e:
        print(f"{error_prefix}: {e}")
        return None
    finally:
        conn.close()


def get_convo_data(convo_id):
    """Return all records for *convo_id*, newest first, or ``None``.

    ``None`` is returned when nothing matches or the lookup fails.
    """
    select_query = """
        SELECT convo_id, user_query, response, file_metadata, is_saved, created_at, updated_at
        FROM "stored_convoId_data"
        WHERE convo_id = %s
        ORDER BY created_at DESC
    """
    return _fetch_conversations(
        select_query,
        (convo_id,),
        f"No data found for convo_id: {convo_id}",
        "Error retrieving data",
    )


def get_convo_data_by_saved_status(convo_id, is_saved=True):
    """Return the records for *convo_id* whose ``is_saved`` flag matches.

    Results are ordered newest first. ``None`` is returned when nothing
    matches or the lookup fails.
    """
    select_query = """
        SELECT convo_id, user_query, response, file_metadata, is_saved, created_at, updated_at
        FROM "stored_convoId_data"
        WHERE convo_id = %s AND is_saved = %s
        ORDER BY created_at DESC
    """
    return _fetch_conversations(
        select_query,
        (convo_id, is_saved),
        f"No data found for convo_id: {convo_id} with is_saved={is_saved}",
        "Error retrieving data",
    )


def get_all_saved_conversations():
    """Return every saved record across all convo_ids, most recently updated first.

    ``None`` is returned when nothing is saved or the lookup fails.
    """
    select_query = """
        SELECT convo_id, user_query, response, file_metadata, is_saved, created_at, updated_at
        FROM "stored_convoId_data"
        WHERE is_saved = TRUE
        ORDER BY updated_at DESC
    """
    return _fetch_conversations(
        select_query,
        None,
        "No saved conversations found.",
        "Error retrieving saved conversations",
    )


# Sample data
data = {
    "user_query": {
        "query_id": "12345",
        "text": "What is the weather like today?",
        "user_metadata": {
            "location": "New York",
            "language": "en"
        }
    },
    "response": {
        "text": "The weather today in New York is sunny and 75°F. Would you like to see a detailed report or a forecast?",
        "status": "success",
        "response_time": "2025-10-13T08:10:00Z",
        "duration": "2s",
        "artifacts": [
            {
                "artifact_id": "artifact_1",
                "file_id": "file_1",
                "file_name": "weather_report.pdf",
                "file_type": "pdf",
                "file_size": 1024,
                "file_url": "path_to_file",
                "upload_timestamp": "2025-10-13T08:00:00Z",
                "metadata": {
                    "created_by": "system",
                    "associated_query_id": "12345",
                    "associated_session_id": "session_001",
                    "artifact_timestamp": "2025-10-13T08:10:00Z"
                }
            },
            {
                "artifact_id": "artifact_2",
                "file_id": "file_2",
                "file_name": "weather_forecast_image.jpg",
                "file_type": "jpg",
                "file_size": 2048,
                "file_url": "path_to_image",
                "upload_timestamp": "2025-10-13T08:05:00Z",
                "metadata": {
                    "created_by": "system",
                    "associated_query_id": "12345",
                    "associated_session_id": "session_001",
                    "artifact_timestamp": "2025-10-13T08:10:00Z"
                }
            }
        ]
    },
    "metadata": {
        "query_response_metadata": {
            "response_time": "2025-10-13T08:10:00Z",
            "response_status": "success",
            "response_duration": "2s"
        },
        "file_metadata": [
            {
                "file_id": "file_1",
                "file_name": "weather_report.pdf",
                "file_size": 1024,
                "upload_timestamp": "2025-10-13T08:00:00Z"
            },
            {
                "file_id": "file_2",
                "file_name": "weather_forecast_image.jpg",
                "file_size": 2048,
                "upload_timestamp": "2025-10-13T08:05:00Z"
            }
        ]
    }
}


def get_all_convo_data_filtered(is_saved=None):
    """
    Retrieve all conversations across all convo_ids.

    - If is_saved=True → returns only saved conversations
    - If is_saved=False → returns only unsaved conversations
    - If is_saved=None → returns all conversations

    Results are ordered by ``updated_at`` descending. Returns ``None`` when
    nothing matches or the lookup fails.
    """
    if is_saved is None:
        select_query = """
            SELECT convo_id, user_query, response, file_metadata, is_saved, created_at, updated_at
            FROM "stored_convoId_data"
            ORDER BY updated_at DESC
        """
        params = ()
    else:
        select_query = """
            SELECT convo_id, user_query, response, file_metadata, is_saved, created_at, updated_at
            FROM "stored_convoId_data"
            WHERE is_saved = %s
            ORDER BY updated_at DESC
        """
        params = (is_saved,)

    return _fetch_conversations(
        select_query,
        params,
        f"No conversations found with is_saved={is_saved}",
        "Error retrieving conversations",
    )


def main() -> None:
    """Run the demo: recreate the table, insert sample rows, query them back."""
    # Start from a clean slate by dropping any existing table.
    print("Step none: Deleting table...")
    delete_table_by_name("stored_convoId_data")
    print("Table 'stored_convoId_data' deleted successfully or does not exist.")

    # Step 1: Create the table
    print("Step 1: Creating table...")
    if not create_table():
        # Nothing below can work without the table, so stop here.
        print("Failed to create table. Exiting...")
        return

    # Step 2: Insert data with is_saved=False
    print("\nStep 2: Inserting data with is_saved=False...")
    insert_convo_data("12345", data, is_saved=False)

    # Step 3: Insert another conversation with is_saved=True
    print("\nStep 3: Inserting another conversation with is_saved=True...")
    insert_convo_data("67890", data, is_saved=True)

    # Step 4: Retrieve all data by convo_id
    print("\n" + "="*60)
    print("Step 4: Retrieving all data for convo_id='12345'...")
    print("="*60)
    convo_data = get_convo_data("12345")
    if convo_data:
        print(json.dumps(convo_data, indent=2))

    # Step 5: Update is_saved status
    print("\n" + "="*60)
    print("Step 5: Updating is_saved status to True for convo_id='12345'...")
    print("="*60)
    update_saved_status("12345", is_saved=True)

    # Step 6: Retrieve saved conversations for specific convo_id
    print("\n" + "="*60)
    print("Step 6: Retrieving saved conversations for convo_id='12345'...")
    print("="*60)
    saved_convo = get_convo_data_by_saved_status("12345", is_saved=True)
    if saved_convo:
        print(json.dumps(saved_convo, indent=2))

    # Step 7: Retrieve all saved conversations
    print("\n" + "="*60)
    print("Step 7: Retrieving ALL saved conversations...")
    print("="*60)
    all_saved = get_all_saved_conversations()
    if all_saved:
        print(f"Found {len(all_saved)} saved conversation(s):")
        print(json.dumps(all_saved, indent=2))

    # Step 8: Retrieve all conversations
    # Get all saved conversations
    print("\n" + "="*60)
    saved_convos = get_all_convo_data_filtered(is_saved=True)
    if saved_convos:
        print(f"Found {len(saved_convos)} saved conversation(s):")
        print(json.dumps(saved_convos, indent=2))
    print("="*60)

    # Get all unsaved conversations
    unsaved_convos = get_all_convo_data_filtered(is_saved=False)
    if unsaved_convos:
        print(f"Found {len(unsaved_convos)} unsaved conversation(s):")
        print(json.dumps(unsaved_convos, indent=2))
    print("="*60)

    # Get all conversations (both saved and unsaved)
    all_convos = get_all_convo_data_filtered(is_saved=None)
    if all_convos:
        print(f"Found {len(all_convos)} conversation(s):")
        print(json.dumps(all_convos, indent=2))


if __name__ == "__main__":
    main()