"""Fine-tune TinyLlama with MoE-LoRA adapters on a Q&A dataset and serve answers.

The module loads a tokenizer and a sentence-embedding model at import time,
exposes ``load_and_train`` to fine-tune the causal LM on
``makemytrip_qa_full.json`` and ``generate_answer`` to answer finance-related
prompts after a guardrail check (``validate_query``).
"""

import json
import math
import os
import re

# -----------------------------
# ENVIRONMENT / CACHE
# -----------------------------
# NOTE: these must be set BEFORE importing transformers / datasets /
# sentence_transformers, because those libraries resolve their cache
# directories when they are imported.
os.environ["TRANSFORMERS_CACHE"] = "/tmp/huggingface_cache"
os.environ["HF_HOME"] = "/tmp/huggingface_cache"
os.environ["HF_DATASETS_CACHE"] = "/tmp/huggingface_cache"
os.environ["HF_METRICS_CACHE"] = "/tmp/huggingface_cache"
os.environ["WANDB_MODE"] = "disabled"

import pandas as pd  # noqa: E402
import torch  # noqa: E402
import torch.nn as nn  # noqa: E402
import torch.nn.functional as F  # noqa: E402
import transformers  # noqa: E402
from datasets import Dataset  # noqa: E402
from peft import LoraConfig, get_peft_model  # noqa: E402
from sentence_transformers import SentenceTransformer, util  # noqa: E402
from transformers import (  # noqa: E402
    AutoModelForCausalLM,
    DataCollatorForLanguageModeling,  # kept for backward compatibility
    DataCollatorForSeq2Seq,
    Trainer,
    TrainingArguments,
)

# -----------------------------
# SETTINGS
# -----------------------------
device = "cuda" if torch.cuda.is_available() else "cpu"
tokenizer = transformers.AutoTokenizer.from_pretrained("TinyLlama/TinyLlama-1.1B-Chat-v1.0")
if tokenizer.pad_token is None:
    # Batch padding in the data collator needs a pad token.
    tokenizer.pad_token = tokenizer.eos_token
embed_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
model = None

SYSTEM_PROMPT = "You are a helpful assistant that provides financial data from MakeMyTrip reports."
ASSISTANT_MARKER = "<|assistant|>"


# -----------------------------
# LoRA / MoE Modules
# -----------------------------
class LoraLinear(nn.Module):
    """Linear layer with a frozen base weight plus a trainable low-rank update.

    Args:
        in_features: Size of the last input dimension.
        out_features: Size of the last output dimension.
        r: LoRA rank. ``r <= 0`` disables the low-rank branch.
        lora_alpha: LoRA scaling numerator; the update is scaled by
            ``lora_alpha / r``.
        lora_dropout: Dropout probability applied to the input of the
            low-rank branch.
        bias: Whether to create a frozen bias (only when ``with_base``).
        with_base: When ``True`` (default) the layer owns a frozen dense
            ``weight`` and ``forward`` returns ``base + delta``. When
            ``False`` no dense weight is allocated and ``forward`` returns
            only the low-rank delta.
    """

    def __init__(self, in_features, out_features, r=8, lora_alpha=16,
                 lora_dropout=0.05, bias=False, with_base=True):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.r = r
        self.scaling = lora_alpha / r if r > 0 else 1.0

        if with_base:
            # Zero-initialised rather than torch.empty: the weight is frozen,
            # so uninitialised memory would leak garbage into every forward
            # pass until real weights are loaded.
            self.weight = nn.Parameter(torch.zeros(out_features, in_features), requires_grad=False)
            self.bias = nn.Parameter(torch.zeros(out_features), requires_grad=False) if bias else None
        else:
            self.register_parameter("weight", None)
            self.register_parameter("bias", None)

        if r > 0:
            self.lora_A = nn.Parameter(torch.zeros((r, in_features)))
            self.lora_B = nn.Parameter(torch.zeros((out_features, r)))
            nn.init.kaiming_uniform_(self.lora_A, a=math.sqrt(5))
            # B starts at zero so the initial delta is exactly zero.
            nn.init.zeros_(self.lora_B)
            self.lora_dropout = nn.Dropout(p=lora_dropout)
        else:
            self.lora_A, self.lora_B, self.lora_dropout = None, None, None

    def lora_delta(self, x):
        """Return the scaled low-rank update for ``x`` (zeros when ``r <= 0``)."""
        if self.r <= 0:
            return x.new_zeros(*x.shape[:-1], self.out_features)
        return self.scaling * (self.lora_dropout(x) @ self.lora_A.T @ self.lora_B.T)

    def forward(self, x):
        """Apply the frozen base projection (if any) plus the LoRA delta."""
        if self.weight is None:
            return self.lora_delta(x)
        result = F.linear(x, self.weight, self.bias)
        if self.r > 0:
            result = result + self.lora_delta(x)
        return result


