Skip to content

Instantly share code, notes, and snippets.

@InJeCTrL
Last active May 26, 2021 20:17
Show Gist options
  • Select an option

  • Save InJeCTrL/8636e51eceaf71fdfb66274bac59960f to your computer and use it in GitHub Desktop.

Select an option

Save InJeCTrL/8636e51eceaf71fdfb66274bac59960f to your computer and use it in GitHub Desktop.
m3u8视频分段下载&合并
import os
import re
import threading
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
from urllib.parse import urljoin

import requests
from Crypto.Cipher import AES
class m3u8_downloader:
    """Download every segment of an m3u8 media playlist and merge them.

    The constructor fetches and parses the playlist; ``run()`` downloads the
    segments with a thread pool into ``tmpfolder`` (one file per segment,
    named by its index) and concatenates them into ``filename``.
    """

    # Seconds to wait on each HTTP request before giving up on that attempt.
    timeout = 10
    # Number of attempts per segment before the failure is propagated.
    max_retry = 5

    def __init__(self, url, outfile="downloaded.ts", n_thread=20):
        """Store the settings and parse the playlist at ``url``.

        url: m3u8 playlist URL
        outfile: name of the merged output file
        n_thread: number of download threads (must be >= 1)

        Raises ValueError when ``n_thread`` is smaller than 1.
        """
        if n_thread < 1:
            raise ValueError("n_thread must be at least 1")
        # Playlist URL
        self.url = url
        # Name of the merged output file
        self.filename = outfile
        # Number of worker threads
        self.n_thread = n_thread
        # URL "directory" containing the playlist (kept for compatibility;
        # segment URLs are resolved with urljoin against self.url)
        self.m3u8dir = url.rsplit("/", 1)[0]
        # Segment URIs in playlist order
        self.tslist = []
        # Temporary directory for the downloaded segments
        self.tmpfolder = "./TS"
        # Truthy when the playlist declares an encryption key.  Kept as a
        # cipher object for compatibility, but never used for decryption:
        # a CBC cipher object is stateful and cannot be shared by segments.
        self.cryptor = None
        # Number of segments downloaded so far
        self.downloaded = 0
        # Lock guarding ``downloaded`` and the progress output
        self.m = threading.Lock()
        # (key, iv) per segment, parallel to ``tslist``; key is None for
        # unencrypted segments, iv is None when the playlist gave no IV
        self._tskeys = []
        # Value of #EXT-X-MEDIA-SEQUENCE (0 when the tag is absent)
        self._media_sequence = 0
        self.__getm3u8info__()

    def __getm3u8info__(self):
        """Fetch the playlist text and parse it."""
        # No raise_for_status here on purpose: an error page simply yields
        # zero segments, which run() reports to the user.
        with requests.get(self.url, timeout=self.timeout) as response:
            text = response.text
        self._parse_playlist(text)

    def _parse_playlist(self, text):
        """Fill ``tslist``, ``_tskeys`` and ``n_ts`` from the playlist text."""
        key = None
        iv = None
        key_cache = {}
        expect_segment = False
        # splitlines() + strip() cope with CRLF playlists, which would
        # otherwise leave a trailing "\r" on every segment name.
        for raw in text.splitlines():
            line = raw.strip()
            if not line:
                continue
            if line.startswith("#EXT-X-MEDIA-SEQUENCE:"):
                try:
                    self._media_sequence = int(line.split(":", 1)[1])
                except ValueError:
                    # Malformed tag: keep the default of 0.
                    self._media_sequence = 0
            elif line.startswith("#EXT-X-KEY"):
                # A key tag applies to every following segment until the
                # next key tag.
                key, iv = self._parse_key(line, key_cache)
            elif line.startswith("#EXTINF"):
                expect_segment = True
            elif expect_segment and not line.startswith("#"):
                # First URI line after #EXTINF; other tags in between are
                # skipped instead of being mistaken for the segment name.
                self.tslist.append(line)
                self._tskeys.append((key, iv))
                expect_segment = False
        self.n_ts = len(self.tslist)

    def _parse_key(self, line, key_cache):
        """Return ``(key, iv)`` described by one #EXT-X-KEY line.

        ``key`` is None for METHOD=NONE or when no usable key is given;
        ``iv`` is None when the tag has no IV attribute.  ``key_cache`` maps
        key URLs to already downloaded key bytes.
        """
        attr_text = line.partition(":")[2]
        # Quoted values may contain commas, so a plain split(",") is unsafe.
        attrs = {
            name: value.strip('"')
            for name, value in re.findall(
                r'([A-Za-z0-9-]+)=("[^"]*"|[^,]*)', attr_text
            )
        }
        if attrs.get("METHOD", "").strip().upper() == "NONE":
            return None, None
        uri = attrs.get("URI")
        if not uri:
            return None, None
        # The key URI may be relative to the playlist.
        key_url = urljoin(self.url, uri)
        if key_url not in key_cache:
            with requests.get(key_url, timeout=self.timeout) as response:
                response.raise_for_status()
                key_cache[key_url] = response.content
        key = key_cache[key_url]
        if not key:
            return None, None
        iv = None
        if "IV" in attrs:
            iv = int(attrs["IV"], 16).to_bytes(16, byteorder="big")
        self.cryptor = AES.new(key, AES.MODE_CBC, iv if iv is not None else key)
        return key, iv

    def _segment_url(self, tsname):
        """Resolve a segment URI (relative or absolute) against the playlist URL."""
        return urljoin(self.url, tsname)

    def _decrypt(self, data, i):
        """Return the decrypted bytes of segment ``i`` (``data`` unchanged if unencrypted)."""
        key, iv = self._tskeys[i]
        if key is None:
            return data
        if iv is None:
            # Without an explicit IV the segment's media sequence number is
            # used, as a 16-byte big-endian integer.
            iv = (self._media_sequence + i).to_bytes(16, byteorder="big")
        # A fresh cipher per segment: CBC state must not leak between
        # segments or threads.
        # NOTE(review): PKCS#7 padding is left in place, as in the original.
        return AES.new(key, AES.MODE_CBC, iv).decrypt(data)

    def __str__(self):
        """Return a human-readable summary of the download job."""
        return "m3u8 URL: %s\n分配线程数: %d\n总片数: %d\n输出文件名: %s\n是否加密: %s" % (self.url, self.n_thread, self.n_ts, self.filename, "是" if self.cryptor else "否")

    def __downloadts__(self, tsname, i):
        """Download one segment into the temporary directory.

        tsname: segment URI taken from the playlist
        i: segment index (also the temporary file name)

        Raises requests.RequestException on network/HTTP errors.
        """
        with requests.get(self._segment_url(tsname), timeout=self.timeout) as response:
            # Do not store an HTTP error page as media data.
            response.raise_for_status()
            payload = self._decrypt(response.content, i)
        # Write only after the download succeeded so a failed attempt does
        # not leave a truncated file behind.
        with open(os.path.join(self.tmpfolder, str(i)), "wb") as f:
            f.write(payload)

    def __th_task__(self, indexlist):
        """Worker function: download every segment whose index is in ``indexlist``.

        Indexes at or beyond ``n_ts`` end the task.  Each segment is tried
        up to ``max_retry`` times; the last error is re-raised.
        """
        for i in indexlist:
            if i >= self.n_ts:
                return
            for attempt in range(1, self.max_retry + 1):
                try:
                    self.__downloadts__(self.tslist[i], i)
                    break
                except (requests.RequestException, OSError):
                    # Bounded retry instead of looping forever on a
                    # permanently failing segment.
                    if attempt == self.max_retry:
                        raise
            with self.m:
                self.downloaded += 1
                print("\r分片完成数: %d / %d [%d%%]" % (self.downloaded, self.n_ts, self.downloaded * 100 / self.n_ts), end = "")

    def __merge__(self):
        """Concatenate the temporary segment files, in index order, into ``filename``."""
        with open(self.filename, "wb") as fout:
            for i in range(self.n_ts):
                with open(os.path.join(self.tmpfolder, str(i)), "rb") as ftemp:
                    fout.write(ftemp.read())

    def run(self):
        """Download all segments and merge them into the output file.

        Prints a message and returns when the playlist has no segments.
        Re-raises the first error of a worker that exhausted its retries.
        """
        if self.n_ts == 0:
            print("m3u8异常, 请重新获取m3u8地址")
            return
        os.makedirs(self.tmpfolder, exist_ok=True)
        self.downloaded = 0
        # Maximum number of segments per task (ceiling division).
        n_perthread = -(-self.n_ts // self.n_thread)
        # The context manager shuts the pool down even when a task fails.
        with ThreadPoolExecutor(max_workers=self.n_thread) as pool:
            futures = [
                pool.submit(
                    self.__th_task__,
                    list(range(start, min(start + n_perthread, self.n_ts))),
                )
                for start in range(0, self.n_ts, n_perthread)
            ]
            wait(futures, return_when=ALL_COMPLETED)
        # Surface worker failures instead of merging an incomplete set.
        for future in futures:
            future.result()
        print("\n下载完成\n合并文件...", end = "", flush = True)
        self.__merge__()
        print("OK")
if __name__ == "__main__":
    # Example usage: replace the placeholder URL with a real m3u8 address.
    # Guarded so that importing this module does not start a download.
    m = m3u8_downloader("http(s)://sample.m3u8", outfile="sample.ts", n_thread=30)
    print(m)
    m.run()
@wkz2003
Copy link
Copy Markdown

wkz2003 commented Mar 30, 2020

属实嗯

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment