Spaces:
Sleeping
Sleeping
| import json | |
| import base64 | |
| import os, io | |
| import mimetypes | |
| from PIL import Image | |
| import gradio as gr | |
def _decode_data_uri(data_uri):
    """Return the decoded base64 payload that follows the comma in *data_uri*."""
    return base64.b64decode(data_uri.split(',')[1])


def _safe_attachment_name(name, default):
    """Reduce an untrusted *name* to a bare file name, or *default* if empty.

    The name comes from the imported JSON, so any directory components are
    dropped to keep the attachment inside the upload directory.
    """
    return os.path.basename(name or '') or default


def _write_attachment(upload_dir, fname, payload):
    """Write *payload* to ``upload_dir/fname`` and return the resulting path.

    Raises:
        ValueError: if *payload* is larger than 15 MB.
    """
    if len(payload) > 15e6:
        raise ValueError(f"file content `{fname}` larger than 15 MB")
    target = os.path.join(upload_dir, fname)
    with open(target, "wb") as out:
        out.write(payload)
    return target


def _import_messages(messages, upload_dir):
    """Convert an OpenAI-style ``messages`` list to (chat_history, system_prompt)."""
    system_prompt_value = ''
    chat_history = []
    msg_num = 1  # only advanced by assistant messages; used for default file names
    for msg in messages:
        role = msg['role']
        if role == 'system':
            system_prompt_value = msg['content']
            continue
        if role == 'assistant':
            chat_history.append(msg)
            msg_num += 1
            continue
        if role != 'user':
            # Messages with any other role are not imported.
            continue

        content = msg['content']
        if not isinstance(content, list):
            chat_history.append(msg)
            continue

        # Multi-part content: every part becomes its own chat entry.
        for item in content:
            item_type = item.get('type', '')
            if item_type == 'image_url':
                image_data = _decode_data_uri(item['image_url']['url'])
                img = Image.open(io.BytesIO(image_data))
                chat_history.append({
                    "role": role,
                    "content": gr.Image(value=img),
                })
            elif item_type == 'file':
                fname = _safe_attachment_name(
                    item['file'].get('name'), f'download{msg_num}'
                )
                file_data = _decode_data_uri(item['file']['url'])
                temp_path = _write_attachment(upload_dir, fname, file_data)
                chat_history.append({
                    "role": role,
                    "content": gr.File(value=temp_path, label=fname),
                })
            elif item_type == 'text':
                # Emit the text itself instead of re-adding the whole
                # list-content message once per text part.
                text = item.get('text', '')
                if text:
                    chat_history.append({"role": role, "content": text})
            else:
                chat_history.append(msg)
    return chat_history, system_prompt_value


def _legacy_user_content(user_part, upload_dir):
    """Return the chat content for the user half of a legacy pair.

    A dict carrying an embedded ``file.data`` data URI becomes a ``gr.Image``
    (image MIME types) or a ``gr.File`` written next to the uploaded history;
    anything else is returned unchanged.
    """
    if not (isinstance(user_part, dict) and 'file' in user_part):
        return user_part
    file_info = user_part['file']
    if 'data' not in file_info:
        return user_part

    data_uri = file_info['data']
    # "data:<mime>;base64,<payload>" -> "<mime>"
    mime_type = data_uri.split(';')[0].split(':')[1]
    payload = _decode_data_uri(data_uri)
    if mime_type.startswith('image/'):
        return gr.Image(value=Image.open(io.BytesIO(payload)))

    # Same sanitising and size cap as the OpenAI-style branch.
    fname = _safe_attachment_name(file_info.get('name'), 'download')
    temp_path = _write_attachment(upload_dir, fname, payload)
    return gr.File(value=temp_path, label=fname)


def _import_legacy(import_data, upload_dir):
    """Convert legacy ``[user, assistant]`` pairs to (chat_history, system_prompt)."""
    if 'history' in import_data:
        legacy_history = import_data['history']
        system_prompt_value = import_data.get('system_prompt', '')
    else:
        # Oldest layout: the document itself is the list of pairs.
        legacy_history = import_data
        system_prompt_value = ''

    chat_history = []
    for pair in legacy_history:
        user_part = pair[0]
        if user_part:
            chat_history.append({
                "role": "user",
                "content": _legacy_user_content(user_part, upload_dir),
            })
        if pair[1]:
            chat_history.append({
                "role": "assistant",
                "content": pair[1],
            })
    return chat_history, system_prompt_value


