""" Leaderboard Module ================== Evaluate tokenizers on real HuggingFace Arabic datasets """ import json import os import statistics from typing import Dict, List, Tuple, Optional from collections import defaultdict import gradio as gr from datasets import load_dataset from transformers import AutoTokenizer from config import LEADERBOARD_DATASETS from tokenizer_manager import tokenizer_manager # File path for persistent storage of submitted tokenizers SUBMISSIONS_FILE = os.path.join(os.path.dirname(__file__), "submissions.json") # File path for cached leaderboard results LEADERBOARD_CACHE_FILE = os.path.join(os.path.dirname(__file__), "leaderboard_cache.json") def load_submitted_tokenizers() -> Dict[str, Dict]: """Load submitted tokenizers from persistent storage""" if os.path.exists(SUBMISSIONS_FILE): try: with open(SUBMISSIONS_FILE, 'r', encoding='utf-8') as f: return json.load(f) except (json.JSONDecodeError, IOError): return {} return {} def save_submitted_tokenizer(model_id: str, data: Dict) -> None: """Save a submitted tokenizer to persistent storage""" submissions = load_submitted_tokenizers() submissions[model_id] = data try: with open(SUBMISSIONS_FILE, 'w', encoding='utf-8') as f: json.dump(submissions, f, indent=2, ensure_ascii=False) except IOError as e: print(f"Warning: Could not save submission: {e}") def load_leaderboard_cache() -> Optional[Dict]: """Load cached leaderboard results""" if os.path.exists(LEADERBOARD_CACHE_FILE): try: with open(LEADERBOARD_CACHE_FILE, 'r', encoding='utf-8') as f: return json.load(f) except (json.JSONDecodeError, IOError): return None return None def save_leaderboard_cache(leaderboard_html: str, per_dataset_html: str, status: str) -> None: """Save leaderboard results to cache""" cache_data = { "leaderboard_html": leaderboard_html, "per_dataset_html": per_dataset_html, "status": status } try: with open(LEADERBOARD_CACHE_FILE, 'w', encoding='utf-8') as f: json.dump(cache_data, f, ensure_ascii=False) except IOError as e: print(f"Warning: Could not save leaderboard cache: {e}") class HFDatasetLoader: """Load Arabic datasets from HuggingFace""" def __init__(self): self.cache = {} def load_dataset_texts(self, dataset_key: str) -> Tuple[List[str], str]: """Load texts from a HuggingFace dataset""" if dataset_key in self.cache: return self.cache[dataset_key], f"āœ… Loaded {len(self.cache[dataset_key])} samples (cached)" config = LEADERBOARD_DATASETS.get(dataset_key) if not config: return [], f"āŒ Unknown dataset: {dataset_key}" try: # Load dataset from HuggingFace if config.get("subset"): ds = load_dataset( config["hf_id"], config["subset"], split=config["split"], trust_remote_code=True ) else: ds = load_dataset( config["hf_id"], split=config["split"], trust_remote_code=True ) texts = [] text_col = config["text_column"] # Try to find text column if text_col not in ds.column_names: for col in ["text", "content", "sentence", "arabic", "context", "Tweet", "question", "poem_text", "hadith_text_ar"]: if col in ds.column_names: text_col = col break # Extract texts max_samples = config.get("samples", 500) for i, item in enumerate(ds): if i >= max_samples: break text = item.get(text_col, "") if text and isinstance(text, str) and len(text.strip()) > 10: texts.append(text.strip()) self.cache[dataset_key] = texts return texts, f"āœ… Loaded {len(texts)} samples from HuggingFace" except Exception as e: return [], f"āŒ Error loading {config['hf_id']}: {str(e)[:80]}" def evaluate_tokenizer_on_texts(tokenizer, texts: List[str]) -> Optional[Dict]: """Evaluate a tokenizer on a list of texts""" fertilities = [] compressions = [] unk_counts = 0 total_tokens = 0 for text in texts: try: tokens = tokenizer.encode(text, add_special_tokens=False) decoded = tokenizer.convert_ids_to_tokens(tokens) num_tokens = len(tokens) num_words = len(text.split()) or 1 num_bytes = len(text.encode('utf-8')) fertility = num_tokens / num_words compression = num_bytes / num_tokens if num_tokens > 0 else 0 # Count UNKs unk_token = getattr(tokenizer, 'unk_token', '[UNK]') unks = sum(1 for t in decoded if t and (t == unk_token or '' in str(t).lower() or '[unk]' in str(t).lower())) fertilities.append(fertility) compressions.append(compression) unk_counts += unks total_tokens += num_tokens except Exception: continue if not fertilities: return None return { "avg_fertility": statistics.mean(fertilities), "std_fertility": statistics.stdev(fertilities) if len(fertilities) > 1 else 0, "avg_compression": statistics.mean(compressions), "unk_ratio": unk_counts / total_tokens if total_tokens > 0 else 0, "samples": len(fertilities) } def calculate_leaderboard_score(fertility: float, compression: float, unk_ratio: float) -> float: """Calculate overall score (0-100, higher is better)""" # Lower fertility is better (ideal ~1.0 for Arabic) fertility_score = max(0, min(1, 2.0 / fertility)) if fertility > 0 else 0 # Higher compression is better compression_score = min(1, compression / 6) # Lower UNK is better unk_score = 1 - min(1, unk_ratio * 20) # Weighted combination score = (fertility_score * 0.45 + compression_score * 0.35 + unk_score * 0.20) * 100 return round(score, 1) def get_cached_leaderboard(progress=gr.Progress()) -> Tuple[str, str, str]: """ Get leaderboard results from cache if available. If no cache exists, shows a message to run evaluation. Returns: (leaderboard_html, per_dataset_html, status_message) """ cache = load_leaderboard_cache() if cache: # Also include any new submissions that were added after the cache return ( cache.get("leaderboard_html", ""), cache.get("per_dataset_html", ""), cache.get("status", "") + "\n\nšŸ“¦ *Loaded from cache. Click 'Re-evaluate All' to refresh.*" ) # No cache exists - show message to run evaluation no_data_html = """

