"""ScienceBrain.AI: a Streamlit front end for multimodal OpenAI completions.

The module wires together small helper classes (file handling, OpenAI calls,
video/audio extraction, an ArXiv search endpoint) behind a single
``StreamlitUI`` class that renders the page and dispatches user input.
"""

import asyncio
import base64
import concurrent
import functools
import glob
import html
import json
import math
import os
import re
import time
import zipfile
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from io import BytesIO

import cv2
import openai
import pandas as pd
import pytz
import streamlit as st
import streamlit.components.v1 as components
from audio_recorder_streamlit import audio_recorder
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from gradio_client import Client
from moviepy.editor import VideoFileClip
from openai import OpenAI
from PIL import Image
from PyPDF2 import PdfReader
from tqdm import tqdm

# Load environment variables (OPENAI_API_KEY / OPENAI_ORG_ID are read below).
load_dotenv()


# --- Core Classes for Functionality ---

class PerformanceTracker:
    """Tracks and displays the performance of executed tasks."""

    def track(self, model_name_provider):
        """Return a decorator that times a call and reports it in the UI.

        Args:
            model_name_provider: Either the model name itself or a zero-argument
                callable returning it; it is resolved after the call finishes.

        Returns:
            A decorator whose wrapper returns the wrapped function's result
            after emitting ``st.success`` / ``st.info`` messages.
        """
        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                start_time = time.perf_counter()
                # Run in the calling (script) thread: the wrapped closures touch
                # st.session_state / st.error, which need Streamlit's script
                # context, and the caller waits for the result anyway.
                result = func(*args, **kwargs)
                duration = time.perf_counter() - start_time

                if callable(model_name_provider):
                    model_used = model_name_provider()
                else:
                    model_used = model_name_provider

                st.success("✅ **Execution Complete!**")
                st.info(f"Model: `{model_used}` | Runtime: `{duration:.2f} seconds`")
                return result
            return wrapper
        return decorator


# Shared tracker used by the handler decorators in StreamlitUI.
performance_tracker = PerformanceTracker()


class FileHandler:
    """Manages all file system operations like naming, saving, and zipping."""

    def __init__(self, should_save=True):
        """Store the save flag and the timezone used for filename timestamps."""
        self.should_save = should_save
        self.central_tz = pytz.timezone('US/Central')

    def generate_filename(self, prompt, file_type, original_name=None):
        """Build a timestamped, filesystem-safe filename from a prompt.

        Args:
            prompt: Text whose first 50 sanitized characters go in the name.
            file_type: Extension to append (without the dot).
            original_name: Optional source filename; its stem is appended.

        Returns:
            ``"<MMDD_HHMM>_<prompt>[_<stem>]"`` cut to 100 chars, plus
            ``.<file_type>``.
        """
        safe_date_time = datetime.now(self.central_tz).strftime("%m%d_%H%M")
        # Replace characters that are illegal or awkward in file names.
        safe_prompt = re.sub(r'[<>:"/\\|?*\r\n\t]', ' ', prompt).strip()[:50]
        file_stem = f"{safe_date_time}_{safe_prompt}"
        if original_name:
            # basename first so a path in the upload name cannot leak through.
            base_name = os.path.splitext(os.path.basename(original_name))[0]
            file_stem = f"{file_stem}_{base_name}"
        return f"{file_stem[:100]}.{file_type}"

    def save_file(self, content, filename, prompt=None):
        """Write ``content`` (optionally preceded by ``prompt``) as UTF-8 text.

        Returns:
            ``filename`` on success, or ``None`` when saving is disabled.
        """
        if not self.should_save:
            return None
        with open(filename, "w", encoding="utf-8") as f:
            if prompt:
                f.write(prompt + "\n\n")
            f.write(content)
        return filename

    def save_uploaded_file(self, uploaded_file):
        """Persist an uploaded file in the working directory and return its path.

        Only the base name of ``uploaded_file.name`` is used so that a
        client-supplied name cannot point outside the working directory.
        """
        path = os.path.basename(uploaded_file.name)
        with open(path, "wb") as f:
            f.write(uploaded_file.getvalue())
        return path

    def create_zip_archive(self, files_to_zip):
        """Zip the given paths into ``Filtered_Files.zip`` and return that path."""
        zip_path = "Filtered_Files.zip"
        with zipfile.ZipFile(zip_path, 'w') as zipf:
            for file in files_to_zip:
                zipf.write(file)
        return zip_path

    @st.cache_data
    def get_base64_download_link(_self, file_path, link_text, mime_type):
        """Return an HTML anchor embedding the file as a base64 data URI.

        Args:
            file_path: File whose bytes are embedded.
            link_text: Text/markup shown inside the anchor.
            mime_type: MIME type used in the data URI.

        Note:
            ``_self`` is underscore-prefixed so ``st.cache_data`` does not try
            to hash it; the cache key is the remaining arguments only.
        """
        with open(file_path, 'rb') as f:
            data = f.read()
        b64 = base64.b64encode(data).decode()
        download_name = html.escape(os.path.basename(file_path), quote=True)
        return (
            f'<a href="data:{mime_type};base64,{b64}" '
            f'download="{download_name}">{link_text}</a>'
        )


