Created
February 5, 2026 09:34
-
-
Save jaj42/27ff4d2b9ddd6cca53b207844701fb90 to your computer and use it in GitHub Desktop.
Transcribe audio using Voxtral from Mistral
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from mistralai import Mistral | |
| from tenacity import retry,stop_after_attempt | |
| import asyncio | |
| from pathlib import Path | |
| from sys import argv | |
| import subprocess | |
| import tempfile | |
@retry(stop=stop_after_attempt(4))
def transcribe_file(client, model, filepath):
    """Upload one audio file for transcription and return the resulting text.

    The function is wrapped in a tenacity ``retry`` that stops after four
    attempts. The file is opened inside the function body, so every attempt
    opens it anew.

    Args:
        client: Object exposing ``audio.transcriptions.complete``.
        model: Model identifier forwarded unchanged to the API call.
        filepath: Path of the audio file; its ``name`` attribute is sent as
            the upload file name.

    Returns:
        The ``text`` attribute of the transcription response.
    """
    with open(filepath, "rb") as audio_stream:
        upload = {
            "content": audio_stream,
            "file_name": filepath.name,
        }
        # A language hint (language="en") was left disabled by the author.
        response = client.audio.transcriptions.complete(model=model, file=upload)
        return response.text
| async def transcribe_audio_file( | |
| infile_path: Path, api_key: str, chunk_minutes: float = 15 | |
| ) -> str | None: | |
| print(f"Converting {infile_path}...") | |
| temp_dir = Path(tempfile.mkdtemp()) | |
| print(f"Temp dir: {temp_dir}") | |
| # Split the MP3 into segments | |
| print(f"\nSplitting audio into {chunk_minutes}-minute segments...") | |
| segment_pattern = temp_dir / "segment_%03d.opus" | |
| chunk_seconds = int(chunk_minutes * 60) | |
| cmd = [ | |
| "ffmpeg", | |
| "-i", str(infile_path), | |
| "-f", "segment", | |
| "-segment_time", str(chunk_seconds), | |
| "-c:a", "libopus", | |
| "-b:a", "64k", | |
| "-vn", | |
| str(segment_pattern), | |
| "-y", | |
| ] # fmt: skip | |
| result = subprocess.run(cmd, capture_output=True, text=True) | |
| if result.returncode != 0: | |
| print(f"ffmpeg error: {result.stderr}") | |
| return | |
| outfiles = list(temp_dir.glob("*.opus")) | |
| model = "voxtral-mini-latest" | |
| client = Mistral(api_key=api_key) | |
| print("Starting transcription...") | |
| full_transcript = [] | |
| for i, filepath in enumerate(sorted(outfiles)): | |
| print(f"Transcribing file {filepath}, {i + 1}/{len(outfiles)}") | |
| res = transcribe_file(client, model, filepath) | |
| full_transcript.append(res) | |
| # Delete temporary folder | |
| cmd = ["rm", "-rf", str(temp_dir)] | |
| result = subprocess.run(cmd, capture_output=True, text=True) | |
| if result.returncode != 0: | |
| print(f"Failed to delete temp folder: {temp_dir}") | |
| return " ".join(full_transcript) | |
async def main():
    """Transcribe the audio file named on the command line.

    Reads the input path from ``argv[1]``, transcribes it with
    ``transcribe_audio_file`` and writes the result to
    ``<input stem>_transcript.txt`` in the current working directory.
    Returns early (after printing a message) when no argument is given, when
    the input file does not exist, or when no transcript was produced.
    """
    import os  # local import: only used to read the API key from the environment

    # Prefer the MISTRAL_API_KEY environment variable so the key need not be
    # stored in the script; fall back to the inline placeholder otherwise.
    api_key = os.environ.get("MISTRAL_API_KEY", "***")

    if len(argv) < 2:
        script = argv[0] if argv else "transcribe"
        print(f"Usage: {script} <audio-file>")
        return

    infile = Path(argv[1])
    if not infile.exists():
        print(f"{infile} not found")
        # Stop here: there is nothing to hand to ffmpeg.
        return

    output_file = f"{infile.stem}_transcript.txt"
    transcript = await transcribe_audio_file(infile, api_key)
    if transcript is None:
        print("No transcript data")
        return

    with open(output_file, "w", encoding="utf-8") as f:
        f.write(transcript)
    print(f"\nTranscript saved to: {output_file}")
# Script entry point: when executed directly, run the async main() to completion.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment