Last active
February 18, 2025 02:35
-
-
Save cxfcxf/15ffc741db388d7d8ef73c67c998e13c to your computer and use it in GitHub Desktop.
Use faster_whisper to transcribe Japanese audio and Qwen2.5 32B (via Ollama) to translate the subtitles into Chinese.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| import os | |
| import re | |
| import gc | |
| import argparse | |
| import ollama | |
| import langdetect | |
| from faster_whisper import WhisperModel | |
# Running count of first-try translation failures (the local model returned
# text still detected as Japanese and had to be retried); reported at exit.
local_model_translate_failures = 0
def parse_args():
    '''Parse command line arguments.

    Returns:
        argparse.Namespace with input_file, vad_threshold, and tmp_srt.
    '''
    parser = argparse.ArgumentParser(description="Translate Japanese subtitles to Chinese.")
    # required=True: without it a missing input file only surfaces much later
    # as an obscure failure inside the Whisper transcription step.
    # Help text fixed: this argument is the media file fed to Whisper, not an SRT.
    parser.add_argument("-i", dest="input_file", required=True, help="Path to the input video/audio file")
    parser.add_argument("-t", dest="vad_threshold", type=float, default=0.5, help="VAD threshold (default: 0.5)")
    parser.add_argument("--tmp-srt", dest="tmp_srt", default="tmp.srt", help="Path to the temporary SRT file")
    return parser.parse_args()
def translate_to_chinese(text, model="qwen2.5:32b"):
    '''Translate Japanese text to Chinese via a local Ollama model.

    Args:
        text: Japanese source text (coerced to str before sending).
        model: Ollama model tag to query.

    Returns:
        The model's reply content with surrounding whitespace stripped.
    '''
    # System prompt (Chinese), roughly: "You are a professional Japanese-to-
    # Simplified-Chinese translator. Translate the following Japanese text into
    # Chinese, keep the input/output format identical, output only the final
    # translation — no unrelated content, no annotations."
    messages = [
        {"role": "system", "content": '你是一个专业的日语译简体中文翻译器. \
        把下面的文本日文翻译成中文, 保证输入和输出内容的格式一致, 输出文本为最终翻译文本, 固不要输出无关内容, 不要输出任何注释.'},
        {"role": "user", "content": str(text)}
    ]
    # stream=False returns the whole reply in one response object; low
    # temperature keeps translations close to deterministic across retries.
    response = ollama.chat(
        model=model,
        messages=messages,
        stream=False,
        options={"temperature": 0.1}
    )
    return response.message.content.strip()
def is_japanese(text):
    '''Return True if the text is detected as Japanese.

    langdetect raises LangDetectException for text it cannot classify
    (empty strings, pure punctuation/numbers); any detection failure is
    treated as "not Japanese" so the translation loop keeps going.
    '''
    try:
        return langdetect.detect(text) == 'ja'
    except Exception:
        # Was a bare `except:`, which also swallowed KeyboardInterrupt and
        # SystemExit; Exception preserves the best-effort behavior without
        # blocking Ctrl-C.
        return False
def parse_srt(file_path):
    '''Read an SRT file and return a list of (number, time_code, text) tuples.

    Multi-line subtitle text is collapsed into a single space-joined line;
    blocks with fewer than three lines are skipped.
    '''
    with open(file_path, 'r', encoding='utf-8') as srt:
        raw = srt.read()

    entries = []
    # Subtitle entries are separated by a blank line.
    for chunk in re.split(r'\n\n', raw.strip()):
        pieces = chunk.split('\n', 2)
        if len(pieces) < 3:
            continue
        number, timing, body = pieces
        entries.append((number, timing, body.replace('\n', ' ').strip()))
    return entries