class OpenAIProcessor:
    """Handles all interactions with the OpenAI API."""

    def __init__(self, api_key, org_id, model):
        """Create the OpenAI client and remember the chat model to use."""
        self.client = OpenAI(api_key=api_key, organization=org_id)
        self.model = model

    def execute_text_completion(self, messages):
        """Send the chat history and return the assistant's reply text.

        Args:
            messages: Iterable of dicts with ``"role"`` and ``"content"`` keys;
                any other keys are dropped before sending.
        """
        completion = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": m["role"], "content": m["content"]} for m in messages],
            stream=False
        )
        return completion.choices[0].message.content

    def execute_image_completion(self, prompt, image_bytes, mime_type="image/png"):
        """Ask the model about an image and return its reply text.

        Args:
            prompt: User instruction sent alongside the image.
            image_bytes: Raw image bytes; sent as a base64 data URI.
            mime_type: MIME type advertised in the data URI.
        """
        base64_image = base64.b64encode(image_bytes).decode("utf-8")
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": "You are a helpful assistant that responds in Markdown."},
                {"role": "user", "content": [
                    {"type": "text", "text": prompt},
                    {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{base64_image}"}}
                ]}
            ],
            temperature=0.0
        )
        return response.choices[0].message.content

    def execute_video_completion(self, frames, transcript, prompt=None):
        """Summarize sampled video frames plus a transcript.

        Args:
            frames: Iterable of base64-encoded JPEG frames.
            transcript: Transcript text appended after the frames.
            prompt: Optional user instruction placed before the frames.
        """
        user_content = []
        if prompt:
            user_content.append({"type": "text", "text": prompt})
        user_content.append({"type": "text", "text": "Video frames:"})
        user_content.extend(
            {"type": "image_url", "image_url": {"url": f"data:image/jpg;base64,{frame}"}}
            for frame in frames
        )
        user_content.append({"type": "text", "text": f"Transcription: {transcript}"})

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": "Summarize the video and its transcript in Markdown."},
                {"role": "user", "content": user_content}
            ]
        )
        return response.choices[0].message.content

    def transcribe_audio(self, audio_bytes, filename="audio.mp3"):
        """Transcribe audio with ``whisper-1``.

        Args:
            audio_bytes: Raw audio bytes.
            filename: Name sent with the upload so the payload carries a file
                extension (a bare ``BytesIO`` has no name).

        Returns:
            The transcript text, or ``None`` if the API rejects the request
            (the error is shown via ``st.error``).
        """
        try:
            transcription = self.client.audio.transcriptions.create(
                model="whisper-1",
                file=(filename, audio_bytes)
            )
            return transcription.text
        except openai.BadRequestError as e:
            st.error(f"Audio processing error: {e}")
            return None


class MediaProcessor:
    """Handles processing of media files like video and audio."""

    def extract_video_components(self, video_path, seconds_per_frame=2):
        """Sample frames from a video and extract its audio track.

        Args:
            video_path: Path of the video file.
            seconds_per_frame: Spacing, in seconds, between sampled frames.

        Returns:
            ``(frames, audio_path)`` where ``frames`` is a list of
            base64-encoded JPEG strings and ``audio_path`` is the written
            ``.mp3`` path, or ``None`` if there is no audio or extraction
            failed.
        """
        base64_frames = []
        video = cv2.VideoCapture(video_path)
        try:
            total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
            fps = video.get(cv2.CAP_PROP_FPS)
            # Always advance by at least one frame; a step of 0 never terminates.
            frames_to_skip = max(1, int(fps * seconds_per_frame))
            curr_frame = 0

            while curr_frame < total_frames - 1:
                video.set(cv2.CAP_PROP_POS_FRAMES, curr_frame)
                success, frame = video.read()
                if not success:
                    break
                encoded, buffer = cv2.imencode(".jpg", frame)
                if encoded:
                    base64_frames.append(base64.b64encode(buffer).decode("utf-8"))
                curr_frame += frames_to_skip
        finally:
            video.release()

        audio_path = f"{os.path.splitext(video_path)[0]}.mp3"
        clip = None
        try:
            clip = VideoFileClip(video_path)
            if clip.audio:
                clip.audio.write_audiofile(audio_path, bitrate="32k")
            else:
                audio_path = None
        except Exception:
            # Best effort: a video whose audio cannot be extracted is still
            # summarized from its frames alone.
            audio_path = None
        finally:
            if clip is not None:
                clip.close()

        return base64_frames, audio_path


class RAGManager:
    """Manages Retrieval-Augmented Generation processes."""

    def __init__(self, openai_client):
        """Keep a reference to the OpenAI client used for vector stores."""
        self.client = openai_client

    def create_vector_store(self, name):
        """Create a vector store called ``name`` and return its id."""
        vector_store = self.client.vector_stores.create(name=name)
        return vector_store.id

    # ... Other RAG methods would go here ...


class ExternalAPIHandler:
    """Handles calls to external APIs like ArXiv."""

    def search_arxiv(self, query):
        """Query the ArXiv search endpoint and return its combined response.

        Returns:
            The first two elements of the endpoint's response, concatenated.
        """
        client = Client("awacke1/Arxiv-Paper-Search-And-QA-RAG-Pattern")
        response = client.predict(
            message=query,
            llm_results_use=5,
            database_choice="Semantic Search",
            llm_model_picked="mistralai/Mistral-7B-Instruct-v0.2",
            api_name="/update_with_rag_md"
        )
        return response[0] + response[1]


# --- Streamlit UI Class ---

