import os
import shutil
import subprocess
import uuid
import json
import time
import asyncio
import random
import importlib.util
from datetime import datetime
from typing import List, Optional, Union, Dict

from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
import google.generativeai as genai
from pydantic import BaseModel
from PIL import Image, ImageDraw, ImageFont

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Working directories: uploaded/rendered media, static assets, and the
# pluggable subtitle-style modules.
TEMP_DIR = "temp"
STATIC_DIR = "static"
STYLES_DIR = "styles"
os.makedirs(TEMP_DIR, exist_ok=True)
os.makedirs(STATIC_DIR, exist_ok=True)
os.makedirs(STYLES_DIR, exist_ok=True)

# Consistency fix: mount the directories via the constants defined above
# instead of repeating the string literals.
app.mount("/temp", StaticFiles(directory=TEMP_DIR), name="temp")
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")

MODEL_NAME = "gemini-2.5-flash"

FONT_DIR = "font"
# Logical font name (as sent by the frontend) -> bundled .ttf filename.
FONT_FILES_MAP = {
    "vazir": "Vazirmatn.ttf",
    "lalezar": "Lalezar.ttf",
    "bangers": "Bangers.ttf",
    "roboto": "Roboto.ttf",
}

# --- Dynamic Style Loading System ---
loaded_styles = {}    # Map ID -> Module
style_configs = {}    # Map ID -> Config Dict
style_templates = {}  # Map ID -> Frontend Template String


def load_all_styles():
    """Import every style plugin found in STYLES_DIR and register its ids.

    Each plugin is a plain .py file loaded via importlib. A plugin may expose:
    - ``config``: dict with an ``ids`` list; every id maps back to the module
      and its config in the registries above.
    - ``frontend_template`` (optional): template string served to the UI.
    """
    print("--- Loading Styles from /styles ---")
    # Fix: sorted() makes the load order (and therefore which module wins on a
    # duplicate style id) deterministic across filesystems.
    for filename in sorted(os.listdir(STYLES_DIR)):
        if not filename.endswith(".py") or filename == "__init__.py":
            continue
        module_name = filename[:-3]
        file_path = os.path.join(STYLES_DIR, filename)
        spec = importlib.util.spec_from_file_location(module_name, file_path)
        if spec and spec.loader:
            mod = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(mod)
            if hasattr(mod, 'config'):
                for style_id in mod.config.get("ids", []):
                    loaded_styles[style_id] = mod
                    style_configs[style_id] = mod.config
                    if hasattr(mod, 'frontend_template'):
                        style_templates[style_id] = mod.frontend_template.strip()
                    print(f"Loaded Style: {style_id}")
# Load styles on startup
load_all_styles()

# Gemini API keys: comma-separated ALL_GEMINI_API_KEYS, with a single
# GEMINI_API_KEY as fallback. Requests pick a random key per attempt.
raw_keys = os.getenv("ALL_GEMINI_API_KEYS", "")
API_KEYS = [k.strip() for k in raw_keys.split(",") if k.strip()]
if not API_KEYS:
    single_key = os.getenv("GEMINI_API_KEY")
    if single_key:
        API_KEYS.append(single_key)
print(f"--- {len(API_KEYS)} Gemini Keys Detected ---")


class WordInfo(BaseModel):
    """A single timed word inside a subtitle segment."""
    word: str
    start: float
    end: float
    highlight: Optional[bool] = False
    color: Optional[str] = None


class SubtitleSegment(BaseModel):
    """One subtitle line with optional per-word timings."""
    id: Union[str, int]
    start: float
    end: float
    text: str
    words: Optional[List[WordInfo]] = []


class StyleConfig(BaseModel):
    """Rendering style; field names mirror the frontend's camelCase payload."""
    font: str
    fontSize: int
    primaryColor: str
    outlineColor: str
    backType: str
    marginV: int
    x: Optional[int] = 0
    name: Optional[str] = "classic"
    radius: Optional[int] = 16
    paddingX: Optional[int] = 20
    paddingY: Optional[int] = 10
    # Mutable render-state fields, set via setattr() during frame generation.
    total_video_duration: Optional[float] = None
    current_render_time: Optional[float] = None
    entry_anim_progress: Optional[float] = 1.0
    alphaColor: Optional[str] = None  # added for the alpha text color