class MoELoRALinear(nn.Module):
    """Wrap a linear layer with a softmax-gated mixture of LoRA experts.

    The output is ``base_linear(x) + sum_i gate_i(x) * expert_i(x)`` where each
    expert contributes only its low-rank delta.

    Args:
        base_linear: Existing linear module; must expose ``in_features`` and
            ``out_features``.
        r: LoRA rank of every expert.
        num_experts: Number of LoRA experts.
        k: Stored on the module but NOT used for routing; all experts are
            mixed densely by their softmax gate score.
        lora_alpha: LoRA scaling numerator passed to each expert.
        lora_dropout: Dropout probability passed to each expert.
    """

    def __init__(self, base_linear, r, num_experts=2, k=1, lora_alpha=16, lora_dropout=0.05):
        super().__init__()
        self.base_linear = base_linear
        self.num_experts = num_experts
        self.k = k
        self.experts = nn.ModuleList([
            LoraLinear(
                base_linear.in_features,
                base_linear.out_features,
                r=r,
                lora_alpha=lora_alpha,
                lora_dropout=lora_dropout,
                # Delta-only experts: the dense projection is supplied once by
                # base_linear, so experts must not add their own base weight.
                with_base=False,
            )
            for _ in range(num_experts)
        ])
        self.gate = nn.Linear(base_linear.in_features, num_experts)

    def forward(self, x):
        """Return the base projection plus the gate-weighted expert deltas."""
        base_out = self.base_linear(x)
        gate_scores = torch.softmax(self.gate(x), dim=-1)
        expert_out = 0
        for i, expert in enumerate(self.experts):
            expert_out = expert_out + gate_scores[..., i:i + 1] * expert(x)
        return base_out + expert_out


def replace_proj_with_moe_lora(model, r=8, num_experts=2, k=1, lora_alpha=16, lora_dropout=0.05):
    """Replace ``up_proj`` and ``down_proj`` of every MLP with ``MoELoRALinear``.

    Args:
        model: Causal LM exposing ``model.model.layers[i].mlp``.
        r, num_experts, k, lora_alpha, lora_dropout: Forwarded to
            ``MoELoRALinear``.

    Returns:
        The same ``model`` object, modified in place.
    """
    for layer in model.model.layers:
        for proj_name in ("up_proj", "down_proj"):
            old = getattr(layer.mlp, proj_name)
            reference = next(old.parameters())
            moe = MoELoRALinear(
                base_linear=old,
                r=r,
                num_experts=num_experts,
                k=k,
                lora_alpha=lora_alpha,
                lora_dropout=lora_dropout,
                # Match dtype as well as device so the new gate/LoRA tensors
                # can be multiplied with the activations of the wrapped layer.
            ).to(device=reference.device, dtype=reference.dtype)
            setattr(layer.mlp, proj_name, moe)
    return model


# -----------------------------
# DATA PREPROCESSING
# -----------------------------
def preprocess(example):
    """Tokenize one example and mask the prompt part out of the loss.

    Args:
        example: Mapping with a ``'text'`` entry holding the full chat string.

    Returns:
        The tokenizer output with an added ``'labels'`` list in which every
        token up to and including the ``<|assistant|>`` marker is ``-100``.
        If the marker is absent, nothing is masked.
    """
    text = example['text']
    tokens = tokenizer(text, truncation=True, padding=False)
    input_ids = tokens['input_ids']
    labels = list(input_ids)

    marker_pos = text.find(ASSISTANT_MARKER)
    if marker_pos != -1:
        prefix = text[:marker_pos + len(ASSISTANT_MARKER)]
        prefix_len = len(tokenizer(prefix, add_special_tokens=False)['input_ids'])
        # The full encoding may start with a BOS token that the prefix-only
        # encoding (add_special_tokens=False) does not contain.
        if input_ids and input_ids[0] == tokenizer.bos_token_id:
            prefix_len += 1
        # Truncation may have cut the sequence inside the prompt; clamping
        # keeps the slice assignment from growing ``labels``.
        prefix_len = min(prefix_len, len(labels))
        labels[:prefix_len] = [-100] * prefix_len

    tokens['labels'] = labels
    return tokens


def _build_training_text(question, answer):
    """Render one Q&A pair with the tokenizer's chat template.

    Using the same template as ``generate_answer`` keeps the training format
    (including end-of-sequence separators) identical to the inference prompt.
    """
    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": question},
        {"role": "assistant", "content": answer},
    ]
    return tokenizer.apply_chat_template(messages, tokenize=False)


# -----------------------------
# LOAD & TRAIN MODEL
# -----------------------------
def load_and_train(model_id="TinyLlama/TinyLlama-1.1B-Chat-v1.0"):
    """Load the base model, attach MoE-LoRA + PEFT LoRA adapters and train.

    Reads ``makemytrip_qa_full.json`` from the directory of this file, trains
    with ``transformers.Trainer`` and stores the trained model in the
    module-level ``model`` variable (left in eval mode).

    Args:
        model_id: Hugging Face model identifier of the causal LM to fine-tune.
    """
    global model

    current_dir = os.path.dirname(os.path.abspath(__file__))
    json_file_path = os.path.join(current_dir, 'makemytrip_qa_full.json')
    with open(json_file_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    df = pd.DataFrame(data)
    print(f"Loaded dataset containing {len(df)} questions")

    training_data = [
        {"text": _build_training_text(row['question'], row['answer'])}
        for _, row in df.iterrows()
    ]
    dataset = Dataset.from_list(training_data)
    tokenized_dataset = dataset.map(preprocess, remove_columns=["text"])

    # NOTE(review): trust_remote_code=True executes code shipped with the
    # checkpoint; only use model ids from sources you trust.
    base_model = AutoModelForCausalLM.from_pretrained(model_id, trust_remote_code=True).to(device)
    model = replace_proj_with_moe_lora(base_model)

    peft_config = LoraConfig(
        r=8,
        lora_alpha=16,
        lora_dropout=0.05,
        target_modules=["o_proj"],
        bias="none",
        task_type="CAUSAL_LM",
    )
    model = get_peft_model(model, peft_config)

    # PEFT freezes every parameter whose name lacks its "lora_" prefix, which
    # includes the MoE gates; re-enable them so routing is actually learned.
    for module in model.modules():
        if isinstance(module, MoELoRALinear):
            for gate_param in module.gate.parameters():
                gate_param.requires_grad = True

    model.config.use_cache = False
    model.gradient_checkpointing_disable()

    # DataCollatorForLanguageModeling(mlm=False) rebuilds ``labels`` from
    # ``input_ids`` and would discard the prompt masking done in preprocess;
    # DataCollatorForSeq2Seq pads the provided labels with -100 instead.
    data_collator = DataCollatorForSeq2Seq(
        tokenizer=tokenizer,
        padding=True,
        label_pad_token_id=-100,
    )

    training_args = TrainingArguments(
        learning_rate=1e-4,
        lr_scheduler_type="cosine",
        output_dir="./results",
        num_train_epochs=10,
        per_device_train_batch_size=1,
        gradient_accumulation_steps=4,
        logging_steps=1,
        save_steps=10,
        save_total_limit=2,
        # Mixed precision is only available on GPU; the CPU fallback must
        # train in full precision.
        fp16=(device == "cuda"),
        bf16=False,
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_dataset,
        data_collator=data_collator,
    )

    print(torch.cuda.is_available())  # True if GPU is detected
    print(next(model.parameters()).device)  # Shows where your model is
    print("Training started")
    trainer.train()

    model.eval()
    # The KV cache was disabled for training only; restore it for generation.
    model.config.use_cache = True


# ---------------- Guardrails ----------------
BLOCKED_TERMS = [
    "weather", "cricket", "movie", "song", "football", "holiday",
    "travel", "recipe", "music", "game", "sports", "politics", "election",
]

FINANCE_DOMAINS = [
    "financial reporting", "balance sheet", "income statement",
    "assets and liabilities", "equity", "revenue", "profit and loss",
    "goodwill impairment", "cash flow", "dividends", "taxation",
    "investment", "valuation", "capital structure", "ownership interests",
    "subsidiaries", "shareholders equity", "expenses", "earnings",
    "debt", "amortization", "depreciation",
]

finance_embeds = embed_model.encode(FINANCE_DOMAINS, convert_to_tensor=True)


# --------------------------------------------------------------
# GUARD RAIL
# --------------------------------------------------------------
def validate_query(query: str, threshold: float = 0.5) -> bool:
    """Decide whether ``query`` is an acceptable finance question.

    A query is rejected when a word in it starts with one of
    ``BLOCKED_TERMS``, or when its highest cosine similarity to the
    ``FINANCE_DOMAINS`` embeddings is not above ``threshold``.

    Args:
        query: User question.
        threshold: Minimum (exclusive) similarity score for acceptance.

    Returns:
        ``True`` if the query passes both checks, ``False`` otherwise.
    """
    q_lower = query.lower()
    # Match at word start only: plain substring tests reject unrelated words
    # such as "selection" (contains "election") or "transports" ("sports"),
    # while inflections like "games" or "travelling" are still caught.
    if any(re.search(r"\b" + re.escape(bad), q_lower) for bad in BLOCKED_TERMS):
        print("[Guardrail] Rejected by blocklist.")
        return False

    q_emb = embed_model.encode(query, convert_to_tensor=True)
    sim_scores = util.cos_sim(q_emb, finance_embeds)
    max_score = float(sim_scores.max())
    if max_score > threshold:
        print(f"[Guardrail] Accepted (semantic match {max_score:.2f})")
        return True

    print(f"[Guardrail] Rejected (low semantic score {max_score:.2f})")
    return False


# -----------------------------
# GENERATE ANSWER
# -----------------------------
def generate_answer(prompt, max_tokens=200):
    """Generate an answer for ``prompt`` with the fine-tuned model.

    Args:
        prompt: User question.
        max_tokens: Maximum number of new tokens to sample.

    Returns:
        The generated answer, or a fixed message when the prompt is empty,
        rejected by the guardrail, or the model produced no text.

    Raises:
        RuntimeError: If no model has been loaded via ``load_and_train``.
    """
    if prompt is None or prompt.strip() == "":
        return "Please enter a prompt!"

    if not validate_query(prompt):
        print("Query rejected: Not finance-related.")
        return "Query rejected: Please ask finance-related questions."

    if model is None:
        raise RuntimeError("Model is not loaded; call load_and_train() first.")

    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": prompt},
    ]
    input_text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    model_device = next(model.parameters()).device
    inputs = tokenizer(input_text, return_tensors="pt").to(model_device)
    prompt_len = inputs["input_ids"].shape[1]

    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=max_tokens,
            do_sample=True,
            top_p=0.9,
            temperature=0.7,
        )

    # Decode only the newly generated tokens. This avoids searching the text
    # for the assistant marker (which a user prompt could also contain);
    # skip_special_tokens already removes the end-of-sequence token.
    new_tokens = outputs[0][prompt_len:]
    generated_answer = tokenizer.decode(new_tokens, skip_special_tokens=True).strip()
    if not generated_answer:
        return "Could not extract answer from model output."
    return generated_answer