class StreamlitUI:
    """Main class to build and run the Streamlit user interface."""

    def __init__(self):
        """Configure the page, seed session state, and build helper objects."""
        self.setup_page()
        self.initialize_state()

        # Initialize helper classes
        self.file_handler = FileHandler(should_save=st.session_state.should_save)
        self.openai_processor = OpenAIProcessor(
            api_key=os.getenv('OPENAI_API_KEY'),
            org_id=os.getenv('OPENAI_ORG_ID'),
            model=st.session_state.openai_model
        )
        self.media_processor = MediaProcessor()
        self.external_api_handler = ExternalAPIHandler()

    def setup_page(self):
        """Apply the page configuration (title, icon, layout, menu items)."""
        try:
            page_icon = Image.open("icons.ico")
        except OSError:
            # Missing or unreadable icon file: fall back to an emoji icon.
            page_icon = "🔬"

        st.set_page_config(
            page_title="🔬🧠ScienceBrain.AI",
            page_icon=page_icon,
            layout="wide",
            initial_sidebar_state="auto",
            menu_items={
                'Get Help': 'https://huggingface.co/awacke1',
                'Report a bug': 'https://huggingface.co/spaces/awacke1',
                'About': "🔬🧠ScienceBrain.AI"
            }
        )

    def initialize_state(self):
        """Seed session-state keys that the rest of the class reads."""
        defaults = {
            "openai_model": "gpt-4o-2024-05-13",
            "messages": [],
            # Read by __init__ before the sidebar checkbox is rendered.
            "should_save": True,
        }
        for key, value in defaults.items():
            if key not in st.session_state:
                st.session_state[key] = value

    def display_sidebar(self):
        """Render the sidebar: save toggle and clear-history button."""
        st.sidebar.title("Configuration & Files")
        st.session_state.should_save = st.sidebar.checkbox("💾 Save Session", value=True)
        # Keep the already-constructed FileHandler in step with the checkbox.
        self.file_handler.should_save = st.session_state.should_save

        if st.sidebar.button("🗑️ Clear Chat History"):
            st.session_state.messages = []
            st.rerun()

        st.sidebar.markdown("---")
        # File management logic here...

    def display_main_interface(self):
        """Render the model/input selectors and dispatch to the chosen handler."""
        st.markdown("##### GPT-4o Omni: Text, Audio, Image, Video & RAG")

        model_options = ["gpt-4o-2024-05-13", "gpt-3.5-turbo"]
        current_model = st.session_state.openai_model
        # Fall back to the first option if the stored model is not offered.
        default_index = model_options.index(current_model) if current_model in model_options else 0
        st.session_state.openai_model = st.selectbox(
            "Select OpenAI Model", model_options, index=default_index
        )
        # The processor was built before this selectbox ran; apply the choice now.
        self.openai_processor.model = st.session_state.openai_model

        input_type = st.selectbox(
            "Select Input Type",
            ("Text", "Image", "Audio", "Video", "ArXiv Search", "RAG PDF Gallery")
        )

        handlers = {
            "Text": self.handle_text_input,
            "Image": self.handle_image_input,
            "Video": self.handle_video_input,
            "ArXiv Search": self.handle_arxiv_search,
            # ... other handlers
        }
        handler = handlers.get(input_type)
        if handler is None:
            st.info(f"`{input_type}` input is not available yet.")
        else:
            handler()

    def handle_text_input(self):
        """Collect a text prompt, run a chat completion, and store the exchange."""
        prompt = st.text_input("Enter your text prompt:", key="text_prompt")
        if st.button("Submit Text", key="submit_text"):
            if prompt:
                st.session_state.messages.append({"role": "user", "content": prompt})
                with st.chat_message("user"):
                    st.markdown(prompt)

                with st.chat_message("assistant"):
                    with st.spinner("Thinking..."):
                        # Use the performance tracker decorator
                        @performance_tracker.track(lambda: self.openai_processor.model)
                        def run_completion():
                            return self.openai_processor.execute_text_completion(st.session_state.messages)

                        response = run_completion()
                        st.markdown(response)
                        st.session_state.messages.append({"role": "assistant", "content": response})

                        filename = self.file_handler.generate_filename(prompt, "md")
                        self.file_handler.save_file(response, filename, prompt=prompt)
                        st.rerun()

    def handle_image_input(self):
        """Collect an image plus prompt, analyze it, and store the exchange."""
        prompt = st.text_input("Enter a prompt for the image:", value="Describe this image in detail.")
        uploaded_image = st.file_uploader("Upload an image:", type=["png", "jpg", "jpeg"])

        if st.button("Submit Image") and uploaded_image and prompt:
            with st.chat_message("user"):
                st.image(uploaded_image, width=250)
                st.markdown(prompt)

            with st.chat_message("assistant"):
                with st.spinner("Analyzing image..."):
                    image_bytes = uploaded_image.getvalue()
                    mime_type = uploaded_image.type or "image/png"

                    @performance_tracker.track(lambda: self.openai_processor.model)
                    def run_image_analysis():
                        return self.openai_processor.execute_image_completion(
                            prompt, image_bytes, mime_type=mime_type
                        )

                    response = run_image_analysis()
                    st.markdown(response)

                    # Persist both turns: the rerun below redraws the page from
                    # session state, so an unsaved answer would disappear.
                    st.session_state.messages.append(
                        {"role": "user", "content": f"{prompt} (image: `{uploaded_image.name}`)"}
                    )
                    st.session_state.messages.append({"role": "assistant", "content": response})

                    filename = self.file_handler.generate_filename(prompt, "md", original_name=uploaded_image.name)
                    self.file_handler.save_file(response, filename, prompt=prompt)
                    st.rerun()

    def handle_video_input(self):
        """Collect a video plus prompt, summarize it, and store the exchange."""
        prompt = st.text_input("Enter a prompt for the video:", value="Summarize the key events in this video.")
        uploaded_video = st.file_uploader("Upload a video:", type=["mp4", "mov"])

        if st.button("Submit Video") and uploaded_video and prompt:
            user_text = f"Analyzing video: `{uploaded_video.name}` with prompt: `{prompt}`"
            with st.chat_message("user"):
                st.markdown(user_text)

            with st.chat_message("assistant"):
                with st.spinner("Processing video... this may take a moment."):
                    video_path = self.file_handler.save_uploaded_file(uploaded_video)

                    @performance_tracker.track(lambda: self.openai_processor.model)
                    def run_video_analysis():
                        frames, audio_path = self.media_processor.extract_video_components(video_path)
                        transcript = "No audio found."
                        if audio_path:
                            with open(audio_path, "rb") as af:
                                transcribed = self.openai_processor.transcribe_audio(
                                    af.read(), filename=os.path.basename(audio_path)
                                )
                            # transcribe_audio returns None on a rejected request.
                            transcript = transcribed or "Transcription unavailable."
                        return self.openai_processor.execute_video_completion(
                            frames, transcript, prompt=prompt
                        )

                    response = run_video_analysis()
                    st.markdown(response)

                    # Persist both turns so they survive the rerun below.
                    st.session_state.messages.append({"role": "user", "content": user_text})
                    st.session_state.messages.append({"role": "assistant", "content": response})

                    filename = self.file_handler.generate_filename(prompt, "md", original_name=uploaded_video.name)
                    self.file_handler.save_file(response, filename, prompt=prompt)
                    st.rerun()

    def handle_arxiv_search(self):
        """Collect a query, run the ArXiv search, and store the exchange."""
        query = st.text_input("Search ArXiv for scholarly articles:")
        if st.button("Search ArXiv") and query:
            user_text = f"ArXiv Search: `{query}`"
            with st.chat_message("user"):
                st.markdown(user_text)

            with st.chat_message("assistant"):
                with st.spinner("Searching ArXiv..."):
                    @performance_tracker.track("Mistral-7B-Instruct-v0.2")  # Model is fixed for this endpoint
                    def run_arxiv_search():
                        return self.external_api_handler.search_arxiv(query)

                    response = run_arxiv_search()
                    st.markdown(response)

                    st.session_state.messages.append({"role": "user", "content": user_text})
                    st.session_state.messages.append({"role": "assistant", "content": response})

                    filename = self.file_handler.generate_filename(query, "md")
                    self.file_handler.save_file(response, filename, prompt=query)
                    st.rerun()

    def display_chat_history(self):
        """Replay every stored message in its role's chat bubble."""
        for message in st.session_state.messages:
            with st.chat_message(message["role"]):
                st.markdown(message["content"])

    def run(self):
        """Render the sidebar, the chat history, then the main interface."""
        self.display_sidebar()
        self.display_chat_history()
        self.display_main_interface()


# --- Main Execution ---
if __name__ == "__main__":
    app = StreamlitUI()
    app.run()