@@ -24,15 +24,27 @@ def gcloud_API_translate_text(textList, project_id, target_language_code):
client = translate .TranslationServiceClient ()
location = "global"
parent = f"projects/{ project_id } /locations/{ location } "
response = client .translate_text (
request = {
"parent" : parent ,
"contents" : textList ,
"mime_type" : "text/plain" ,
"target_language_code" : target_language_code ,
}
)
return response .translations
MAX_STRINGS = 1024 # GCloud API has hard limit of 1024 lines per request
numStringsToTranslate = len (textList )
numStringsTranslated = 0
translations = []
while numStringsTranslated < numStringsToTranslate :
endIndex = numStringsTranslated + MAX_STRINGS
if endIndex > numStringsToTranslate :
endIndex = numStringsToTranslate
contents = textList [numStringsTranslated :endIndex ]
response = client .translate_text (
request = {
"parent" : parent ,
"contents" : contents ,
"mime_type" : "text/plain" ,
"target_language_code" : target_language_code ,
}
)
numStringsTranslated += len (response .translations )
translations .extend (response .translations )
return translations
os .system ("" ) # enables ansi escape characters in terminal
LINE_CLEAR = '\x1b [2K' # <-- ANSI sequence
@@ -46,6 +58,7 @@ def gcloud_API_translate_text(textList, project_id, target_language_code):
parser .add_argument ('-l' , '--language' , help = 'Language to translate to' , default = 'en-US' , choices = ['en-US' , 'zh-CN' , 'zh-TW' , 'ja' , 'ko' ])
parser .add_argument ('--test_run' , action = 'store_true' , help = 'Runs a few times and displays debug info' )
parser .add_argument ('--skip_cleanup' , action = 'store_true' , help = 'Does not delete temporary files' )
parser .add_argument ('--skip_extract' , action = 'store_true' , help = 'Skips extracting subtitle images step (uses cached temporary files)' )
args = parser .parse_args ()
video_path = args .video_path
@@ -57,59 +70,61 @@ def gcloud_API_translate_text(textList, project_id, target_language_code):
project_id = args .project_id
test_run = args .test_run
skip_cleanup = args .skip_cleanup or test_run
skip_extract = args .skip_extract
# Generate raw images of the subtitles
print ("Extracting subtitle images with VideoSubFinder (takes quite a long time) ..." )
startTime = time .time ()
subprocess .run ([
"VideoSubFinderWXW.exe" ,
"--clear_dirs" ,
"--run_search" ,
"--create_cleared_text_images" ,
"--input_video" , video_path ,
"--output_dir" , tmp_dir ,
"--num_threads" , str (4 ),
"--num_ocr_threads" , str (4 ),
"--top_video_image_percent_end" , str (0.25 ),
"--bottom_video_image_percent_end" , str (0.0 )
], capture_output = True )
endTime = time .time ()
print ("Completed! Took " + str (endTime - startTime )+ "s" )
if not skip_extract :
print ("Extracting subtitle images with VideoSubFinder (takes quite a long time) ..." )
startTime = time .time ()
subprocess .run ([
"VideoSubFinderWXW.exe" ,
"--clear_dirs" ,
"--run_search" ,
"--create_cleared_text_images" ,
"--input_video" , video_path ,
"--output_dir" , tmp_dir ,
"--num_threads" , str (4 ),
"--num_ocr_threads" , str (4 ),
"--top_video_image_percent_end" , str (0.25 ),
"--bottom_video_image_percent_end" , str (0.0 )
], capture_output = True )
endTime = time .time ()
print ("Completed! Took " + str (endTime - startTime )+ "s" )
# Enumerate all the images
imagePaths = []
if os .path .isdir (txt_images_folder ):
filetypes = ('*.jpg' , '*.jpeg' , '*.png' )
for filetype in filetypes :
globPath = os .path .join (txt_images_folder , filetype );
imagePaths .extend (glob .glob (globPath ));
filetypes = ('*.jpg' , '*.jpeg' , '*.png' )
for filetype in filetypes :
globPath = os .path .join (txt_images_folder , filetype );
imagePaths .extend (glob .glob (globPath ));
else :
print ("ERROR: Invalid paths provided!\n " )
parser .print_help ()
sys .exit (2 )
print ("ERROR: Invalid paths provided!\n " )
parser .print_help ()
sys .exit (2 )
numImages = len (imagePaths )
# Reduce image size OCR has a max image size
i = 0
totalTimeElapsedS = 0
estimateText = "Estimated time remaining: Unknown"
for imagePath in imagePaths :
if test_run and i > 10 :
break
progressText = "Preprocessing image " + str (i + 1 )+ "/" + str (numImages )+ ". " + estimateText + ". Filename: " + os .path .basename (imagePath )
print (end = LINE_CLEAR )
print (progressText , end = '\r ' )
startTime = time .time ()
# Leaving some padding and not doing "-trim" seems to be important
subprocess .run (["magick" , "convert" , imagePath , "-resize" , "x200>" , imagePath ], capture_output = True )
endTime = time .time ()
i += 1
# Some logic to provide an estimated time
timeElapsed = endTime - startTime
totalTimeElapsedS += timeElapsed
averageTime = totalTimeElapsedS / float (i )
numImagesRemaining = numImages - i
estimateText = "Estimated time remaining: " + str (averageTime * numImagesRemaining )+ "s"
if test_run and i > 10 :
break
progressText = "Preprocessing image " + str (i + 1 )+ "/" + str (numImages )+ ". " + estimateText + ". Filename: " + os .path .basename (imagePath )
print (end = LINE_CLEAR )
print (progressText , end = '\r ' )
startTime = time .time ()
# Leaving some padding and not doing "-trim" seems to be important
subprocess .run (["magick" , "convert" , imagePath , "-resize" , "x200>" , imagePath ], capture_output = True )
endTime = time .time ()
i += 1
# Some logic to provide an estimated time
timeElapsed = endTime - startTime
totalTimeElapsedS += timeElapsed
averageTime = totalTimeElapsedS / float (i )
numImagesRemaining = numImages - i
estimateText = "Estimated time remaining: " + str (averageTime * numImagesRemaining )+ "s"
# Run Windows OCR on the images and save to text file
i = 0
@@ -119,68 +134,70 @@ def gcloud_API_translate_text(textList, project_id, target_language_code):
baseNameList = []
textList = []
for imagePath in imagePaths :
if test_run and i > 10 :
break
progressText = "Running OCR on image " + str (i + 1 )+ "/" + str (numImages )+ ". " + estimateText + ". Filename: " + os .path .basename (imagePath )
print (end = LINE_CLEAR )
print (progressText , end = '\r ' )
startTime = time .time ()
result = subprocess .run (["Windows.Media.Ocr.Cli.exe" , imagePath ], capture_output = True )
ocrText = result .stdout .decode ("utf-8" )
# Replace carriage returns with spaces.
# This leads to more naturally flowing translations most of the time.
ocrText = " " .join (ocrText .splitlines ()).strip ()
filename , ext = os .path .splitext (imagePath )
basename = os .path .basename (filename )
# GCloud API throws error if we give it blanks
if len (ocrText ) > 0 :
baseNameList .append (basename )
textList .append (ocrText )
totalCharacterCount += len (ocrText )
endTime = time .time ()
i += 1
# Some logic to provide an estimated time
timeElapsed = endTime - startTime
totalTimeElapsedS += timeElapsed
averageTime = totalTimeElapsedS / float (i )
numImagesRemaining = numImages - i
estimateText = "Estimated time remaining: " + str (averageTime * numImagesRemaining )+ "s"
if test_run and i > 10 :
break
progressText = "Running OCR on image " + str (i + 1 )+ "/" + str (numImages )+ ". " + estimateText + ". Filename: " + os .path .basename (imagePath )
print (end = LINE_CLEAR )
print (progressText , end = '\r ' )
startTime = time .time ()
result = subprocess .run (["Windows.Media.Ocr.Cli.exe" , imagePath ], capture_output = True )
ocrText = result .stdout .decode ("utf-8" )
# Replace carriage returns with spaces.
# This leads to more naturally flowing translations most of the time.
ocrText = " " .join (ocrText .splitlines ()).strip ()
filename , ext = os .path .splitext (imagePath )
basename = os .path .basename (filename )
# GCloud API throws error if we give it blanks
if len (ocrText ) > 0 :
baseNameList .append (basename )
textList .append (ocrText )
totalCharacterCount += len (ocrText )
endTime = time .time ()
i += 1
# Some logic to provide an estimated time
timeElapsed = endTime - startTime
totalTimeElapsedS += timeElapsed
averageTime = totalTimeElapsedS / float (i )
numImagesRemaining = numImages - i
estimateText = "Estimated time remaining: " + str (averageTime * numImagesRemaining )+ "s"
# Additionally writeout to file for easier debugging
original_text_file = os .path .join (tmp_dir , "original_text.txt" )
with open (original_text_file , 'w' , encoding = "utf-8" ) as f :
for i in range (len (textList )):
f .write (str (i )+ ": " + textList [i ]+ "\n " )
print ('' )
# Translate via Google Cloud Translation API and output to disk
print ("Translating text..." )
translations = gcloud_API_translate_text (textList , project_id , target_language_code )
print ("# of characters translated: " + str (totalCharacterCount ))
print ("# of lines translated: " + str (len (translations )))
for i in range (len (translations )):
out_filepath = os .path .join (txt_results_folder , baseNameList [i ]) + '.txt'
translated_text = translations [i ].translated_text
if test_run :
print ("Orignal text: " + repr (textList [i ]))
print ("Translated text: " + repr (translated_text ))
with open (out_filepath , 'w' , encoding = "utf-8" ) as f :
f .write (translated_text )
# Additional writeout all text to files for easier debugging
if skip_cleanup :
with open (os .path .join (tmp_dir , "original_text.txt" ), 'w' , encoding = "utf-8" ) as f :
for i in range (len (textList )):
f .write (str (i )+ ": " + textList [i ]+ "\n " )
with open (os .path .join (tmp_dir , "translated_text.txt" ), 'w' , encoding = "utf-8" ) as f :
for i in range (len (translations )):
f .write (str (i )+ ": " + translations [i ].translated_text + "\n " )
out_filepath = os .path .join (txt_results_folder , baseNameList [i ]) + '.txt'
translated_text = translations [i ].translated_text
if test_run :
print ("Orignal text: " + repr (textList [i ]))
print ("Translated text: " + repr (translated_text ))
with open (out_filepath , 'w' , encoding = "utf-8" ) as f :
f .write (translated_text )
# Additionally writeout to file for easier debugging
with open (os .path .join (tmp_dir , "translated_text.txt" ), 'w' , encoding = "utf-8" ) as f :
for i in range (len (translations )):
f .write (str (i )+ ": " + translations [i ].translated_text + "\n " )
# Generate subtitle file
srt_path = video_filename + ".srt"
print ("Generating softsubs with VideoSubFinder... Output file: " + srt_path )
startTime = time .time ()
subprocess .run ([
"VideoSubFinderWXW.exe" ,
"--create_sub_from_txt_results" , srt_path ,
"--output_dir" , tmp_dir
"VideoSubFinderWXW.exe" ,
"--create_sub_from_txt_results" , srt_path ,
"--output_dir" , tmp_dir
], capture_output = True )
endTime = time .time ()
print ("Completed! Took " + str (endTime - startTime )+ "s" )
# Cleanup if needed
if not skip_cleanup :
shutil .rmtree (tmp_dir , ignore_errors = True )
shutil .rmtree (tmp_dir , ignore_errors = True )