class ProcessRequest(BaseModel):
    """Full render request: which video, which segments, which style."""
    file_id: str
    segments: List[SubtitleSegment]
    video_width: int
    video_height: int
    style: StyleConfig


class StylePrompt(BaseModel):
    description: str


class JobStatus:
    """String constants for render-job states."""
    QUEUED = "queued"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"


class Job:
    """In-memory record of one queued render request."""

    def __init__(self, job_id: str, request_data: ProcessRequest):
        self.id = job_id
        self.data = request_data
        self.status = JobStatus.QUEUED
        self.created_at = datetime.now()
        self.result_url = None
        self.error_message = None


render_queue = asyncio.Queue()
jobs_db: Dict[str, Job] = {}


async def queue_worker():
    """Consume render jobs from the queue, one at a time, forever."""
    print("--- Queue Worker Started ---")
    while True:
        job_id = await render_queue.get()
        job = jobs_db.get(job_id)
        if job:
            try:
                print(f"Processing job: {job_id}")
                job.status = JobStatus.PROCESSING
                # Fix: the render pipeline is blocking (PIL + ffmpeg); run it
                # in a worker thread so the event loop keeps serving HTTP
                # requests (status polls, uploads) during a render.
                output_url = await asyncio.to_thread(process_render_logic, job.data)
                job.result_url = output_url
                job.status = JobStatus.COMPLETED
                print(f"Job {job_id} completed.")
            except Exception as e:
                print(f"Job {job_id} failed: {e}")
                job.status = JobStatus.FAILED
                job.error_message = str(e)
        render_queue.task_done()


@app.on_event("startup")
async def startup_event():
    asyncio.create_task(queue_worker())


def get_video_info(path):
    """Probe width/height/duration with ffprobe.

    Falls back to 1080x1920 @ 60s if probing fails for any reason.
    """
    try:
        cmd = ["ffprobe", "-v", "error", "-select_streams", "v:0",
               "-show_entries", "stream=width,height,duration", "-of", "json", path]
        res = subprocess.run(cmd, capture_output=True, text=True)
        data = json.loads(res.stdout)
        stream = data['streams'][0]
        w = int(stream.get('width', 1080))
        h = int(stream.get('height', 1920))
        dur = stream.get('duration')
        if not dur:
            # Some containers only expose duration at the format level.
            cmd_dur = ["ffprobe", "-v", "error", "-show_entries",
                       "format=duration", "-of", "json", path]
            res_dur = subprocess.run(cmd_dur, capture_output=True, text=True)
            data_dur = json.loads(res_dur.stdout)
            dur = data_dur['format'].get('duration', 60)
        return w, h, float(dur)
    except Exception:  # narrowed from bare except; best-effort defaults
        return 1080, 1920, 60.0


def get_font_object(style_font_name, size):
    """Load the mapped .ttf at *size*, falling back to Vazirmatn, then to
    PIL's built-in default font."""
    target_filename = FONT_FILES_MAP.get(style_font_name, "Vazirmatn.ttf")
    target_path = os.path.join(FONT_DIR, target_filename)
    if not os.path.exists(target_path):
        target_path = os.path.join(FONT_DIR, "Vazirmatn.ttf")
    if os.path.exists(target_path):
        return ImageFont.truetype(target_path, size)
    return ImageFont.load_default()