def import_history(history, file):
    """Load a chat history JSON file and convert it to role/content messages.

    Two layouts are accepted: a dict with a ``messages`` list (OpenAI-style),
    or the legacy layout of ``[user, assistant]`` pairs, either bare or under
    a ``history`` key with an optional ``system_prompt``. Embedded images
    become ``gr.Image`` components; other embedded files are written into the
    directory of the uploaded file and wrapped in ``gr.File``.

    Args:
        history: Not read by this function.
        file: Object whose ``name`` attribute is the path of the uploaded
            JSON file.

    Returns:
        A ``(chat_history, system_prompt_value)`` tuple, where *chat_history*
        is a list of ``{"role": ..., "content": ...}`` dicts.

    Raises:
        ValueError: if the history file exceeds 100 MB or an embedded file
            exceeds 15 MB.
    """
    if os.path.getsize(file.name) > 100e6:
        raise ValueError("History larger than 100 MB")

    with open(file.name, mode="rb") as f:
        content = f.read().decode('utf-8', 'replace')
    import_data = json.loads(content)

    # Attachments are stored alongside the uploaded history file.
    upload_dir = os.path.dirname(file.name)

    if 'messages' in import_data:
        return _import_messages(import_data['messages'], upload_dir)
    return _import_legacy(import_data, upload_dir)
def get_export_js():
    """Return JavaScript source that exports the chat as ``chat_history.json``.

    The returned string is an async arrow function taking
    ``(chat_history, system_prompt)``. It builds an OpenAI-style
    ``{"messages": [...]}`` document, inlines file attachments as data URIs
    (images as ``image_url`` parts, everything else as ``file`` parts) and
    triggers a browser download.

    Attachments that cannot be fetched or read are skipped and reported with
    ``console.warn`` rather than aborting the export.

    NOTE(review): presumably passed to a Gradio event listener via ``js=``;
    confirm against the caller.
    """
    return """
    async (chat_history, system_prompt) => {
        let messages = [];

        if (system_prompt) {
            messages.push({
                "role": "system",
                "content": system_prompt
            });
        }

        // Fetch a file URL and return its content as a data URI plus MIME type.
        async function processFile(file_url) {
            const response = await fetch(file_url);
            if (!response.ok) {
                // Do not export an HTTP error page as if it were the attachment.
                throw new Error(`Fetching ${file_url} failed with HTTP ${response.status}`);
            }
            const blob = await response.blob();
            return new Promise((resolve) => {
                const reader = new FileReader();
                reader.onloadend = () => resolve({
                    data: reader.result,
                    type: blob.type
                });
                reader.onerror = (error) => resolve(null);
                reader.readAsDataURL(blob);
            });
        }

        for (let message of chat_history) {
            if (!message.role || !message.content) continue;

            if (message.content && typeof message.content === 'object') {
                if (message.content.file) {
                    try {
                        const file_data = await processFile(message.content.file.url);
                        if (!file_data) {
                            console.warn("Skipping attachment that could not be read:", message.content.file.url);
                            continue;
                        }

                        if (file_data.type.startsWith('image/')) {
                            messages.push({
                                "role": message.role,
                                "content": [{
                                    "type": "image_url",
                                    "image_url": {
                                        "url": file_data.data
                                    }
                                }]
                            });
                        } else {
                            // Prefer the download name shown in the chat UI over the stored file name.
                            const fileLink = document.querySelector(`a[data-testid="chatbot-file"][download][href*="${message.content.file.url.split('/').pop()}"]`);
                            const fileName = fileLink ? fileLink.getAttribute('download') : (message.content.file.name || "download");

                            messages.push({
                                "role": message.role,
                                "content": [{
                                    "type": "file",
                                    "file": {
                                        "url": file_data.data,
                                        "name": fileName,
                                        "mime_type": file_data.type
                                    }
                                }]
                            });
                        }
                    } catch (error) {
                        // Best effort: keep exporting the remaining messages, but leave a trace.
                        console.warn("Skipping attachment that could not be exported:", error);
                    }
                }
            } else {
                messages.push({
                    "role": message.role,
                    "content": message.content
                });
            }
        }

        const export_data = { messages };
        const blob = new Blob([JSON.stringify(export_data)], {type: 'application/json'});
        const url = URL.createObjectURL(blob);
        const a = document.createElement('a');
        a.href = url;
        a.download = 'chat_history.json';
        document.body.appendChild(a);
        a.click();
        document.body.removeChild(a);
        URL.revokeObjectURL(url);
    }
    """