""" ULTRA-ROBUST CALL CENTER ANALYTICS =================================== ✅ Multiple gender detection models with voting ✅ Best STT model (Whisper Large-v3 + optimizations) ✅ Enhanced for European accents ✅ Robust pitch analysis with multiple methods ✅ Production-grade accuracy MODELS USED: - STT: Whisper Large-v3 (best for accents) - Gender: 3 models + voting system - Age: Wav2Vec2 Large + validation - Diarization: pyannote 3.1 (SOTA) """ from keybert import KeyBERT from sentence_transformers import SentenceTransformer import os import sys import logging import torch import librosa import whisper import numpy as np import warnings import json import gc from collections import Counter, defaultdict from pyannote.audio import Pipeline from transformers import ( pipeline, Wav2Vec2Processor, Wav2Vec2ForSequenceClassification, AutoModelForAudioClassification, AutoFeatureExtractor ) from datetime import datetime from scipy import signal as scipy_signal from scipy.stats import mode as scipy_mode import parselmouth from parselmouth.praat import call os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3" os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True" logging.getLogger("pyannote").setLevel(logging.ERROR) logging.getLogger("transformers").setLevel(logging.ERROR) warnings.filterwarnings("ignore") class NumpyEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, np.integer): return int(obj) if isinstance(obj, np.floating): return float(obj) if isinstance(obj, np.ndarray): return obj.tolist() return super(NumpyEncoder, self).default(obj) class UltraRobustCallAnalytics: def __init__(self, hf_token=None, device=None): # 1. DEFINE DEVICE FIRST (Move this up) self.device = device if device else ("cuda" if torch.cuda.is_available() else "cpu") print(f"🚀 Initializing ULTRA-ROBUST Analytics Engine on {self.device}...") print("="*70) # 2. 
        self._flush_memory()

        # ===== BEST STT MODEL: Whisper Large-v3 =====
        try:
            print(" → Loading Whisper Large-v3 (BEST for accents)...")
            self.stt_model = whisper.load_model("large-v3", device=self.device)
            self.stt_model_name = "large-v3"
            print(" ✓ Whisper Large-v3 loaded")
        except:
            print(" ⚠ Falling back to Large-v2...")
            try:
                self.stt_model = whisper.load_model("large-v2", device=self.device)
                self.stt_model_name = "large-v2"
                print(" ✓ Whisper Large-v2 loaded")
            except:
                print(" ⚠ Final fallback to Medium...")
                self.stt_model = whisper.load_model("medium", device=self.device)
                self.stt_model_name = "medium"
                print(" ✓ Whisper Medium loaded")

        # ===== DIARIZATION =====
        self.diarization_pipeline = None
        if hf_token:
            print(f" → Attempting to load Pyannote with token starting: {hf_token[:4]}...")
            # Universal Loader: Tries 'token' (New) then 'use_auth_token' (Old)
            try:
                # Attempt 1: New Syntax
                self.diarization_pipeline = Pipeline.from_pretrained(
                    "pyannote/speaker-diarization-3.1",
                    token=hf_token
                ).to(torch.device(self.device))
                print(" ✓ Diarization loaded (New Syntax)")
            except TypeError:
                # Attempt 2: Old Syntax (Fallback)
                print(" ⚠ New syntax failed, trying legacy syntax...")
                try:
                    self.diarization_pipeline = Pipeline.from_pretrained(
                        "pyannote/speaker-diarization-3.1",
                        use_auth_token=hf_token
                    ).to(torch.device(self.device))
                    print(" ✓ Diarization loaded (Legacy Syntax)")
                except Exception as e:
                    print(f" ❌ CRITICAL PYANNOTE ERROR (Legacy): {e}")
            except Exception as e:
                print(f" ❌ CRITICAL PYANNOTE ERROR: {e}")

        # ===== EMOTION CLASSIFIER =====
        print(" → Loading emotion classifier...")
        self.emotion_classifier = pipeline(
            "audio-classification",
            model="superb/wav2vec2-base-superb-er",
            device=0 if self.device == "cuda" else -1
        )
        print(" ✓ Emotion classifier loaded")

        # ===== MULTIPLE GENDER MODELS FOR VOTING =====
        print("\n → Loading MULTIPLE gender detection models...")
        self.gender_models = {}

        # Model 1: Age-Gender (Primary)
        try:
            print("   Loading Gender Model 1: audeering/wav2vec2-large...")
            self.ag_model_name = "audeering/wav2vec2-large-robust-24-ft-age-gender"
            self.ag_processor = Wav2Vec2Processor.from_pretrained(self.ag_model_name)
            self.ag_model = Wav2Vec2ForSequenceClassification.from_pretrained(self.ag_model_name)
            self.ag_model.to(self.device).eval()
            self.gender_models['audeering'] = {
                'processor': self.ag_processor,
                'model': self.ag_model
            }
            print("   ✓ Model 1 loaded")
        except Exception as e:
            print(f"   ✗ Model 1 failed: {e}")

        # Model 2: Alefiury Gender Classifier
        try:
            print("   Loading Gender Model 2: alefiury/wav2vec2-large-xlsr-53-gender...")
            model2_name = "alefiury/wav2vec2-large-xlsr-53-gender-recognition-librispeech"
            processor2 = AutoFeatureExtractor.from_pretrained(model2_name)
            model2 = AutoModelForAudioClassification.from_pretrained(model2_name)
            model2.to(self.device).eval()
            self.gender_models['alefiury'] = {
                'processor': processor2,
                'model': model2
            }
            print("   ✓ Model 2 loaded")
        except Exception as e:
            print(f"   ✗ Model 2 failed: {e}")

        # Model 3: MIT Gender Detection
        try:
            print("   Loading Gender Model 3: MIT/ast-finetuned-speech-commands...")
            model3_name = "MIT/ast-finetuned-speech-commands-v2"
            processor3 = AutoFeatureExtractor.from_pretrained(model3_name)
            model3 = AutoModelForAudioClassification.from_pretrained(model3_name)
            model3.to(self.device).eval()
            self.gender_models['mit'] = {
                'processor': processor3,
                'model': model3
            }
            print("   ✓ Model 3 loaded")
        except Exception as e:
            print(f"   ✗ Model 3 failed: {e}")

        print(f" ✓ Loaded {len(self.gender_models)} gender detection models")
models") print("\n" + "="*70) print("✅ Engine initialized successfully") print("="*70 + "\n") print(" → Loading KeyBERT for keyword extraction...") try: self.keyword_model = KeyBERT('all-MiniLM-L6-v2') print(" ✓ Keyword extractor loaded") except Exception as e: print(f" ⚠ Keyword model failed: {e}") self.keyword_model = None print(" → Loading zero-shot topic classifier...") try: self.topic_classifier = pipeline( "zero-shot-classification", model="facebook/bart-large-mnli", device=0 if self.device == "cuda" else -1 ) self.topic_labels = [ "billing_payment", "technical_support", "product_inquiry", "complaint_issue", "account_management", "sales_marketing", "service_cancellation", "feedback_survey", "appointment_scheduling", "general_inquiry" ] print(" ✓ Topic classifier loaded") except Exception as e: print(f" ⚠ Topic classifier failed: {e}") self.topic_classifier = None def process_call(self, audio_path): """Main processing with maximum robustness""" if not os.path.exists(audio_path): raise FileNotFoundError(f"Audio file not found: {audio_path}") self._flush_memory() print(f"📁 Processing: {audio_path}") print("="*70) # Load and preprocess wav, sr = librosa.load(audio_path, sr=16000, mono=True) wav = wav.astype(np.float32) # Audio enhancement for call center quality wav = self._enhance_audio_for_callcenter(wav, sr) duration = len(wav) / sr print(f" ✓ Audio loaded: {duration:.1f}s @ {sr}Hz") # Enhanced diarization print("\n → Running enhanced diarization...") segments = self._run_enhanced_diarization(wav, sr, audio_path) print(f" ✓ Found {len(set(s['speaker'] for s in segments))} speakers, {len(segments)} segments") # Smart merging merged = self._merge_segments_smart(segments, min_gap=0.25) print(f" ✓ Merged to {len(merged)} segments") # Process segments results = [] spk_audio_buffer = defaultdict(list) pad = int(0.1 * sr) # Increased padding print("\n → Transcribing with Whisper Large-v3...") for i, seg in enumerate(merged): seg_duration = seg['end'] - seg['start'] if seg_duration < 0.1: continue start_idx = max(0, int(seg['start'] * sr) - pad) end_idx = min(len(wav), int(seg['end'] * sr) + pad) chunk = wav[start_idx:end_idx] if self._is_silence(chunk): continue # Collect audio for biometrics if seg_duration > 0.4: spk_audio_buffer[seg['speaker']].append(chunk) # ENHANCED TRANSCRIPTION text = self._transcribe_chunk_robust(chunk, sr) if not text: continue emotion = self._detect_emotion(chunk) sentiment = self._map_emotion_to_sentiment(emotion) speech_rate = self._calculate_speech_rate(text, seg_duration) keywords = self._extract_keywords(text, top_n=5) topic = self._classify_topic(text) results.append({ "segment_id": i + 1, "start": float(f"{seg['start']:.2f}"), "end": float(f"{seg['end']:.2f}"), "duration": float(f"{seg_duration:.2f}"), "speaker": seg['speaker'], "role": "UNKNOWN", "text": text, "emotion": emotion, "sentiment": sentiment, # NEW "speech_rate": speech_rate, # NEW "keywords": keywords, # NEW "topic": topic, # NEW "tone": self._calculate_tone_advanced(chunk, sr, text) }) if (i + 1) % 10 == 0: print(f" Processed {i + 1}/{len(merged)} segments...") print(f" ✓ Transcribed {len(results)} segments with text") # Assign roles print("\n → Assigning speaker roles...") results = self._assign_roles_smart(results) identification = {} for r in results: identification[r['speaker']] = r['role'] print(f" ✓ Roles: {identification}") # ULTRA-ROBUST BIOMETRICS WITH VOTING print("\n → Analyzing biometrics with multi-model voting...") biometrics = 
        for spk, bio in biometrics.items():
            print(f"   {spk}: {bio['gender']} (confidence: {bio['gender_confidence']:.2f}), {bio['age_bracket']}")

        # Customer journey
        print("\n → Analyzing customer journey...")
        cust_metrics = self._analyze_customer_journey(results)
        print(f" ✓ Journey: {cust_metrics['emotional_arc']}")

        # Agent KPI
        print("\n → Analyzing agent performance...")
        agent_metrics = self._analyze_agent_kpi(results, cust_metrics['impact_score'])
        print(f" ✓ Agent score: {agent_metrics.get('overall_score', 'N/A')}/100")

        # Compile output
        call_summary = self._aggregate_call_insights(results)

        final_output = {
            "metadata": {
                "file": os.path.basename(audio_path),
                "duration_seconds": float(f"{duration:.2f}"),
                "sample_rate": sr,
                "total_segments": len(results),
                "stt_model": self.stt_model_name,
                "gender_models_used": len(self.gender_models),
                "speakers": biometrics,
                "call_summary": call_summary  # NEW
            },
            "identification": identification,
            "agent_metrics": agent_metrics,
            "customer_metrics": cust_metrics,
            "transcript": results
        }

        self._flush_memory()
        print("\n" + "="*70)
        print("✅ Processing complete")
        print("="*70 + "\n")
        return final_output

    def _enhance_audio_for_callcenter(self, wav, sr):
        """Enhance audio quality for better transcription"""
        # 1. Normalize
        wav = wav / (np.max(np.abs(wav)) + 1e-7)

        # 2. High-pass filter to remove low-frequency noise
        try:
            sos = scipy_signal.butter(4, 80, 'hp', fs=sr, output='sos')
            wav = scipy_signal.sosfilt(sos, wav)
        except:
            pass

        # 3. Gentle compression to balance volume
        wav = np.sign(wav) * np.log1p(np.abs(wav) * 10) / np.log1p(10)

        return wav.astype(np.float32)

    def _transcribe_chunk_robust(self, chunk, sr):
        """
        ULTRA-ROBUST TRANSCRIPTION
        Optimized for:
        - European accents
        - Call center quality
        - Background noise
        """
        # Ensure minimum length
        if len(chunk) < sr * 0.3:
            pad = np.zeros(int(sr * 0.5), dtype=np.float32)
            chunk = np.concatenate([pad, chunk, pad])

        try:
            # BEST SETTINGS FOR CALL CENTER + EUROPEAN ACCENTS
            result = self.stt_model.transcribe(
                chunk.astype(np.float32),
                language="en",                    # English only
                task="transcribe",
                # Quality settings
                beam_size=5,                      # Higher = more accurate but slower
                best_of=5,                        # Sample best of 5 runs
                temperature=0.0,                  # Deterministic
                # Accent handling
                condition_on_previous_text=True,  # Use context
                # Noise handling
                compression_ratio_threshold=2.4,  # More lenient
                logprob_threshold=-1.0,           # More lenient
                no_speech_threshold=0.6,          # Standard
                # Speed vs accuracy
                fp16=(self.device == "cuda"),     # Use FP16 on GPU
                # Word timestamps for quality check
                word_timestamps=True
            )

            text = result['text'].strip()

            # Quality filters
            if len(text) < 2:
                return None

            # Filter garbage
            garbage = ["you", "thank you", ".", "...", "bye", "okay"]
            if text.lower() in garbage:
                return None

            # Check if it's actual speech (has vowels)
            if not any(c in text.lower() for c in 'aeiou'):
                return None

            # Check word-level confidence if available.
            # Whisper nests word timestamps inside result['segments'], not at the top level.
            word_probs = [
                w.get('probability', 1.0)
                for s in result.get('segments', [])
                for w in s.get('words', [])
            ]
            if word_probs:
                avg_prob = np.mean(word_probs)
                if avg_prob < 0.3:  # Very low confidence
                    return None

            return text
        except Exception as e:
            print(f" ⚠ Transcription error: {e}")
            return None

    def _analyze_biometrics_ultra_robust(self, audio_buffer, transcript, full_wav, sr):
        """
        ULTRA-ROBUST GENDER DETECTION
        Uses multiple models + voting + pitch + conversation context
        """
        profiles = {}

        # Collect conversation context
        context_gender = self._extract_gender_from_conversation(transcript)

        for spk, chunks in audio_buffer.items():
            if not chunks:
                continue
            print(f"\n   Analyzing {spk}...")

            # Concatenate audio (max 15 seconds from different parts)
            raw_audio = self._prepare_audio_for_analysis(chunks, sr)

            # ===== METHOD 1: ADVANCED PITCH ANALYSIS =====
            pitch_gender, pitch_confidence, pitch_stats = self._analyze_pitch_robust(
                raw_audio, sr, full_wav, transcript, spk
            )
            print(f"   Pitch analysis: {pitch_gender} (conf: {pitch_confidence:.2f})")

            # ===== METHOD 2: MULTI-MODEL AI VOTING =====
            ai_gender, ai_confidence, all_predictions = self._multi_model_gender_detection(raw_audio, sr)
            print(f"   AI models: {ai_gender} (conf: {ai_confidence:.2f})")
            print(f"   Individual: {all_predictions}")

            # ===== METHOD 3: CONVERSATION CONTEXT =====
            context_gend = context_gender.get(spk, "UNKNOWN")
            print(f"   Context clues: {context_gend}")

            # ===== METHOD 4: FORMANT ANALYSIS =====
            formant_gender, formant_confidence = self._analyze_formants(raw_audio, sr)
            print(f"   Formant analysis: {formant_gender} (conf: {formant_confidence:.2f})")

            # ===== VOTING SYSTEM WITH CONFIDENCE WEIGHTING =====
            votes = []

            # Context vote (HIGHEST priority if available)
            if context_gend != "UNKNOWN":
                votes.extend([context_gend] * 4)  # 4 votes for context

            # Pitch vote (HIGH priority)
            if pitch_confidence > 0.6:
                votes.extend([pitch_gender] * 3)  # 3 votes for confident pitch
            elif pitch_confidence > 0.4:
                votes.append(pitch_gender)        # 1 vote for moderate pitch

            # AI models vote (MEDIUM priority)
            if ai_confidence > 0.7:
                votes.extend([ai_gender] * 2)     # 2 votes for confident AI
            elif ai_confidence > 0.5:
                votes.append(ai_gender)           # 1 vote for moderate AI

            # Formant vote (MEDIUM priority)
            if formant_confidence > 0.6:
                votes.extend([formant_gender] * 2)
            elif formant_confidence > 0.4:
                votes.append(formant_gender)

            # Count votes
            if votes:
                vote_counts = Counter(votes)
                final_gender = vote_counts.most_common(1)[0][0]
                total_votes = len(votes)
                winning_votes = vote_counts[final_gender]
                final_confidence = winning_votes / total_votes
            else:
                # Fallback
                final_gender = ai_gender if ai_confidence > 0.5 else "UNKNOWN"
                final_confidence = ai_confidence

            print(f"   FINAL: {final_gender} (confidence: {final_confidence:.2f})")
            print(f"   Vote breakdown: {dict(Counter(votes))}")

            # ===== AGE DETECTION =====
            age_bracket = self._detect_age_robust(raw_audio, sr, pitch_stats)

            # Get role
            role = [r['role'] for r in transcript if r['speaker'] == spk]
            role = role[0] if role else "UNKNOWN"

            profiles[spk] = {
                "gender": final_gender,
                "gender_confidence": round(final_confidence, 2),
                "gender_methods": {
                    "context": context_gend,
                    "pitch": f"{pitch_gender} ({pitch_confidence:.2f})",
                    "ai_models": f"{ai_gender} ({ai_confidence:.2f})",
                    "formants": f"{formant_gender} ({formant_confidence:.2f})",
                    "vote_breakdown": dict(Counter(votes))
                },
                "age_bracket": age_bracket,
                "voice_stats": {
                    "avg_pitch_hz": pitch_stats['mean'],
                    "pitch_range": f"{pitch_stats['min']:.0f}-{pitch_stats['max']:.0f}Hz",
                    "pitch_std": pitch_stats['std']
                }
            }

        return profiles

    def _prepare_audio_for_analysis(self, chunks, sr, max_duration=15):
        """Prepare audio by taking samples from different parts"""
        raw = np.concatenate(chunks)

        # Take samples from beginning, middle, end
        if len(raw) > sr * max_duration:
            segment_len = sr * 5  # 5 seconds each
            total_len = len(raw)
            samples = []

            # Beginning
            samples.append(raw[:segment_len])
            # Middle
            mid_start = (total_len // 2) - (segment_len // 2)
            samples.append(raw[mid_start:mid_start + segment_len])
            # End
            samples.append(raw[-segment_len:])

            raw = np.concatenate(samples)

        # Normalize
        raw = raw - np.mean(raw)
        std = np.std(raw)
        if std > 1e-7:
            raw = raw / std

        return raw
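    # Worked example of the weighted vote in _analyze_biometrics_ultra_robust above
    # (hypothetical numbers, not taken from a real call): context says FEMALE (4 votes);
    # a mean pitch of 210 Hz yields FEMALE at confidence min(1.0, (210 - 180) / 40) = 0.75,
    # which is > 0.6, so 3 more FEMALE votes; the AI ensemble says FEMALE at 0.65
    # (> 0.5 but <= 0.7), adding 1 vote; formants say MALE at 0.45 (> 0.4 but <= 0.6),
    # adding 1 MALE vote. Tally: FEMALE 8, MALE 1, so final_gender = "FEMALE" and
    # final_confidence = 8 / 9 ≈ 0.89.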
    def _analyze_pitch_robust(self, audio, sr, full_wav, transcript, speaker):
        """Advanced pitch analysis using multiple methods"""
        # Collect all pitch values from transcript
        transcript_pitches = [
            t['tone']['pitch_hz'] for t in transcript
            if t['speaker'] == speaker and t['tone']['pitch_hz'] > 60
        ]

        # Method 1: YIN algorithm
        try:
            f0_yin = librosa.yin(audio.astype(np.float64), fmin=60, fmax=400, sr=sr)
            f0_yin_valid = f0_yin[f0_yin > 0]
        except:
            f0_yin_valid = []

        # Method 2: PYIN (probabilistic YIN)
        try:
            f0_pyin, voiced_flag, voiced_probs = librosa.pyin(
                audio.astype(np.float64), fmin=60, fmax=400, sr=sr
            )
            f0_pyin_valid = f0_pyin[~np.isnan(f0_pyin)]
        except:
            f0_pyin_valid = []

        # Combine all pitch measurements
        all_pitches = []
        if len(f0_yin_valid) > 0:
            all_pitches.extend(f0_yin_valid)
        if len(f0_pyin_valid) > 0:
            all_pitches.extend(f0_pyin_valid)
        if len(transcript_pitches) > 0:
            all_pitches.extend(transcript_pitches)

        if len(all_pitches) == 0:
            return "UNKNOWN", 0.0, {'mean': 0, 'std': 0, 'min': 0, 'max': 0}

        # Calculate statistics
        mean_pitch = np.mean(all_pitches)
        std_pitch = np.std(all_pitches)
        min_pitch = np.min(all_pitches)
        max_pitch = np.max(all_pitches)

        pitch_stats = {
            'mean': round(mean_pitch, 1),
            'std': round(std_pitch, 1),
            'min': round(min_pitch, 1),
            'max': round(max_pitch, 1)
        }

        # Gender classification with refined thresholds
        # Research-based ranges:
        #   Male:   85-180 Hz (average ~120 Hz)
        #   Female: 165-255 Hz (average ~210 Hz)
        if mean_pitch < 150:
            gender = "MALE"
            # Confidence based on how far below 150
            confidence = min(1.0, (150 - mean_pitch) / 40)
        elif mean_pitch > 180:
            gender = "FEMALE"
            # Confidence based on how far above 180
            confidence = min(1.0, (mean_pitch - 180) / 40)
        else:
            # Ambiguous range (150-180 Hz)
            if mean_pitch < 165:
                gender = "MALE"
                confidence = 0.5
            else:
                gender = "FEMALE"
                confidence = 0.5

        return gender, confidence, pitch_stats

    def _multi_model_gender_detection(self, audio, sr):
        """Run multiple AI models and aggregate predictions"""
        predictions = []
        confidences = []

        for model_name, model_dict in self.gender_models.items():
            try:
                processor = model_dict['processor']
                model = model_dict['model']

                # Prepare inputs
                inputs = processor(
                    audio,
                    sampling_rate=sr,
                    return_tensors="pt",
                    padding=True
                ).to(self.device)

                # Predict
                with torch.no_grad():
                    outputs = model(**inputs)
                    logits = outputs.logits
                    probs = torch.softmax(logits, dim=-1)[0].cpu().numpy()

                # Extract gender prediction
                labels = model.config.id2label

                # Find male/female labels (different models use different names)
                male_score = 0
                female_score = 0
                for idx, label in labels.items():
                    label_lower = label.lower()
                    if 'male' in label_lower and 'female' not in label_lower:
                        male_score = max(male_score, probs[idx])
                    elif 'female' in label_lower:
                        female_score = max(female_score, probs[idx])

                if male_score > female_score:
                    predictions.append("MALE")
                    confidences.append(male_score)
                else:
                    predictions.append("FEMALE")
                    confidences.append(female_score)
            except Exception as e:
                print(f"   Model {model_name} error: {e}")
                continue

        if not predictions:
            return "UNKNOWN", 0.0, {}

        # Aggregate predictions
        pred_counter = Counter(predictions)
        majority_vote = pred_counter.most_common(1)[0][0]

        # Calculate confidence
        majority_indices = [i for i, p in enumerate(predictions) if p == majority_vote]
        avg_confidence = np.mean([confidences[i] for i in majority_indices])

        # Individual predictions
        individual = {
            f"model_{i+1}": f"{pred} ({conf:.2f})"
            for i, (pred, conf) in enumerate(zip(predictions, confidences))
        }

        return majority_vote, float(avg_confidence), individual

    def _extract_gender_from_conversation(self, transcript):
        """Extract gender clues from conversation"""
        context_map = {}

        # Extended keyword lists
        male_keywords = [
            "sir", "mr.", "mister", "mr ", "gentleman", "he", "him", "his",
            "man", "guy", "male", "father", "dad", "son", "brother", "husband"
        ]
        female_keywords = [
            "ma'am", "miss", "mrs", "mrs.", "madam", "madame", "ms", "ms.",
            "she", "her", "hers", "woman", "lady", "female", "mother", "mom",
            "daughter", "sister", "wife"
        ]

        for line in transcript:
            if line['role'] == "AGENT":
                txt = line['text'].lower()

                # Find who the agent is talking to
                customers = [x['speaker'] for x in transcript if x['role'] == "CUSTOMER"]
                if not customers:
                    continue
                target = customers[0]

                # Check for keywords
                if any(keyword in txt for keyword in male_keywords):
                    context_map[target] = "MALE"
                elif any(keyword in txt for keyword in female_keywords):
                    context_map[target] = "FEMALE"

        return context_map

    def _analyze_formants(self, audio, sr):
        """Analyze formant frequencies (F1, F2) for gender detection"""
        try:
            # Use Praat (via parselmouth, imported at module level) for formant analysis
            snd = parselmouth.Sound(audio, sampling_frequency=sr)
            formant = snd.to_formant_burg()

            # Extract F1 and F2 for voiced segments
            f1_values = []
            f2_values = []

            duration = snd.get_total_duration()
            time_step = 0.01  # 10ms steps

            for t in np.arange(0, duration, time_step):
                f1 = formant.get_value_at_time(1, t)
                f2 = formant.get_value_at_time(2, t)
                if not np.isnan(f1) and not np.isnan(f2):
                    f1_values.append(f1)
                    f2_values.append(f2)

            if len(f1_values) < 10:
                return "UNKNOWN", 0.0

            avg_f1 = np.mean(f1_values)
            avg_f2 = np.mean(f2_values)

            # Gender classification based on formants
            # Typical ranges used here:
            #   Male:   F1 ~120 Hz, F2 ~1200 Hz
            #   Female: F1 ~220 Hz, F2 ~2100 Hz
            # Combined metric
            if avg_f1 < 170 and avg_f2 < 1650:
                gender = "MALE"
                confidence = 0.7
            elif avg_f1 > 190 and avg_f2 > 1750:
                gender = "FEMALE"
                confidence = 0.7
            else:
                # Use F2 as primary indicator
                if avg_f2 < 1600:
                    gender = "MALE"
                else:
                    gender = "FEMALE"
                confidence = 0.5

            return gender, confidence
        except Exception:
            return "UNKNOWN", 0.0

    def _detect_age_robust(self, audio, sr, pitch_stats):
        """Robust age detection"""
        try:
            if 'audeering' not in self.gender_models:
                return "26-35"  # Default

            processor = self.gender_models['audeering']['processor']
            model = self.gender_models['audeering']['model']

            inputs = processor(audio, sampling_rate=sr, return_tensors="pt").to(self.device)
            with torch.no_grad():
                logits = model(**inputs).logits
                probs = torch.softmax(logits, dim=-1)[0].cpu().numpy()

            # Map labels to age buckets (aggregating across genders)
            # Labels usually look like: 'female_20-29', 'male_20-29', etc.
            labels = model.config.id2label
            age_scores = defaultdict(float)

            for i, score in enumerate(probs):
                label = labels[i]
                # Extract age part (assuming the label format is gender_age)
                parts = label.split('_')
                if len(parts) > 1:
                    age_group = parts[-1]  # e.g., "20-29"
                    age_scores[age_group] += score

            # Get best age bracket
            if age_scores:
                best_age = max(age_scores, key=age_scores.get)
                return best_age

            return "UNKNOWN"
        except Exception as e:
            print(f" ⚠ Age detection failed: {e}")
            return "UNKNOWN"

    def _run_enhanced_diarization(self, wav, sr, file_path):
        """
        Run Pyannote diarization or fall back to simple segmentation
        """
        if self.diarization_pipeline is None:
            print(" ⚠ No diarization pipeline available, using energy-based fallback segmentation")
            return self._energy_based_segmentation(wav, sr)

        try:
            # Run pipeline (constrained to exactly two speakers)
            diarization = self.diarization_pipeline(file_path, min_speakers=2, max_speakers=2)
            segments = []
            for turn, _, speaker in diarization.itertracks(yield_label=True):
                segments.append({
                    "start": turn.start,
                    "end": turn.end,
                    "speaker": speaker
                })
            return segments
        except Exception as e:
            print(f" ⚠ Diarization error: {e}, using fallback")
            return self._energy_based_segmentation(wav, sr)

    def _energy_based_segmentation(self, wav, sr):
        """Fallback if deep-learning diarization fails"""
        # Simple energy detection to split speech from silence,
        # treating everything as a single speaker (SPEAKER_00)
        intervals = librosa.effects.split(wav, top_db=30)
        segments = []
        for start, end in intervals:
            segments.append({
                "start": start / sr,
                "end": end / sr,
                "speaker": "SPEAKER_00"
            })
        return segments

    def _merge_segments_smart(self, segments, min_gap=0.5):
        """Merge segments from the same speaker that are close together"""
        if not segments:
            return []

        merged = []
        current = segments[0]

        for next_seg in segments[1:]:
            # If same speaker and the gap is small, extend the current segment
            if (next_seg['speaker'] == current['speaker']
                    and (next_seg['start'] - current['end']) < min_gap):
                current['end'] = next_seg['end']
            else:
                merged.append(current)
                current = next_seg

        merged.append(current)
        return merged

    def _is_silence(self, chunk, threshold=0.005):
        """Check if an audio chunk is essentially silence"""
        return np.max(np.abs(chunk)) < threshold

    def _detect_emotion(self, chunk):
        """Detect emotion from an audio chunk"""
        try:
            # Ensure the chunk is long enough for the model
            if len(chunk) < 16000 * 0.5:
                return "neutral"
            # Use the pipeline loaded in __init__.
            # Note: the pipeline accepts a file path or a numpy array.
            preds = self.emotion_classifier(chunk, top_k=1)
            return preds[0]['label']
        except:
            return "neutral"

    def _calculate_tone_advanced(self, chunk, sr, text):
        """
        Calculate pitch, jitter, and shimmer using Parselmouth (Praat)
        """
        try:
            if len(chunk) < sr * 0.1:
                return {"pitch_hz": 0, "jitter": 0, "shimmer": 0}

            snd = parselmouth.Sound(chunk, sampling_frequency=sr)

            # Pitch
            pitch = snd.to_pitch()
            pitch_val = pitch.selected_array['frequency']
            pitch_val = pitch_val[pitch_val != 0]
            avg_pitch = np.mean(pitch_val) if len(pitch_val) > 0 else 0

            # Pulses for jitter/shimmer
            point_process = call(snd, "To PointProcess (periodic, cc)", 75, 500)
            try:
                jitter = call(point_process, "Get jitter (local)", 0, 0, 0.0001, 0.02, 1.3)
            except:
                jitter = 0
            try:
                shimmer = call([snd, point_process], "Get shimmer (local)",
                               0, 0, 0.0001, 0.02, 1.3, 1.6)
            except:
                shimmer = 0

            return {
                "pitch_hz": round(float(avg_pitch), 1),
                "jitter": round(float(jitter * 100), 2),    # percentage
                "shimmer": round(float(shimmer * 100), 2)   # percentage
            }
        except:
            return {"pitch_hz": 0, "jitter": 0, "shimmer": 0}

    def _assign_roles_smart(self, results):
        """
        Assign AGENT vs CUSTOMER roles using Golden Phrases and verbosity.
        """
        speakers = list(set(r['speaker'] for r in results))
        if len(speakers) == 1:
            # If only one speaker was found, assume it is the agent monologuing
            for r in results:
                r['role'] = "AGENT"
            return results

        speaker_scores = defaultdict(int)
        word_counts = defaultdict(int)

        # 1. GOLDEN PHRASES (almost always spoken by the agent).
        # These override normal scoring.
        golden_agent_phrases = [
            "my name is", "this is steve", "this is sam", "this is mike",  # Common names
            "calling from", "on a recorded line", "green solutions",
            "energy solutions", "federal government", "rebate program"
        ]

        # 2. STANDARD SCORING KEYWORDS
        agent_keywords = [
            "manager", "supervisor", "qualified", "eligible",
            "whatsapp", "ping you", "verification", "consumption"
        ]
        customer_keywords = [
            "who is this", "stop calling", "not interested",
            "take me off", "do not call", "why are you asking"
        ]

        agent_found_via_golden = None

        for res in results:
            text = res['text'].lower()
            spk = res['speaker']

            # Count words for the verbosity check
            words = text.split()
            word_counts[spk] += len(words)

            # Check golden phrases (instant win)
            if agent_found_via_golden is None:
                for phrase in golden_agent_phrases:
                    if phrase in text:
                        print(f" ★ Golden Phrase found for {spk}: '{phrase}'")
                        agent_found_via_golden = spk
                        break

            # Standard scoring
            if any(k in text for k in agent_keywords):
                speaker_scores[spk] += 2
            if any(k in text for k in customer_keywords):
                speaker_scores[spk] -= 3  # Strong negative for objections

        # 3. DECISION LOGIC
        final_agent = None
        if agent_found_via_golden:
            # If we found a golden phrase, trust it implicitly
            final_agent = agent_found_via_golden
        else:
            # Fallback: verbosity check (the agent usually talks more).
            # Get the speaker with the most words.
            talkative_spk = max(word_counts, key=word_counts.get)
            total_words = sum(word_counts.values())

            # If one speaker dominates >60% of the conversation, they are likely the agent
            if word_counts[talkative_spk] / max(1, total_words) > 0.60:
                speaker_scores[talkative_spk] += 5

            # Pick the highest-scoring speaker; fall back to the most talkative one
            # if no keyword ever fired (max() on an empty dict would raise).
            if speaker_scores:
                final_agent = max(speaker_scores, key=speaker_scores.get)
            else:
                final_agent = talkative_spk

        # 4. ASSIGN ROLES
        print(f" ✓ Role Assignment: Identified {final_agent} as AGENT")
        identification = {}
        for res in results:
            if res['speaker'] == final_agent:
                res['role'] = "AGENT"
            else:
                res['role'] = "CUSTOMER"
            identification[res['speaker']] = res['role']

        return results

    def _analyze_customer_journey(self, results):
        """Analyze the sentiment flow of the customer"""
        cust_segments = [r for r in results if r['role'] == "CUSTOMER"]
        if not cust_segments:
            return {"emotional_arc": "No customer audio", "impact_score": 0}

        # Map emotions to scores.
        # The superb ER classifier emits abbreviated labels ('neu', 'hap', 'ang', 'sad'),
        # so those aliases are included alongside the full words.
        emo_map = {
            "happy": 1.0, "joy": 1.0, "hap": 1.0,
            "neutral": 0.1, "neu": 0.1,
            "sad": -0.5,
            "angry": -1.0, "ang": -1.0, "frustrated": -1.0
        }

        start_score = sum(emo_map.get(s['emotion'], 0) for s in cust_segments[:3]) / min(3, len(cust_segments))
        end_score = sum(emo_map.get(s['emotion'], 0) for s in cust_segments[-3:]) / min(3, len(cust_segments))

        impact = end_score - start_score

        if impact > 0.2:
            arc = "Positive Resolution"
        elif impact < -0.2:
            arc = "Negative Escalation"
        else:
            arc = "Neutral/Unresolved"

        return {
            "emotional_arc": arc,
            "start_sentiment": round(start_score, 2),
            "end_sentiment": round(end_score, 2),
            "impact_score": round(impact, 2)
        }

    def _analyze_agent_kpi(self, results, customer_impact):
        """Calculate agent performance metrics"""
        agent_segments = [r for r in results if r['role'] == "AGENT"]
        if not agent_segments:
            return {"overall_score": 0}

        # 1. Politeness (keyword based)
        polite_words = ["please", "thank", "sorry", "apologize", "appreciate"]
        total_words = sum(len(s['text'].split()) for s in agent_segments)
        polite_count = sum(
            1 for s in agent_segments
            if any(w in s['text'].lower() for w in polite_words)
        )
        politeness_score = min(100, (polite_count / max(1, len(agent_segments))) * 200)

        # 2. Tone consistency (jitter/shimmer variance)
        jitter_vals = [s['tone']['jitter'] for s in agent_segments]
        tone_stability = (100 - min(100, np.std(jitter_vals) * 10)) if jitter_vals else 50

        # 3. Resolution impact (from customer journey).
        # Map the -1.0..1.0 range to 0..100.
        resolution_score = 50 + (customer_impact * 50)
        resolution_score = max(0, min(100, resolution_score))

        # Overall weighted score
        overall = (
            (politeness_score * 0.3) +
            (tone_stability * 0.2) +
            (resolution_score * 0.5)
        )

        return {
            "overall_score": int(overall),
            "politeness": int(politeness_score),
            "tone_stability": int(tone_stability),
            "resolution_effectiveness": int(resolution_score)
        }

    def _flush_memory(self):
        """Aggressive memory cleanup"""
        gc.collect()
        if self.device == "cuda":
            torch.cuda.empty_cache()

    def _map_emotion_to_sentiment(self, emotion):
        """Map emotion labels to sentiment with a polarity score"""
        emotion_lower = emotion.lower()

        # The superb ER classifier emits abbreviated labels ('hap', 'ang', 'neu', 'sad'),
        # so those aliases are included alongside the full words.
        positive_emotions = {
            'happy': 0.8, 'hap': 0.8, 'joy': 0.9, 'excited': 0.85,
            'pleased': 0.7, 'satisfied': 0.75, 'content': 0.6
        }
        negative_emotions = {
            'sad': -0.6, 'angry': -0.9, 'ang': -0.9, 'frustrated': -0.8,
            'annoyed': -0.7, 'disappointed': -0.65, 'upset': -0.75
        }

        if emotion_lower in positive_emotions:
            return {
                "sentiment": "positive",
                "polarity_score": positive_emotions[emotion_lower],
                "confidence": "high"
            }
        if emotion_lower in negative_emotions:
            return {
                "sentiment": "negative",
                "polarity_score": negative_emotions[emotion_lower],
                "confidence": "high"
            }
        return {
            "sentiment": "neutral",
            "polarity_score": 0.0,
            "confidence": "medium"
        }

    def _calculate_speech_rate(self, text, duration_seconds):
        """Calculate words per minute (WPM) and classify pace"""
        if duration_seconds < 0.1:
            return {"wpm": 0, "word_count": 0, "speech_pace": "unknown"}

        words = text.split()
        word_count = len(words)
        wpm = (word_count / (duration_seconds / 60.0)) if duration_seconds > 0 else 0

        if wpm < 100:
            pace = "slow"
        elif wpm < 140:
            pace = "normal"
        elif wpm < 180:
            pace = "fast"
        else:
            pace = "very_fast"

        return {
            "wpm": round(wpm, 1),
            "word_count": word_count,
            "speech_pace": pace
        }

    def _extract_keywords(self, text, top_n=5):
        """Extract keywords/keyphrases using KeyBERT"""
        if self.keyword_model is None or len(text.split()) < 3:
            return []
        try:
            keywords = self.keyword_model.extract_keywords(
                text,
                keyphrase_ngram_range=(1, 2),
                stop_words='english',
                top_n=top_n,
                use_maxsum=True,
                nr_candidates=20
            )
            return [
                {"keyword": kw[0], "relevance": round(float(kw[1]), 3)}
                for kw in keywords
            ]
        except:
            return []

    def _classify_topic(self, text):
        """Classify text into call center topics"""
        if self.topic_classifier is None or len(text.split()) < 5:
            return {"topic": "unknown", "confidence": 0.0}
        try:
            result = self.topic_classifier(text, self.topic_labels, multi_label=False)
            return {
                "topic": result['labels'][0],
                "confidence": round(float(result['scores'][0]), 3),
                "top_3_topics": [
                    {"topic": label, "score": round(float(score), 3)}
                    for label, score in zip(result['labels'][:3], result['scores'][:3])
                ]
            }
        except:
            return {"topic": "unknown", "confidence": 0.0}

    def _aggregate_call_insights(self, results):
        """Aggregate keywords and topics at call level"""
        if not results:
            return {"top_keywords": [], "primary_topic": {"topic": "unknown"}}
        all_keywords = {}
        for seg in results:
            if 'keywords' in seg:
                for kw in seg['keywords']:
                    keyword = kw['keyword']
                    score = kw['relevance']
                    all_keywords[keyword] = max(all_keywords.get(keyword, 0), score)

        top_keywords = [
            {"keyword": k, "relevance": round(v, 3)}
            for k, v in sorted(all_keywords.items(), key=lambda x: x[1], reverse=True)[:10]
        ]

        # Aggregate topics
        topic_votes = defaultdict(float)
        for seg in results:
            if 'topic' in seg and seg['topic']['confidence'] > 0.5:
                topic_votes[seg['topic']['topic']] += seg['topic']['confidence']

        primary_topic = {
            "topic": max(topic_votes, key=topic_votes.get) if topic_votes else "unknown",
            "confidence": round(topic_votes[max(topic_votes, key=topic_votes.get)] / len(results), 3) if topic_votes else 0.0
        }

        # Calculate stats
        total_words = sum(seg.get('speech_rate', {}).get('word_count', 0) for seg in results)
        wpm_values = [
            seg.get('speech_rate', {}).get('wpm', 0)
            for seg in results
            if seg.get('speech_rate', {}).get('wpm', 0) > 0
        ]
        average_wpm = round(np.mean(wpm_values), 1) if wpm_values else 0

        return {
            "top_keywords": top_keywords,
            "primary_topic": primary_topic,
            "total_words": total_words,
            "average_wpm": average_wpm
        }


if __name__ == "__main__":
    # Example usage
    print("Initialize with: analyzer = UltraRobustCallAnalytics(hf_token='YOUR_TOKEN')")
    print("Process with: result = analyzer.process_call('path/to/audio.wav')")
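
# Minimal end-to-end sketch (assumptions, not part of the original script: the Hugging Face
# token is read from a hypothetical HF_TOKEN environment variable, and "call.wav" is a
# placeholder input path). It shows the intended flow: construct the analyzer, process one
# recording, and serialize the report with the NumpyEncoder defined above.
if __name__ == "__main__":
    _example_audio = "call.wav"  # placeholder path; replace with a real recording
    if os.path.exists(_example_audio):
        analyzer = UltraRobustCallAnalytics(hf_token=os.environ.get("HF_TOKEN"))
        report = analyzer.process_call(_example_audio)
        # NumpyEncoder converts NumPy scalars/arrays so json.dump does not raise TypeError
        with open("call_report.json", "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2, cls=NumpyEncoder)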