def get_color_tuple(color_str: str, default=(255, 255, 255, 255)):
    """Parse '#rrggbb', '#rrggbbaa', 'rgb(...)' or 'rgba(...)' into an RGBA
    tuple; return *default* on any parse failure."""
    if not color_str or not isinstance(color_str, str):
        return default
    color_str = color_str.strip().lower()
    if color_str.startswith('#'):
        try:
            hex_val = color_str.lstrip('#')
            if len(hex_val) == 6:
                return tuple(int(hex_val[i:i + 2], 16) for i in (0, 2, 4)) + (255,)
            elif len(hex_val) == 8:
                return tuple(int(hex_val[i:i + 2], 16) for i in (0, 2, 4, 6))
        except Exception:
            pass
    elif color_str.startswith('rgba'):
        try:
            content = color_str[color_str.find('(') + 1: color_str.rfind(')')]
            parts = [x.strip() for x in content.split(',')]
            if len(parts) >= 4:
                r, g, b = int(parts[0]), int(parts[1]), int(parts[2])
                a = int(float(parts[3]) * 255)  # CSS alpha is 0..1
                return (r, g, b, a)
        except Exception:
            pass
    elif color_str.startswith('rgb'):
        try:
            content = color_str[color_str.find('(') + 1: color_str.rfind(')')]
            parts = [x.strip() for x in content.split(',')]
            if len(parts) >= 3:
                return (int(parts[0]), int(parts[1]), int(parts[2]), 255)
        except Exception:
            pass
    return default


# --- Main Drawing Function (Refactored) ---
def create_subtitle_image(text_parts: list, active_idx: int, width: int, height: int,
                          style: StyleConfig, word_infos: Optional[List[WordInfo]] = None):
    """Render one transparent RGBA subtitle frame.

    Wraps *text_parts* into lines, measures them (RTL/Persian aware), and
    delegates the actual drawing to the loaded style module's draw_frame().
    *active_idx* marks the currently spoken word (-1 for none).
    """
    img = Image.new('RGBA', (width, height), (0, 0, 0, 0))
    draw = ImageDraw.Draw(img)
    font = get_font_object(style.font, style.fontSize)

    # Text wrapping: the music style keeps every word on a single line;
    # all other styles wrap at 5 words per line.
    lines = []
    if style.name == "music_player":
        lines.append(text_parts)
    else:
        MAX_WORDS_PER_LINE = 5
        current_line = []
        for word in text_parts:
            current_line.append(word)
            if len(current_line) == MAX_WORDS_PER_LINE:
                lines.append(current_line)
                current_line = []
        if current_line:
            lines.append(current_line)

    # Pre-calculate line metrics (full-line width plus per-word widths, which
    # the style modules need for highlighting).
    line_metrics = []
    max_line_width = 0
    for line_words in lines:
        w_widths = []
        full_line_text = " ".join(line_words)
        try:
            # direction/language need PIL built with libraqm; fall back to a
            # plain measurement when that raises.
            l_width = draw.textlength(full_line_text, font=font, direction='rtl', language='fa')
        except Exception:
            l_width = font.getlength(full_line_text)
        if l_width > max_line_width:
            max_line_width = l_width
        for w in line_words:
            try:
                wl = draw.textlength(w, font=font, direction='rtl', language='fa')
            except Exception:
                wl = font.getlength(w)
            w_widths.append(wl)
        line_metrics.append({"width": l_width, "words": line_words, "word_widths": w_widths})

    # --- Delegate to Style Module ---
    style_module = loaded_styles.get(style.name)
    if style_module and hasattr(style_module, 'draw_frame'):
        style_module.draw_frame(
            draw=draw, img=img, width=width, height=height,
            style_config=style, lines=lines, line_metrics=line_metrics,
            active_idx=active_idx, font=font, color_parser=get_color_tuple,
            word_infos=word_infos
        )
    else:
        # Fallback if the style is missing: draw a visible error marker.
        print(f"Warning: Style {style.name} not found, using default.")
        y = height - style.marginV
        draw.text((width / 2, y), "Style Error", font=font, fill="red")
    return img


def generate_subtitle_video(data: ProcessRequest, temp_dir: str):
    """Write subtitle frames + an ffmpeg concat list describing their timing.

    Returns the path of the concat list file. Gaps between segments are
    filled either with animated frames (music_player style) or by holding the
    last rendered image.
    """
    list_file = os.path.join(temp_dir, f"{data.file_id}_list.txt")
    empty_img_path = os.path.join(temp_dir, "empty.png")
    if not os.path.exists(empty_img_path):
        Image.new('RGBA', (data.video_width, data.video_height), (0, 0, 0, 0)).save(empty_img_path)

    # Total video duration, exposed to style modules via the style object.
    sorted_segments = sorted(data.segments, key=lambda x: x.start)
    if sorted_segments:
        setattr(data.style, 'total_video_duration', sorted_segments[-1].end)
    else:
        setattr(data.style, 'total_video_duration', 1.0)

    with open(list_file, "w") as f:
        current_timeline = 0.0
        last_generated_image = "empty.png"  # frame to hold during gaps
        for idx, seg in enumerate(sorted_segments):
            start_time = round(max(seg.start, current_timeline), 3)
            end_time = round(max(seg.end, start_time + 0.1), 3)
            if end_time - start_time < 0.04:
                continue

            # --- Gap filling ---
            gap = round(start_time - current_timeline, 3)
            if gap > 0.005:
                if data.style.name == "music_player":
                    # Music style: generate animated frames during the gap and
                    # keep showing the previous segment's text.
                    if idx > 0:
                        prev_seg = sorted_segments[idx - 1]
                        text_to_show = [w.word for w in prev_seg.words] if prev_seg.words else prev_seg.text.split()
                    else:
                        text_to_show = []  # nothing before the first segment
                    gap_cursor = current_timeline
                    GAP_FPS = 0.05  # frame duration in seconds (i.e. 20 fps)
                    while gap_cursor < start_time:
                        setattr(data.style, 'current_render_time', gap_cursor)
                        gap_name = f"sub_gap_{data.file_id}_{int(gap_cursor*1000)}.png"
                        img = create_subtitle_image(text_to_show, -1, data.video_width,
                                                    data.video_height, data.style)
                        img.save(os.path.join(temp_dir, gap_name))
                        f.write(f"file '{gap_name}'\nduration {GAP_FPS:.3f}\n")
                        gap_cursor += GAP_FPS
                    last_generated_image = gap_name
                    current_timeline = start_time
                else:
                    # Other styles: hold the last rendered frame (or the
                    # transparent placeholder) for the whole gap.
                    fill_img = last_generated_image
                    f.write(f"file '{fill_img}'\nduration {gap:.3f}\n")
                    current_timeline += gap
            current_timeline = start_time

            available_duration = round(end_time - current_timeline, 3)
            words = [w.word for w in seg.words] if seg.words else seg.text.split()
            if seg.words and len(words) > 0:
                seg.words.sort(key=lambda x: x.start)
                words = [w.word for w in seg.words]

                if data.style.name == "music_player":
                    # Music style: render frame-by-frame for smooth motion.
                    SUB_FRAME_DURATION = 0.05
                    time_cursor = start_time
                    ANIMATION_DURATION = 0.4  # entry animation length (s)
                    while time_cursor < end_time:
                        active_word_index = -1
                        for i, w_info in enumerate(seg.words):
                            if time_cursor >= w_info.start and time_cursor < w_info.end:
                                active_word_index = i
                                break
                        setattr(data.style, 'current_render_time', time_cursor)
                        # Entry animation progress, 0..1 over the first 0.4s.
                        time_into_segment = time_cursor - start_time
                        anim_progress = min(1.0, time_into_segment / ANIMATION_DURATION)
                        setattr(data.style, 'entry_anim_progress', anim_progress)
                        name = f"sub_{data.file_id}_{idx}_{int(time_cursor*1000)}.png"
                        img = create_subtitle_image(words, active_word_index, data.video_width,
                                                    data.video_height, data.style,
                                                    word_infos=seg.words)
                        img.save(os.path.join(temp_dir, name))
                        last_generated_image = name
                        f.write(f"file '{name}'\nduration {SUB_FRAME_DURATION:.3f}\n")
                        time_cursor += SUB_FRAME_DURATION
                    current_timeline = end_time
                else:
                    # Other styles: one frame per word, durations scaled so
                    # they exactly fill the segment's available time.
                    word_files, total_word_raw_duration = [], 0
                    for i, w_info in enumerate(seg.words):
                        name = f"sub_{data.file_id}_{idx}_{i}.png"
                        img = create_subtitle_image(words, i, data.video_width,
                                                    data.video_height, data.style,
                                                    word_infos=seg.words)
                        img.save(os.path.join(temp_dir, name))
                        raw_dur = max(0.04, w_info.end - w_info.start)
                        word_files.append({"file": name, "dur": raw_dur})
                        total_word_raw_duration += raw_dur
                    scale_factor = available_duration / total_word_raw_duration if total_word_raw_duration > 0 else 1
                    accumulated_written = 0.0
                    for wf in word_files:
                        final_dur = max(0.01, round(wf["dur"] * scale_factor, 3))
                        f.write(f"file '{wf['file']}'\nduration {final_dur:.3f}\n")
                        accumulated_written += final_dur
                        # Remember the last frame for gap filling.
                        last_generated_image = wf['file']
                    current_timeline += accumulated_written
            else:
                # No per-word timings: one frame for the whole segment.
                name = f"sub_{data.file_id}_{idx}_full.png"
                img = create_subtitle_image(words, -1, data.video_width,
                                            data.video_height, data.style,
                                            word_infos=seg.words)
                img.save(os.path.join(temp_dir, name))
                f.write(f"file '{name}'\nduration {available_duration:.3f}\n")
                last_generated_image = name
                current_timeline += available_duration
        # Trailing transparent padding so the overlay outlasts the video.
        f.write(f"file 'empty.png'\nduration 30.0\n")
    return list_file