def translate_srt(input_file, output_file):
    '''Translate an SRT file's subtitle text to Chinese and write a new SRT.

    Args:
        input_file: Path to the source (Japanese) SRT file.
        output_file: Path where the translated SRT is written (UTF-8).

    Side effects:
        Increments the module-level local_model_translate_failures counter
        whenever the model's first attempt still reads as Japanese.
    '''
    global local_model_translate_failures
    subtitles = parse_srt(input_file)
    # Cache keyed on source text keeps repeated lines translated consistently
    # and avoids redundant model calls.
    translated_cache = {}
    translated_subtitles = []
    for subtitle_number, time_code, text in subtitles:
        # Membership test (not .get() truthiness): the original re-queried the
        # model whenever a cached translation was an empty/falsy string.
        if text in translated_cache:
            translated_text = translated_cache[text]
        else:
            translated_text = translate_to_chinese(text)
            # If the model echoed Japanese back, retry once and count it.
            if is_japanese(translated_text):
                translated_text = translate_to_chinese(text)
                local_model_translate_failures += 1
            translated_cache[text] = translated_text
        print(f"{text} => {translated_text}")
        translated_subtitles.append((subtitle_number, time_code, translated_text))
    with open(output_file, 'w', encoding='utf-8') as file:
        for subtitle in translated_subtitles:
            file.write(f"{subtitle[0]}\n{subtitle[1]}\n{subtitle[2]}\n\n")
def format_time(seconds):
    '''Convert a seconds value to an SRT timestamp (HH:MM:SS,mmm).'''
    mins, secs = divmod(seconds, 60)
    hrs, mins = divmod(mins, 60)
    # Milliseconds come from the fractional part; int() truncates toward zero.
    ms = (secs - int(secs)) * 1000
    return "{:02d}:{:02d}:{:02d},{:03d}".format(int(hrs), int(mins), int(secs), int(ms))
def transcribe_video(args):
    '''Transcribe audio from a video file and generate an SRT file.

    Runs faster-whisper over args.input_file with VAD filtering and writes
    numbered SRT entries to args.tmp_srt, then frees the model.
    '''
    model_size = "large-v3-turbo"
    # NOTE(review): hard-coded to CUDA + float16; will fail on CPU-only hosts.
    model = WhisperModel(model_size, device="cuda", compute_type="float16")
    segments, info = model.transcribe(
        args.input_file,
        beam_size=5,
        vad_filter=True,  # drop non-speech via voice-activity detection
        vad_parameters={"threshold": args.vad_threshold},
        language='ja',  # force Japanese; skip auto-detection
    )
    print(f"Detected language '{info.language}' with probability {info.language_probability}")
    subtitles = []
    allowed_gap = 5  # cap on-screen duration of one subtitle, in seconds
    for segment in segments:
        duration = segment.end - segment.start
        if duration >= allowed_gap:
            # Long segment: show only its final `allowed_gap` seconds so the
            # subtitle doesn't appear far ahead of the speech it belongs to.
            start_time = format_time(segment.end - allowed_gap)
            end_time = format_time(segment.end)
        else:
            start_time = format_time(segment.start)
            end_time = format_time(segment.end)
        text = segment.text
        segment_id = segment.id + 1  # SRT numbering starts at 1
        line_out = f"{segment_id}\n{start_time} --> {end_time}\n{text.lstrip()}\n\n"
        print(line_out)
        subtitles.append(line_out)
    with open(args.tmp_srt, 'w', encoding='utf-8') as srt_file:
        for line in subtitles:
            srt_file.write(line)
        srt_file.flush()
    # unload model when finished to release GPU memory before translation
    del model
    gc.collect()
def main():
    '''Entry point: transcribe the video, translate the subtitles, clean up.'''
    args = parse_args()
    transcribe_video(args)
    # Final SRT sits next to the input file, same base name.
    base, _ = os.path.splitext(args.input_file)
    output_srt = base + ".srt"
    translate_srt(args.tmp_srt, output_srt)
    print(f"Translation complete. Output saved to {output_srt}")
    print(f"Removing temporary SRT file {args.tmp_srt}")
    os.remove(args.tmp_srt)
    print(f"Local model translation failures: {local_model_translate_failures}")
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment