# This module handles the task queue, results, and leaderboard storage.
import asyncio
import json
import threading
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional

import pandas as pd

from inference import evaluate_model

# Get absolute path
CURRENT_DIR = Path(__file__).parent.absolute()

# Constants
QUEUE_DIR = CURRENT_DIR / "queue"
PATHS = {
    "tasks": QUEUE_DIR / "tasks.json",
    "results": QUEUE_DIR / "results.json",
    "leaderboard": QUEUE_DIR / "leaderboard.json",
}
# Handle storing and loading data from JSON files
class StorageManager:
    """Handles all JSON storage operations"""

    def __init__(self, paths: dict[str, Path]):
        self.paths = paths
        self._ensure_directories()

    def _ensure_directories(self):
        """Ensure all necessary directories and files exist"""
        for path in self.paths.values():
            path.parent.mkdir(parents=True, exist_ok=True)
            if not path.exists():
                path.write_text("[]")

    def load(self, key: str) -> list:
        """Load JSON file"""
        return json.loads(self.paths[key].read_text())

    def save(self, key: str, data: list):
        """Save data to JSON file"""
        self.paths[key].write_text(
            json.dumps(data, indent=4, default=str, ensure_ascii=False)
        )

    def update_task(self, task_id: str, updates: dict):
        """Update specific task with new data"""
        tasks = self.load("tasks")
        for task in tasks:
            if task["id"] == task_id:
                task.update(updates)
                break
        self.save("tasks", tasks)


# Initialize storage manager
storage_manager = StorageManager(PATHS)
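
# Example of the storage format (hypothetical values): each key maps to a JSON file
# holding a plain list of dicts, so the manager simply round-trips Python lists, e.g.
#   storage_manager.save("tasks", [{"id": "1234", "status": "queued"}])
#   storage_manager.load("tasks")                  # -> [{"id": "1234", "status": "queued"}]
#   storage_manager.update_task("1234", {"status": "completed"})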
# Public functions
def get_leaderboard_data():
    """Return leaderboard data as DataFrame"""
    try:
        return pd.DataFrame(storage_manager.load("leaderboard"))
    except Exception as e:
        print(f"Error loading leaderboard: {e}")
        return pd.DataFrame()


def get_results():
    """Return list of evaluation results"""
    return storage_manager.load("results")


def get_tasks():
    """Return list of tasks"""
    return storage_manager.load("tasks")
def get_status(query: str) -> dict:
    """Check status of a model evaluation by task_id or model_name"""
    if not query:
        return {"error": "Please enter a model name or task ID"}

    try:
        results = get_results()
        tasks = get_tasks()

        # First try to find by task ID (use .get() so entries missing a key are skipped)
        result = next((r for r in results if r.get("task_id") == query), None)
        task = next((t for t in tasks if t.get("id") == query), None)

        # If not found, try to find by model name
        if not result:
            result = next((r for r in results if r.get("model") == query), None)
        if not task:
            task = next((t for t in tasks if t.get("model") == query), None)

        if result:
            # If we found results, return them
            return {
                "status": "completed",
                "model": result["model"],
                "subset": result["subset"],
                "num_files": result["num_files"],
                "average_per": result["average_per"],
                "average_pwed": result["average_pwed"],
                "detailed_results": result["detailed_results"],
                "timestamp": result["timestamp"],
            }
        elif task:
            # If we only found task status, return that
            return task
        else:
            return {"error": f"No results found for '{query}'"}

    except Exception as e:
        print(f"Error checking status: {e}")
        return {"error": f"Error checking status: {str(e)}"}
def start_eval_task(
    model_name: str, submission_name: str, github_url: Optional[str] = None
) -> str:
    """Start evaluation task in background. Returns task ID that can be used to check status."""
    # Generate a task ID
    task_id = str(uuid.uuid4())

    # Create task entry
    task = {
        "id": task_id,
        "model": model_name,
        "subset": "test",
        "submission_name": submission_name,
        "github_url": github_url,
        "status": "queued",
        "submitted_at": datetime.now().isoformat(),
    }

    # Save task
    tasks = storage_manager.load("tasks")
    tasks.append(task)
    storage_manager.save("tasks", tasks)

    # Run the evaluation in a background thread so the task ID is returned immediately
    # (calling asyncio.run directly here would block until the evaluation finished).
    threading.Thread(
        target=lambda: asyncio.run(
            _eval_task(task_id, model_name, submission_name, "test", github_url)
        ),
        daemon=True,
    ).start()

    return task_id
async def _eval_task(
    task_id: str,
    model_name: str,
    submission_name: str,
    subset: str = "test",
    github_url: Optional[str] = None,
    max_samples: Optional[int] = None,
):
    """Background task to evaluate model and save updated results"""
    try:
        # Indicate task is processing
        storage_manager.update_task(task_id, {"status": "processing"})

        # Evaluate model
        result = evaluate_model(model_name, subset, max_samples)
        avg_per = result["average_per"]
        avg_pwed = result["average_pwed"]

        # Save results (attach the task ID so the result can be looked up by it later)
        print("Saving results...")
        result["task_id"] = task_id
        current_results = storage_manager.load("results")
        current_results.append(result)
        storage_manager.save("results", current_results)

        # Update leaderboard
        print("Updating leaderboard...")
        leaderboard = storage_manager.load("leaderboard")
        entry = next(
            (e for e in leaderboard if e["submission_name"] == submission_name),
            None,
        )
        if entry:
            # Simply update with new scores
            entry.update(
                {
                    "task_id": task_id,
                    "average_per": avg_per,
                    "average_pwed": avg_pwed,
                    "model": model_name,
                    "subset": subset,
                    "github_url": github_url,
                    "submission_date": datetime.now().isoformat(),
                }
            )
        else:
            leaderboard.append(
                {
                    "task_id": task_id,
                    "submission_id": str(uuid.uuid4()),
                    "submission_name": submission_name,
                    "model": model_name,
                    "average_per": avg_per,
                    "average_pwed": avg_pwed,
                    "subset": subset,
                    "github_url": github_url,
                    "submission_date": datetime.now().isoformat(),
                }
            )
        storage_manager.save("leaderboard", leaderboard)

        storage_manager.update_task(task_id, {"status": "completed"})
        print("Evaluation completed successfully")

    except Exception as e:
        error_msg = f"Evaluation failed: {str(e)}"
        print(error_msg)
        storage_manager.update_task(task_id, {"status": "failed", "error": error_msg})
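

# Minimal usage sketch (hypothetical names, not part of the Space's UI code): submit a
# model, then poll its status. Because the evaluation runs in a background thread, the
# status right after submission is typically still "queued" or "processing".
if __name__ == "__main__":
    example_task_id = start_eval_task(
        model_name="username/example-model",        # placeholder model identifier
        submission_name="example-submission",       # placeholder submission label
        github_url="https://github.com/username/example-repo",  # optional, placeholder
    )
    print("Submitted task:", example_task_id)
    print("Status:", get_status(example_task_id))
    print(get_leaderboard_data())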