def process_render_logic(req: ProcessRequest) -> str:
    """Render the subtitle overlay and burn it onto the source video.

    Step 1 turns the concat list into a transparent .mov; step 2 overlays it
    on the original mp4. Returns the public /temp URL of the result.
    Raises on missing input or any ffmpeg failure.
    """
    req.segments = [s for s in req.segments if s.end > s.start]
    req.segments.sort(key=lambda x: x.start)
    lst = generate_subtitle_video(req, TEMP_DIR)
    inp = f"{TEMP_DIR}/{req.file_id}.mp4"
    if not os.path.exists(inp):
        raise Exception("Input video not found")
    sub_video_path = f"{TEMP_DIR}/{req.file_id}_sub_render.mov"
    out = f"{TEMP_DIR}/{req.file_id}_final_{int(time.time())}.mp4"
    cmd_step1 = ["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", lst,
                 "-r", "30", "-s", f"{req.video_width}x{req.video_height}",
                 "-c:v", "png", "-pix_fmt", "rgba", sub_video_path]
    res1 = subprocess.run(cmd_step1, capture_output=True, text=True)
    if res1.returncode != 0:
        raise Exception(f"Subtitle generation failed: {res1.stderr}")
    cmd_step2 = ["ffmpeg", "-y", "-i", inp, "-i", sub_video_path,
                 "-filter_complex", "[0:v][1:v]overlay=0:0:eof_action=pass[outv]",
                 "-map", "[outv]", "-map", "0:a",
                 "-c:v", "libx264", "-r", "30", "-preset", "ultrafast",
                 "-c:a", "aac", out]
    res2 = subprocess.run(cmd_step2, capture_output=True, text=True)
    if res2.returncode != 0:
        raise Exception(f"Merge failed: {res2.stderr}")
    if os.path.exists(sub_video_path):
        os.remove(sub_video_path)
    return f"/temp/{os.path.basename(out)}"


@app.get("/")
async def index():
    return FileResponse("index.html")


# --- New Endpoint for Styles ---
@app.get("/api/styles")
def get_style_definitions():
    """Expose the loaded style configs and frontend templates to the UI."""
    return {"styles": style_configs, "templates": style_templates}


@app.post("/api/generate-style")
def generate_style_api(req: StylePrompt):
    """Ask Gemini to turn a free-text description into a style dict.

    Retries up to 3 times with random keys; falls back to a safe default.
    """
    if not API_KEYS:
        raise HTTPException(500, "API Keys Missing")
    for _ in range(3):
        try:
            genai.configure(api_key=random.choice(API_KEYS))
            model = genai.GenerativeModel(MODEL_NAME)
            prompt = f"""You are a JSON generator. Create a subtitle style based on: "{req.description}". Return JSON only. 
Keys: primaryColor, outlineColor, backType (solid/transparent/outline), font (vazir/lalezar/bangers/roboto), fontSize (30-90)."""
            res = model.generate_content(prompt, generation_config={"response_mime_type": "application/json"})
            data = json.loads(res.text.replace('```json', '').replace('```', '').strip())
            return {"primaryColor": data.get("primaryColor", "#FFFFFF"),
                    "outlineColor": data.get("outlineColor", "#000000"),
                    "backType": data.get("backType", "solid"),
                    "font": data.get("font", "vazir"),
                    "fontSize": int(data.get("fontSize", 60))}
        except Exception:  # model/parse failure -> retry with another key
            continue
    return {"primaryColor": "#FFFFFF", "outlineColor": "#000000",
            "font": "vazir", "fontSize": 60, "backType": "solid"}