šŸ“Š No evaluation data available yet.

Click "Re-evaluate All" button above to run the full evaluation.

This will evaluate all tokenizers on all 8 Arabic datasets (~5-10 minutes).

""" return ( no_data_html, no_data_html, "āš ļø **No cached results found.** Click 'Re-evaluate All' to run the evaluation." ) def run_leaderboard_evaluation( progress=gr.Progress() ) -> Tuple[str, str, str]: """ Run the full leaderboard evaluation with real HF datasets Evaluates ALL tokenizers on ALL datasets Returns: (leaderboard_html, per_dataset_html, status_message) """ # Use ALL datasets selected_datasets = list(LEADERBOARD_DATASETS.keys()) # Use ALL available tokenizers selected_tokenizers = tokenizer_manager.get_tokenizer_choices() loader = HFDatasetLoader() results = defaultdict(dict) # Status tracking status_lines = [] # Load datasets from HuggingFace status_lines.append("šŸ“š **Loading Datasets from HuggingFace:**\n") loaded_datasets = {} for i, ds_key in enumerate(selected_datasets): progress((i + 1) / len(selected_datasets) * 0.3, f"Loading {ds_key}...") texts, msg = loader.load_dataset_texts(ds_key) ds_name = LEADERBOARD_DATASETS[ds_key]["name"] status_lines.append(f" • {ds_name}: {msg}") if texts: loaded_datasets[ds_key] = texts if not loaded_datasets: return "", "", "\n".join(status_lines) + "\n\nāŒ No datasets loaded successfully" # Evaluate tokenizers status_lines.append("\nšŸ”„ **Evaluating Tokenizers:**\n") tokenizer_cache = {} total_steps = len(selected_tokenizers) * len(loaded_datasets) current_step = 0 for tok_choice in selected_tokenizers: # Get model ID from choice tok_id = tokenizer_manager.get_model_id_from_choice(tok_choice) tok_info = tokenizer_manager.get_available_tokenizers().get(tok_id) tok_name = tok_info.name if tok_info else tok_choice # Load tokenizer try: if tok_id not in tokenizer_cache: tokenizer_cache[tok_id] = AutoTokenizer.from_pretrained( tok_id, trust_remote_code=True ) tokenizer = tokenizer_cache[tok_id] status_lines.append(f" • {tok_name}: āœ… Loaded") except Exception as e: status_lines.append(f" • {tok_name}: āŒ Failed ({str(e)[:30]})") continue # Evaluate on each dataset for ds_key, texts in loaded_datasets.items(): current_step += 1 progress(0.3 + (current_step / total_steps) * 0.6, f"Evaluating {tok_name} on {ds_key}...") metrics = evaluate_tokenizer_on_texts(tokenizer, texts) if metrics: results[tok_choice][ds_key] = metrics # Generate leaderboard progress(0.95, "Generating leaderboard...") leaderboard_data = [] per_dataset_data = [] for tok_choice, ds_results in results.items(): if not ds_results: continue tok_id = tokenizer_manager.get_model_id_from_choice(tok_choice) tok_info = tokenizer_manager.get_available_tokenizers().get(tok_id) # Aggregate across datasets all_fertility = [m["avg_fertility"] for m in ds_results.values()] all_compression = [m["avg_compression"] for m in ds_results.values()] all_unk = [m["unk_ratio"] for m in ds_results.values()] avg_fertility = statistics.mean(all_fertility) avg_compression = statistics.mean(all_compression) avg_unk = statistics.mean(all_unk) score = calculate_leaderboard_score(avg_fertility, avg_compression, avg_unk) leaderboard_data.append({ "name": tok_info.name if tok_info else tok_choice, "type": tok_info.type.value if tok_info else "Unknown", "org": tok_info.organization if tok_info else "Unknown", "score": score, "fertility": avg_fertility, "compression": avg_compression, "unk_ratio": avg_unk, "num_datasets": len(ds_results) }) # Per-dataset row per_ds_row = {"Tokenizer": tok_info.name if tok_info else tok_choice} for ds_key in selected_datasets: ds_name = LEADERBOARD_DATASETS[ds_key]["name"] if ds_key in ds_results: per_ds_row[ds_name] = round(ds_results[ds_key]["avg_fertility"], 2) else: per_ds_row[ds_name] = "-" per_dataset_data.append(per_ds_row) # Add submitted tokenizers to the leaderboard submitted = load_submitted_tokenizers() for model_id, sub_data in submitted.items(): # Check if already in leaderboard (avoid duplicates) if any(d["name"] == sub_data["name"] for d in leaderboard_data): continue leaderboard_data.append({ "name": sub_data["name"], "type": sub_data.get("type", "Custom"), "org": sub_data.get("org", "Community"), "score": sub_data["score"], "fertility": sub_data["fertility"], "compression": sub_data["compression"], "unk_ratio": sub_data["unk_ratio"], "num_datasets": len(sub_data.get("per_dataset", {})) }) # Add per-dataset row for submitted tokenizer per_ds_row = {"Tokenizer": sub_data["name"]} per_dataset_results = sub_data.get("per_dataset", {}) for ds_key in selected_datasets: ds_name = LEADERBOARD_DATASETS[ds_key]["name"] if ds_key in per_dataset_results: per_ds_row[ds_name] = round(per_dataset_results[ds_key]["avg_fertility"], 2) else: per_ds_row[ds_name] = "-" per_dataset_data.append(per_ds_row) # Sort by score leaderboard_data.sort(key=lambda x: x["score"], reverse=True) # Create HTML tables leaderboard_html = generate_leaderboard_html(leaderboard_data) per_dataset_html = generate_per_dataset_html(per_dataset_data, selected_datasets) status_lines.append(f"\nāœ… **Evaluation Complete!** Evaluated {len(results)} tokenizers on {len(loaded_datasets)} datasets.") status_message = "\n".join(status_lines) # Save results to cache save_leaderboard_cache(leaderboard_html, per_dataset_html, status_message) return leaderboard_html, per_dataset_html, status_message def generate_leaderboard_html(data: List[Dict]) -> str: """Generate HTML for main leaderboard - dark theme design""" if not data: return "

No results to display

" html = """ """ for i, entry in enumerate(data): rank = i + 1 rank_class = f"rank-{rank}" if rank <= 3 else "" # Medal for top 3 if rank == 1: rank_display = 'šŸ„‡ 1' elif rank == 2: rank_display = '🄈 2' elif rank == 3: rank_display = 'šŸ„‰ 3' else: rank_display = f"#{rank}" fert_class = "metric-good" if entry["fertility"] < 2.0 else "metric-bad" if entry["fertility"] > 3.0 else "" comp_class = "metric-good" if entry["compression"] > 3.5 else "" unk_class = "metric-good" if entry["unk_ratio"] < 0.01 else "metric-bad" if entry["unk_ratio"] > 0.05 else "" html += f""" """ html += """
Rank Tokenizer Type Organization Score Fertility Compression UNK Rate Datasets
{rank_display} {entry["name"]} {entry["type"]} {entry["org"]} {entry["score"]} {entry["fertility"]:.3f} {entry["compression"]:.2f} {entry["unk_ratio"]:.2%} {entry["num_datasets"]}
Metrics: Score (0-100, higher=better) • Fertility (tokens/word, lower=better) • Compression (bytes/token, higher=better) • UNK Rate (lower=better)
""" return html def generate_per_dataset_html(data: List[Dict], dataset_keys: List[str]) -> str: """Generate HTML for per-dataset fertility table - dark theme design""" if not data: return "

No per-dataset results

" ds_names = [LEADERBOARD_DATASETS[k]["name"] for k in dataset_keys] html = """ """ for ds_name in ds_names: html += f"" html += """ """ for row in data: html += f"" for ds_name in ds_names: val = row.get(ds_name, "-") if val != "-": if val < 1.8: cls = "fert-excellent" elif val < 2.5: cls = "fert-good" else: cls = "fert-poor" html += f'' else: html += '' html += "" html += """
Tokenizer{ds_name}
{row['Tokenizer']}{val}-
""" return html def evaluate_submitted_tokenizer( model_id: str, model_name: str, organization: str, model_type: str, progress=gr.Progress() ) -> Tuple[str, str]: """ Evaluate a user-submitted tokenizer on ALL datasets Returns: (result_html, status_message) """ if not model_id or not model_id.strip(): return "", "āŒ Please enter a HuggingFace model ID (e.g., 'google/gemma-2-9b')" model_id = model_id.strip() # Use ALL datasets selected_datasets = list(LEADERBOARD_DATASETS.keys()) # Try to load the tokenizer progress(0.1, f"Loading tokenizer: {model_id}...") try: tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True) except Exception as e: return "", f"āŒ Failed to load tokenizer '{model_id}': {str(e)[:100]}" # Get tokenizer info vocab_size = tokenizer.vocab_size if hasattr(tokenizer, 'vocab_size') else len(tokenizer) display_name = model_name.strip() if model_name and model_name.strip() else model_id.split('/')[-1] org = organization.strip() if organization and organization.strip() else (model_id.split('/')[0] if '/' in model_id else "Unknown") progress(0.2, "Loading datasets...") # Load datasets loader = HFDatasetLoader() loaded_datasets = {} for ds_key in selected_datasets: texts, _ = loader.load_dataset_texts(ds_key) if texts: loaded_datasets[ds_key] = texts if not loaded_datasets: return "", "āŒ Failed to load any datasets for evaluation" # Evaluate progress(0.4, "Evaluating tokenizer...") all_fertility = [] all_compression = [] all_unk = [] per_dataset_results = {} for ds_key, texts in loaded_datasets.items(): progress(0.4 + (len(per_dataset_results) / len(loaded_datasets)) * 0.4, f"Evaluating on {LEADERBOARD_DATASETS[ds_key]['name']}...") metrics = evaluate_tokenizer_on_texts(tokenizer, texts) if metrics: per_dataset_results[ds_key] = metrics all_fertility.append(metrics["avg_fertility"]) all_compression.append(metrics["avg_compression"]) all_unk.append(metrics["unk_ratio"]) if not all_fertility: return "", "āŒ Evaluation failed - no valid results" # Calculate overall metrics avg_fertility = statistics.mean(all_fertility) avg_compression = statistics.mean(all_compression) avg_unk = statistics.mean(all_unk) score = calculate_leaderboard_score(avg_fertility, avg_compression, avg_unk) progress(0.9, "Saving results...") # Save submission to persistent storage submission_data = { "name": display_name, "org": org, "type": model_type or "Custom", "vocab_size": vocab_size, "score": score, "fertility": avg_fertility, "compression": avg_compression, "unk_ratio": avg_unk, "per_dataset": per_dataset_results } save_submitted_tokenizer(model_id, submission_data) # Generate result HTML result_html = generate_submission_result_html( display_name, org, model_type, vocab_size, score, avg_fertility, avg_compression, avg_unk, per_dataset_results, selected_datasets ) status = f"āœ… **{display_name}** has been evaluated on {len(loaded_datasets)} datasets and added to the leaderboard! Refresh the Leaderboard tab to see the updated rankings." return result_html, status def generate_submission_result_html( name: str, org: str, model_type: str, vocab_size: int, score: float, fertility: float, compression: float, unk_ratio: float, per_dataset: Dict, dataset_keys: List[str] ) -> str: """Generate HTML for submission results - dark theme design""" # Determine score quality if score >= 70: score_color = "#10b981" score_label = "Excellent" elif score >= 50: score_color = "#4a90d9" score_label = "Good" elif score >= 30: score_color = "#f59e0b" score_label = "Fair" else: score_color = "#f87171" score_label = "Needs Improvement" html = f"""

šŸ“Š Evaluation Results

{name} by {org}

{score}
Overall Score
{score_label}
{fertility:.3f}
Fertility
tokens/word
{compression:.2f}
Compression
bytes/token
{unk_ratio:.2%}
UNK Rate
unknown tokens

Model Details

Type: {model_type or 'Custom'}
Vocab Size: {vocab_size:,}

šŸ“ˆ Per-Dataset Results

""" for ds_key in dataset_keys: if ds_key in per_dataset: m = per_dataset[ds_key] ds_name = LEADERBOARD_DATASETS[ds_key]["name"] fert_val = m["avg_fertility"] if fert_val < 1.8: fert_style = "background: rgba(16, 185, 129, 0.25); color: #34d399;" elif fert_val < 2.5: fert_style = "background: rgba(245, 158, 11, 0.25); color: #fbbf24;" else: fert_style = "background: rgba(248, 113, 113, 0.25); color: #f87171;" html += f""" """ html += """
Dataset Fertility Compression Samples
{ds_name} {fert_val:.3f} {m["avg_compression"]:.2f} {m["samples"]}
""" return html