import os
import sys
import logging
import warnings
import json
import gc
from collections import Counter
from datetime import datetime

import torch
import librosa
import whisper
import numpy as np
from pyannote.audio import Pipeline
from transformers import pipeline, Wav2Vec2Processor, Wav2Vec2ForSequenceClassification

os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
logging.getLogger("pyannote").setLevel(logging.ERROR)
logging.getLogger("transformers").setLevel(logging.ERROR)
warnings.filterwarnings("ignore")


class NumpyEncoder(json.JSONEncoder):
    """JSON encoder that converts NumPy scalars and arrays to native Python types."""

    def default(self, obj):
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return super().default(obj)


class PlatinumAnalyticsPipeline:
    """End-to-end call analytics: diarization, transcription, emotion detection,
    speaker biometrics, and agent/customer KPI scoring."""

    def __init__(self, hf_token=None, device=None):
        self._flush_memory()
        self.device = device if device else ("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Initializing Engine on {self.device}...")

        # Speech-to-text: prefer large-v3, fall back to medium.en if it cannot be loaded.
        try:
            self.stt_model = whisper.load_model("large-v3", device=self.device)
        except Exception:
            print("VRAM fallback: Using 'medium.en'")
            self.stt_model = whisper.load_model("medium.en", device=self.device)

        # Speaker diarization (optional: only when a Hugging Face token is supplied).
        self.diarization_pipeline = None
        if hf_token:
            try:
                self.diarization_pipeline = Pipeline.from_pretrained(
                    "pyannote/speaker-diarization-3.1", token=hf_token
                ).to(torch.device(self.device))
            except Exception:
                pass

        # Speech emotion recognition.
        self.emotion_classifier = pipeline(
            "audio-classification",
            model="superb/wav2vec2-base-superb-er",
            device=0 if self.device == "cuda" else -1
        )

        # Age/gender classifier.
        self.ag_model_name = "audeering/wav2vec2-large-robust-24-ft-age-gender"
        self.ag_processor = Wav2Vec2Processor.from_pretrained(self.ag_model_name)
        self.ag_model = Wav2Vec2ForSequenceClassification.from_pretrained(self.ag_model_name)
        self.ag_model.to(self.device)
        self.ag_model.eval()

    def process_call(self, audio_path):
        """Run the full pipeline on one audio file and return the analytics report."""
        if not os.path.exists(audio_path):
            raise FileNotFoundError(f"File missing: {audio_path}")
        self._flush_memory()

        wav, sr = librosa.load(audio_path, sr=16000, mono=True)
        wav = wav.astype(np.float32)

        segments = self._run_diarization(wav, sr)
        merged = self._merge_segments(segments)

        results = []
        spk_buffer = {}          # per-speaker audio chunks for biometric analysis
        pad = int(0.05 * sr)     # 50 ms of context on each side of a segment

        for i, seg in enumerate(merged):
            duration = seg['end'] - seg['start']
            if duration < 0.1:
                continue
            start = max(0, int(seg['start'] * sr) - pad)
            end = min(len(wav), int(seg['end'] * sr) + pad)
            chunk = wav[start:end]
            if self._is_silence(chunk):
                continue
            if duration > 1.0:
                spk_buffer.setdefault(seg['speaker'], []).append(chunk)

            text = self._transcribe_chunk(chunk, sr)
            if not text:
                continue
            results.append({
                "segment_id": i + 1,
                "start": round(seg['start'], 2),
                "end": round(seg['end'], 2),
                "speaker": seg['speaker'],
                "role": "UNKNOWN",
                "emotion": self._detect_emotion(chunk),
                "text": text,
                "tone": self._calculate_tone_physics(chunk, sr, text)
            })

        results = self._assign_roles(results)
        identification = {r['speaker']: r['role'] for r in results}

        biometrics = self._analyze_biometrics_voting(spk_buffer, results)
        cust_metrics = self._analyze_customer_journey(results)
        agent_metrics = self._analyze_agent_kpi(results, cust_metrics['impact_score'])

        final_output = {
            "metadata": biometrics,
            "identification": identification,
            "agent_metrics": agent_metrics,
            "customer_metrics": cust_metrics,
            "transcript": results
        }
        self._flush_memory()
        return final_output

    def _analyze_biometrics_voting(self, buffer, transcript):
        """Estimate gender and age bracket per speaker, combining the acoustic model,
        pitch statistics, and honorifics used by the agent."""
        profiles = {}
        # Context clues: honorifics spoken by the agent hint at the customer's gender.
        context_map = {}
        for line in transcript:
            if line['role'] == "AGENT":
                txt = line['text'].lower()
                target_list = [x['speaker'] for x in transcript if x['role'] == "CUSTOMER"]
                if not target_list:
                    continue
                target = target_list[0]
                if any(w in txt for w in ["sir", "mr.", "mister", "man"]):
                    context_map[target] = "MALE"
                if any(w in txt for w in ["ma'am", "miss", "mrs", "madam"]):
                    context_map[target] = "FEMALE"

        for spk, chunks in buffer.items():
            if not chunks:
                continue
            pitches = [t['tone']['pitch_hz'] for t in transcript
                       if t['speaker'] == spk and t['tone']['pitch_hz'] > 60]
            avg_pitch = float(np.mean(pitches)) if pitches else 0.0

            # Cap the biometric sample at 10 seconds and normalize it.
            raw = np.concatenate(chunks)
            if len(raw) > 16000 * 10:
                raw = raw[:16000 * 10]
            norm = (raw - np.mean(raw)) / (np.std(raw) + 1e-7)

            ai_gender = "UNKNOWN"
            age_bracket = "26-35"
            try:
                inputs = self.ag_processor(norm, sampling_rate=16000, return_tensors="pt").to(self.device)
                with torch.no_grad():
                    logits = self.ag_model(**inputs).logits
                probs = torch.softmax(logits, dim=-1)[0].cpu().numpy()
                scores = {self.ag_model.config.id2label[i]: float(probs[i]) for i in range(len(probs))}
                ai_gender = "MALE" if scores.get('male', 0) > scores.get('female', 0) else "FEMALE"
                s_ch = scores.get('child', 0) + scores.get('teen', 0)
                s_sn = scores.get('senior', 0)
                if s_ch > 0.35:
                    age_bracket = "18-25"
                elif s_sn > 0.40:
                    age_bracket = "56+"
                elif s_sn > 0.15:
                    age_bracket = "46-55"
                else:
                    age_bracket = "26-35"
            except Exception:
                pass

            # Voting: explicit honorifics override the model; a low average pitch
            # overrides a FEMALE prediction.
            final_gender = ai_gender
            if spk in context_map:
                final_gender = context_map[spk]
            elif 0 < avg_pitch < 155 and ai_gender == "FEMALE":
                final_gender = "MALE"

            roles = [r['role'] for r in transcript if r['speaker'] == spk]
            role = roles[0] if roles else "UNKNOWN"
            if role == "AGENT" and age_bracket == "18-25":
                age_bracket = "18-25 (Young Adult)"

            profiles[spk] = {
                "gender": final_gender,
                "age_bracket": age_bracket,
                "debug_pitch": int(avg_pitch)
            }
        return profiles

    def _analyze_customer_journey(self, results):
        """Compare the customer's dominant emotion at the start and end of the call."""
        cust_segs = [r for r in results if r['role'] == "CUSTOMER"]
        if not cust_segs:
            return {"initial_emotion": "N/A", "final_emotion": "N/A",
                    "impact_score": 0, "impact_label": "N/A"}

        def get_mode(segs):
            return Counter([s['emotion'] for s in segs]).most_common(1)[0][0]

        initial = get_mode(cust_segs[:5])
        final = get_mode(cust_segs[-5:])
        val_map = {"HAPPY": 2, "NEUTRAL": 1, "SAD": 0, "ANGRY": 0}
        score_diff = val_map.get(final, 1) - val_map.get(initial, 1)

        impact_label = "STANDARD"
        if score_diff > 0:
            impact_label = "POSITIVE UPLIFT"
        elif score_diff < 0:
            impact_label = "NEGATIVE CHURN RISK"

        return {
            "initial_emotion": initial,
            "final_emotion": final,
            "impact_score": score_diff,
            "impact_label": impact_label
        }

    def _analyze_agent_kpi(self, results, impact_bonus):
        """Score the agent (0-100) from pace, pitch variation, emotion mix,
        and the customer-impact bonus."""
        segs = [r for r in results if r['role'] == "AGENT"]
        if not segs:
            return {}

        wpm = float(np.mean([s['tone']['wpm'] for s in segs]))
        vol = float(np.mean([s['tone']['volume'] for s in segs]))
        voiced = [s['tone']['pitch_hz'] for s in segs if s['tone']['pitch_hz'] > 0]
        pitch = float(np.mean(voiced)) if voiced else 0.0
        pitch_std = float(np.std(voiced)) if voiced else 0.0

        score = 60
        if 130 <= wpm <= 165:
            score += 15
        elif wpm > 185 or wpm < 110:
            score -= 10
        if pitch_std > 20:
            score += 10

        emotions = [s['emotion'] for s in segs]
        neg_ratio = (emotions.count("ANGRY") + emotions.count("SAD")) / len(segs)
        sentiment = "NEUTRAL"
        if neg_ratio > 0.1:
            sentiment = "NEGATIVE"
            score -= 20
        elif emotions.count("HAPPY") > len(segs) * 0.2:
            sentiment = "POSITIVE"
            score += 10

        engagement = "NORMAL"
        if score > 75:
            engagement = "HIGH"
        if score < 50:
            engagement = "LOW/WITHDRAWN"

        score += impact_bonus * 15
        score = max(0, min(100, int(score)))

        return {
            "overall_score": score,
            "overall_emotion": Counter(emotions).most_common(1)[0][0],
            "sentiment_level": sentiment,
            "engagement_level": engagement,
            "avg_pace_wpm": round(wpm, 1),
            "avg_pitch_hz": round(pitch, 1),
            "avg_volume": round(vol, 1)
        }

    def _is_silence(self, chunk):
        """Treat a chunk as silence when its RMS energy falls below a fixed threshold."""
        return np.sqrt(np.mean(chunk ** 2)) < 0.003

    def _transcribe_chunk(self, chunk, sr):
        """Transcribe one segment with Whisper; very short chunks are zero-padded."""
        if len(chunk) < sr:
            pad = np.zeros(int(sr * 0.5), dtype=np.float32)
            chunk = np.concatenate([pad, chunk, pad])
        try:
            res = self.stt_model.transcribe(
                chunk.astype(np.float32),
                language="en",
                beam_size=5,
                temperature=0.0,
                fp16=(self.device == "cuda")
            )
            text = res['text'].strip()
            # Drop hallucinated filler that Whisper tends to emit on near-silence.
            if len(text) < 2 or text.lower() in ["you", "bye."]:
                return None
            return text
        except Exception:
            return None

    def _detect_emotion(self, chunk):
        """Map the emotion classifier's scores to ANGRY/HAPPY/SAD/NEUTRAL."""
        try:
            emotions = self.emotion_classifier(chunk.astype(np.float32), top_k=None)
            scores = {e['label']: e['score'] for e in emotions}
            if scores.get('ang', 0) > 0.25:
                return "ANGRY"
            if scores.get('hap', 0) > 0.40:
                return "HAPPY"
            if scores.get('sad', 0) > 0.40:
                return "SAD"
            return "NEUTRAL"
        except Exception:
            return "NEUTRAL"

    def _calculate_tone_physics(self, chunk, sr, text):
        """Compute pitch (YIN), loudness (scaled RMS), and speaking pace in words per minute."""
        rms = float(np.mean(librosa.feature.rms(y=chunk))) * 1000
        f0 = librosa.yin(chunk.astype(np.float64), fmin=60, fmax=400, sr=sr)
        f0 = f0[f0 > 0]
        pitch = float(np.mean(f0)) if len(f0) > 0 else 0.0
        wpm = int(len(text.split()) / ((len(chunk) / sr) / 60))
        return {"pitch_hz": round(pitch, 1), "volume": round(rms, 2), "wpm": wpm}

    def _run_diarization(self, wav, sr):
        """Return speaker turns; fall back to a single speaker when diarization is unavailable or fails."""
        fallback = [{"start": 0, "end": len(wav) / sr, "speaker": "SPEAKER_00"}]
        if not self.diarization_pipeline:
            return fallback
        segments = []
        try:
            tensor = torch.from_numpy(wav).float().unsqueeze(0)
            output = self.diarization_pipeline({"waveform": tensor, "sample_rate": sr})
            # Depending on the pyannote version, the pipeline returns either an
            # Annotation directly or an object wrapping one.
            if hasattr(output, "speaker_diarization"):
                dia = output.speaker_diarization
            elif hasattr(output, "annotation"):
                dia = output.annotation
            else:
                dia = output
            for turn, _, speaker in dia.itertracks(yield_label=True):
                segments.append({"start": turn.start, "end": turn.end, "speaker": speaker})
        except Exception:
            return fallback
        return sorted(segments, key=lambda x: x['start']) if segments else fallback

    def _merge_segments(self, segments):
        """Merge consecutive segments of the same speaker separated by less than one second."""
        if not segments:
            return []
        merged = [segments[0]]
        for curr in segments[1:]:
            if curr['speaker'] == merged[-1]['speaker'] and (curr['start'] - merged[-1]['end'] < 1.0):
                merged[-1]['end'] = curr['end']
            else:
                merged.append(curr)
        return merged

    def _assign_roles(self, results):
        """Label the speaker with the highest word count as AGENT, everyone else as CUSTOMER."""
        if not results:
            return results
        counts = Counter(r['speaker'] for r in results for _ in r['text'].split())
        if not counts:
            return results
        agent = counts.most_common(1)[0][0]
        for r in results:
            r['role'] = "AGENT" if r['speaker'] == agent else "CUSTOMER"
        return results

    def _flush_memory(self):
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    def save_json(self, data, base):
        """Write the report to a timestamped JSON file and return its path."""
        fn = f"{base}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(fn, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=4, ensure_ascii=False, cls=NumpyEncoder)
        return fn
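

# ---------------------------------------------------------------------------
# Minimal usage sketch. Assumptions (not part of the pipeline itself): the
# Hugging Face token is exposed through an environment variable named HF_TOKEN,
# and the audio file path is passed as the first command-line argument.
# Adjust both to your own setup.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    if len(sys.argv) < 2:
        print(f"Usage: python {sys.argv[0]} <audio_file.wav>")
        sys.exit(1)

    engine = PlatinumAnalyticsPipeline(hf_token=os.environ.get("HF_TOKEN"))
    report = engine.process_call(sys.argv[1])
    out_file = engine.save_json(report, os.path.splitext(os.path.basename(sys.argv[1]))[0])
    print(f"Analysis written to {out_file}")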