Spaces:
Sleeping
Sleeping
| # requirements.txt dependencies: | |
| # fastapi | |
| # uvicorn[standard] | |
| # langchain-qdrant | |
| # langchain-openai | |
| # langchain-community | |
| # qdrant-client | |
| # fastembed | |
| # langextract | |
| # python-multipart | |
| # python-dotenv | |
| # pypdf | |
| # pandas | |
| from fastapi import FastAPI, File, UploadFile, HTTPException, APIRouter, Form | |
| from fastapi import FastAPI, File, UploadFile, HTTPException, APIRouter, Form, Query | |
| from fastapi.responses import JSONResponse | |
| from typing import Optional, List, Dict, Any | |
| from pydantic import BaseModel | |
| import tempfile | |
| import os | |
| from dotenv import load_dotenv | |
| from langchain_qdrant import QdrantVectorStore, RetrievalMode, FastEmbedSparse | |
| from langchain_openai import OpenAIEmbeddings | |
| from langchain_core.documents import Document | |
| from langchain_community.document_loaders import ( | |
| PyPDFLoader, | |
| CSVLoader, | |
| TextLoader, | |
| UnstructuredWordDocumentLoader, | |
| UnstructuredExcelLoader | |
| ) | |
| from qdrant_client import QdrantClient | |
| from qdrant_client.models import Distance, VectorParams | |
| import langextract as lx | |
| from retrieve_secret import * | |
| load_dotenv() | |
| # Configuration | |
| # QDRANT_URL = os.getenv("QDRANT_URL", "http://localhost:6333") | |
| # QDRANT_API_KEY = os.getenv("QDRANT_API_KEY", None) | |
| # OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") | |
| # Initialize embeddings | |
| embeddings = OpenAIEmbeddings(api_key=OPENAI_API_KEY) | |
| sparse_embeddings = FastEmbedSparse(model_name="Qdrant/bm25") | |
| # Initialize Qdrant client | |
| # print(QDRANT_URL , " " ) | |
| # print(QDRANT_API_KEY) | |
| # # client = QdrantClient(url=QDRANT_URL, api_key=QDRANT_API_KEY) | |
| # if not client: | |
| # raise Exception("❌ Failed to connect to Qdrant.") | |
| # client = QdrantClient( | |
| # host="localhost", | |
| # port="6333" | |
| # ) | |
| client = QdrantClient( | |
| host=QDRANT_HOST, | |
| port=QDRANT_PORT | |
| ) | |
| # Option 2: Using get_collections() | |
| try: | |
| collections = client.get_collections() | |
| print("✅ Connected to Qdrant. Collections:", collections) | |
| except Exception as e: | |
| raise Exception(f"❌ Failed to connect to Qdrant: {e}") | |
| # Option 3: Simple connection test | |
| # FastAPI Qdrant_router | |
| # Routers | |
| # = APIRouter(prefix="/collections", tags=["Collections"]) | |
| # Qdrant_router = APIRouter(prefix="/documents", tags=["Documents"]) | |
| # Qdrant_router = APIRouter(prefix="/search", tags=["Search"]) | |
| # Qdrant_router = APIRouter(prefix="/qdrant",tags=["Qdrant_Collections"]) | |
| Qdrant_router = APIRouter(prefix="/qdrant", tags=["Qdrant_Collections"]) | |
| # ==================== PYDANTIC MODELS ==================== | |
| class CollectionCreate(BaseModel): | |
| name: str | |
| vector_size: int = 1536 | |
| distance: str = "Cosine" | |
| metadata: Optional[Dict[str, Any]] = None | |
| class CollectionUpdate(BaseModel): | |
| metadata: Optional[Dict[str, Any]] = None | |
| class DocumentCreate(BaseModel): | |
| content: str | |
| metadata: Optional[Dict[str, Any]] = None | |
| class SearchQuery(BaseModel): | |
| query: str | |
| collection_name: str | |
| k: int = 3 | |
| filter: Optional[Dict[str, Any]] = None | |
| # ==================== HELPER FUNCTIONS ==================== | |
| def get_file_loader(file_path: str, file_type: str): | |
| """Return Qdrant_routerropriate document loader based on file type""" | |
| loaders = { | |
| 'pdf': PyPDFLoader, | |
| 'csv': CSVLoader, | |
| 'txt': TextLoader, | |
| 'docx': UnstructuredWordDocumentLoader, | |
| 'doc': UnstructuredWordDocumentLoader, | |
| 'xlsx': UnstructuredExcelLoader, | |
| 'xls': UnstructuredExcelLoader | |
| } | |
| loader_class = loaders.get(file_type.lower()) | |
| if not loader_class: | |
| raise ValueError(f"Unsupported file type: {file_type}") | |
| return loader_class(file_path) | |
| def extract_with_langextract(file_path: str) -> List[Document]: | |
| """Use LangExtract for advanced PDF extraction""" | |
| # Load PDF content | |
| loader = PyPDFLoader(file_path) | |
| pages = loader.load() | |
| documents = [] | |
| for page in pages: | |
| prompt = """ | |
| Extract key entities, concepts, and relationships from the text. | |
| Focus on important information, facts, and contextual details. | |
| """ | |
| try: | |
| result = lx.extract( | |
| text_or_documents=page.page_content, | |
| prompt_description=prompt, | |
| model_id="gpt-4o-mini", | |
| api_key=OPENAI_API_KEY, | |
| fence_output=True, | |
| use_schema_constraints=False | |
| ) | |
| # Create enhanced document with extractions | |
| metadata = page.metadata.copy() | |
| metadata['extractions'] = [ | |
| { | |
| 'class': e.extraction_class, | |
| 'text': e.extraction_text, | |
| 'attributes': e.attributes or {} | |
| } | |
| for e in result.extractions | |
| ] | |
| documents.Qdrant_routerend(Document( | |
| page_content=page.page_content, | |
| metadata=metadata | |
| )) | |
| except Exception as e: | |
| print(f"LangExtract failed for page, using raw content: {e}") | |
| documents.Qdrant_routerend(page) | |
| return documents | |
| # ==================== COLLECTION CRUD ENDPOINTS ==================== | |
| async def create_collection(collection: CollectionCreate): | |
| """Create a new collection with optional metadata""" | |
| try: | |
| # Check if collection exists | |
| collections = client.get_collections().collections | |
| if any(c.name == collection.name for c in collections): | |
| raise HTTPException(status_code=400, detail=f"Collection '{collection.name}' already exists") | |
| # Map distance metric | |
| distance_map = { | |
| "Cosine": Distance.COSINE, | |
| "Euclid": Distance.EUCLID, | |
| "Dot": Distance.DOT | |
| } | |
| # Create collection with initial empty documents | |
| initial_doc = Document( | |
| page_content="Initial placeholder document", | |
| metadata=collection.metadata or {} | |
| ) | |
| qdrant = QdrantVectorStore.from_documents( | |
| [initial_doc], | |
| embedding=embeddings, | |
| sparse_embedding=sparse_embeddings, | |
| url=QDRANT_URL, | |
| api_key=QDRANT_API_KEY, | |
| collection_name=collection.name, | |
| retrieval_mode=RetrievalMode.HYBRID, | |
| ) | |
| return { | |
| "message": f"Collection '{collection.name}' created successfully", | |
| "collection_name": collection.name, | |
| "metadata": collection.metadata | |
| } | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def list_collections(): | |
| """List all collections""" | |
| print("** LISTING ALL COLLECTIONS **") | |
| try: | |
| collections = client.get_collections().collections | |
| print("collections", collections) | |
| return { | |
| "collections": [ | |
| { | |
| "name": c.name, | |
| "vectors_count": client.get_collection(c.name).points_count | |
| } | |
| for c in collections | |
| ] | |
| } | |
| except Exception as e: | |
| print("this is excepetion ",e) | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def get_collection_info(collection_name: str): | |
| """Get detailed information about a collection""" | |
| try: | |
| info = client.get_collection(collection_name) | |
| return { | |
| "name": collection_name, | |
| "points_count": info.points_count, | |
| "vectors_count": info.vectors_count, | |
| "status": info.status.value, | |
| "config": { | |
| "params": str(info.config.params) | |
| } | |
| } | |
| except Exception as e: | |
| raise HTTPException(status_code=404, detail=f"Collection '{collection_name}' not found") | |
| async def update_collection_metadata(collection_name: str, update: CollectionUpdate): | |
| """Update collection metadata (note: this updates point metadata, not collection config)""" | |
| try: | |
| # Verify collection exists | |
| client.get_collection(collection_name) | |
| return { | |
| "message": f"Collection '{collection_name}' metadata update acknowledged", | |
| "note": "To update point metadata, use document update endpoints", | |
| "metadata": update.metadata | |
| } | |
| except Exception as e: | |
| raise HTTPException(status_code=404, detail=f"Collection '{collection_name}' not found") | |
| async def delete_collection(collection_name: str): | |
| """Delete a collection""" | |
| try: | |
| client.delete_collection(collection_name) | |
| return {"message": f"Collection '{collection_name}' deleted successfully"} | |
| except Exception as e: | |
| raise HTTPException(status_code=404, detail=f"Collection '{collection_name}' not found") | |
| # ==================== DOCUMENT CRUD ENDPOINTS ==================== | |
| async def add_text_document(collection_name: str, doc: DocumentCreate): | |
| """Add a text document to a collection with metadata""" | |
| try: | |
| qdrant = QdrantVectorStore.from_existing_collection( | |
| embedding=embeddings, | |
| collection_name=collection_name, | |
| url=QDRANT_URL, | |
| api_key=QDRANT_API_KEY, | |
| ) | |
| document = Document( | |
| page_content=doc.content, | |
| metadata=doc.metadata or {} | |
| ) | |
| ids = qdrant.add_documents([document]) | |
| return { | |
| "message": "Document added successfully", | |
| "document_ids": ids, | |
| "metadata": doc.metadata | |
| } | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def upload_file_to_collection( | |
| collection_name: str, | |
| file: UploadFile = File(...), | |
| use_langextract: bool = Form(False), | |
| metadata: Optional[str] = Form(None) | |
| ): | |
| """ | |
| Upload and process a file (PDF, CSV, TXT, DOCX, XLSX) to a collection. | |
| For PDFs, optionally use LangExtract for advanced entity extraction. | |
| """ | |
| try: | |
| # Parse metadata if provided | |
| import json | |
| file_metadata = json.loads(metadata) if metadata else {} | |
| # Get file extension | |
| file_ext = file.filename.split('.')[-1].lower() | |
| # Save uploaded file temporarily | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=f'.{file_ext}') as tmp: | |
| content = await file.read() | |
| tmp.write(content) | |
| tmp_path = tmp.name | |
| try: | |
| # Process based on file type | |
| if file_ext == 'pdf' and use_langextract: | |
| documents = extract_with_langextract(tmp_path) | |
| else: | |
| loader = get_file_loader(tmp_path, file_ext) | |
| documents = loader.load() | |
| # Add file metadata to all documents | |
| for doc in documents: | |
| doc.metadata.update({ | |
| 'source_file': file.filename, | |
| 'file_type': file_ext, | |
| **file_metadata | |
| }) | |
| # Add to Qdrant | |
| qdrant = QdrantVectorStore.from_existing_collection( | |
| embedding=embeddings, | |
| collection_name=collection_name, | |
| url=QDRANT_URL, | |
| api_key=QDRANT_API_KEY, | |
| ) | |
| ids = qdrant.add_documents(documents) | |
| return { | |
| "message": f"File '{file.filename}' processed successfully", | |
| "file_type": file_ext, | |
| "documents_added": len(documents), | |
| "document_ids": ids, | |
| "used_langextract": use_langextract and file_ext == 'pdf' | |
| } | |
| finally: | |
| # Clean up temp file | |
| os.unlink(tmp_path) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def add_batch_documents(collection_name: str, docs: List[DocumentCreate]): | |
| """Add multiple text documents at once""" | |
| try: | |
| qdrant = QdrantVectorStore.from_existing_collection( | |
| embedding=embeddings, | |
| collection_name=collection_name, | |
| url=QDRANT_URL, | |
| api_key=QDRANT_API_KEY, | |
| ) | |
| documents = [ | |
| Document(page_content=doc.content, metadata=doc.metadata or {}) | |
| for doc in docs | |
| ] | |
| ids = qdrant.add_documents(documents) | |
| return { | |
| "message": f"Added {len(documents)} documents successfully", | |
| "document_ids": ids | |
| } | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def get_document_count(collection_name: str): | |
| """Get the number of documents in a collection""" | |
| try: | |
| info = client.get_collection(collection_name) | |
| return { | |
| "collection_name": collection_name, | |
| "document_count": info.points_count | |
| } | |
| except Exception as e: | |
| raise HTTPException(status_code=404, detail=f"Collection '{collection_name}' not found") | |
| # ==================== SEARCH ENDPOINTS ==================== | |
| class QueryRequest(BaseModel): | |
| query: str | |
| top_k: Optional[int] = 2 | |
| class QueryResponseItem(BaseModel): | |
| content: str | |
| metadata: dict | |
| class QueryResponse(BaseModel): | |
| collection: str | |
| mode: str | |
| results: List[QueryResponseItem] | |
| # --- Utility function --- | |
| def get_vector_store(collection_name: str, mode: str): | |
| """Return a QdrantVectorStore for the given collection and mode.""" | |
| if mode == "dense": | |
| return QdrantVectorStore( | |
| client=client, | |
| collection_name=collection_name, | |
| embedding=embeddings, | |
| retrieval_mode=RetrievalMode.DENSE, | |
| vector_name="", # unnamed dense vector | |
| ) | |
| elif mode == "sparse": | |
| return QdrantVectorStore( | |
| client=client, | |
| collection_name=collection_name, | |
| sparse_embedding=sparse_embeddings, | |
| retrieval_mode=RetrievalMode.SPARSE, | |
| sparse_vector_name="langchain-sparse", | |
| ) | |
| elif mode == "hybrid": | |
| return QdrantVectorStore( | |
| client=client, | |
| collection_name=collection_name, | |
| embedding=embeddings, | |
| sparse_embedding=sparse_embeddings, | |
| retrieval_mode=RetrievalMode.HYBRID, | |
| vector_name="", | |
| sparse_vector_name="langchain-sparse", | |
| ) | |
| else: | |
| raise HTTPException( | |
| status_code=400, | |
| detail="Invalid mode. Choose one of: dense, sparse, hybrid.", | |
| ) | |
| def search( | |
| request: QueryRequest, | |
| collection_name: str = Query(..., description="Name of the Qdrant collection"), | |
| mode: str = Query(..., regex="^(dense|sparse|hybrid)$", description="Retrieval mode"), | |
| ): | |
| """Perform a similarity search using the specified collection and mode.""" | |
| try: | |
| # Create vector store | |
| vector_store = get_vector_store(collection_name, mode) | |
| # Perform search | |
| results = vector_store.similarity_search(request.query, k=request.top_k) | |
| response_items = [ | |
| QueryResponseItem(content=doc.page_content, metadata=doc.metadata or {}) | |
| for doc in results | |
| ] | |
| return QueryResponse(collection=collection_name, mode=mode, results=response_items) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Search failed: {str(e)}") | |
| async def health_check(): | |
| """Check if Qdrant connection is healthy""" | |
| try: | |
| collections = client.get_collections() | |
| return { | |
| "status": "healthy", | |
| "qdrant_url": QDRANT_URL, | |
| "collections_count": len(collections.collections) | |
| } | |
| except Exception as e: | |
| return JSONResponse( | |
| status_code=503, | |
| content={"status": "unhealthy", "error": str(e)} | |
| ) | |