@app.post("/api/upload")
def upload(file: UploadFile = File(...)):
    """Ingest a video: normalize to 30fps mp4, extract mp3, transcribe with
    Gemini, and split the transcript into <=9-word timed segments.

    Word timings are interpolated linearly inside each chunk since the model
    only returns segment-level timestamps.
    """
    if not API_KEYS:
        raise HTTPException(500, "API Keys Missing")
    fid = str(uuid.uuid4())[:8]
    ext = file.filename.split('.')[-1]
    raw_path = f"{TEMP_DIR}/{fid}_raw.{ext}"
    fixed_path = f"{TEMP_DIR}/{fid}.mp4"
    audio_path = f"{TEMP_DIR}/{fid}.mp3"
    try:
        with open(raw_path, "wb") as f:
            shutil.copyfileobj(file.file, f)
        # Re-encode to a constant 30fps so frame timing matches the renderer.
        subprocess.run(["ffmpeg", "-y", "-i", raw_path, "-r", "30", "-c:v", "libx264",
                        "-preset", "ultrafast", "-c:a", "copy", fixed_path],
                       check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        w, h, duration = get_video_info(fixed_path)
        subprocess.run(["ffmpeg", "-y", "-i", fixed_path, "-vn", "-acodec", "libmp3lame",
                        "-q:a", "4", audio_path],
                       check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        file_to_send = audio_path
    except Exception as e:
        raise HTTPException(500, f"File Processing Error: {e}")
    for _ in range(50):
        try:
            genai.configure(api_key=random.choice(API_KEYS))
            vf = genai.upload_file(path=file_to_send)
            while vf.state.name == "PROCESSING":
                time.sleep(2)
                vf = genai.get_file(vf.name)
            if vf.state.name == "FAILED":
                raise Exception("Gemini Failed")
            model = genai.GenerativeModel(MODEL_NAME)
            prompt = f"""The audio is {duration:.2f}s. Transcribe Persian speech to JSON. Timestamps MUST NOT exceed {duration:.2f}s. 
JSON: {{segments: [{{start, end, text, keywords}}], style_suggestion: {{...}}}}"""
            res = model.generate_content([vf, prompt], generation_config={"response_mime_type": "application/json"})
            data = json.loads(res.text.replace('```json', '').replace('```', '').strip())
            raw_segs = data.get("segments", [])
            final_segs = []
            if not raw_segs:
                raise Exception("Empty transcript")
            seg_cnt = 0
            for s in raw_segs:
                base_start, base_end = float(s.get("start", 0)), float(s.get("end", 0))
                if base_start >= duration:
                    continue  # model hallucinated a timestamp past the end
                base_end = min(base_end, duration)
                if base_end <= base_start:
                    base_end = base_start + 1.0
                raw_words = s.get("text", "").split()
                if not raw_words:
                    continue
                full_dur = base_end - base_start
                total_wc = len(raw_words)
                if total_wc == 0:
                    continue
                # Split long segments into chunks of at most 9 words, with
                # proportional start/end times.
                for k in range(0, total_wc, 9):
                    chunk = raw_words[k:k + 9]
                    if not chunk:
                        continue
                    c_start = round(base_start + (full_dur * (k / total_wc)), 3)
                    c_end = round(base_start + (full_dur * ((k + len(chunk)) / total_wc)), 3)
                    c_words = []
                    chunk_dur = c_end - c_start
                    for j, wrd in enumerate(chunk):
                        w_s = round(c_start + (chunk_dur * j / len(chunk)), 3)
                        w_e = round(c_start + (chunk_dur * (j + 1) / len(chunk)), 3)
                        c_words.append({"word": wrd, "start": w_s, "end": w_e,
                                        "highlight": wrd in s.get("keywords", [])})
                    final_segs.append({"id": seg_cnt, "start": c_start, "end": c_end,
                                       "text": " ".join(chunk), "words": c_words})
                    seg_cnt += 1
            try:
                genai.delete_file(vf.name)
            except Exception:
                pass  # best-effort remote cleanup
            if os.path.exists(audio_path):
                os.remove(audio_path)
            if os.path.exists(raw_path):
                os.remove(raw_path)
            return {"file_id": fid, "url": f"/temp/{fid}.mp4", "width": w, "height": h,
                    "segments": final_segs, "suggested_style": data.get("style_suggestion")}
        except Exception as e:
            print(e)
            continue
    raise HTTPException(500, "Failed after 50 attempts")


@app.post("/api/reupload")
async def reupload_video(file: UploadFile = File(...), file_id: str = Form(...)):
    """Restore a previously-processed video under its file_id (e.g. after the
    temp dir was cleared). file_id is validated against path traversal."""
    if not file_id or '/' in file_id or '\\' in file_id:
        raise HTTPException(400, "Invalid file_id")
    target_path = os.path.join(TEMP_DIR, f"{file_id}.mp4")
    try:
        with open(target_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
    except Exception as e:
        raise HTTPException(500, f"Could not save file: {e}")
    finally:
        await file.close()
    return {"status": "success", "message": f"File {file_id}.mp4 restored."}


@app.post("/api/enqueue-render")
async def enqueue_render(req: ProcessRequest):
    """Queue a render job; returns its id for status polling."""
    if not os.path.exists(os.path.join(TEMP_DIR, f"{req.file_id}.mp4")):
        # 200 with an error code: the frontend handles this as a re-upload
        # prompt rather than a hard failure.
        return JSONResponse(status_code=200,
                            content={"error": "Video not found", "error_code": "VIDEO_NOT_FOUND"})
    job_id = str(uuid.uuid4())
    jobs_db[job_id] = Job(job_id, req)
    await render_queue.put(job_id)
    return {"job_id": job_id, "status": JobStatus.QUEUED}


@app.get("/api/job-status/{job_id}")
async def get_job_status(job_id: str):
    """Report a job's state, plus queue position / result URL / error."""
    job = jobs_db.get(job_id)
    if not job:
        raise HTTPException(404, "Job not found")
    response = {"job_id": job.id, "status": job.status}
    if job.status == JobStatus.QUEUED:
        # 1-based position among jobs queued earlier than this one.
        response["queue_position"] = sum(
            1 for j in jobs_db.values()
            if j.status == JobStatus.QUEUED and j.created_at < job.created_at
        ) + 1
    elif job.status == JobStatus.COMPLETED:
        response["url"] = job.result_url
    elif job.status == JobStatus.FAILED:
        response["error"] = job.error_message
    return response