# 合并视频段
def _merge_segment_content(self):
while True:
if (self._data['write_count'] is not None) and (self._data['write_count'] + 1 >= self._data['max_count']):
break
if len(self._data['get']) <= 1:
time.sleep(2)
continue
# 必须在遍历前获取锁,防止在遍历的时候数据被修改了
with self._lock:
for k, v in enumerate(self._data['get']):
for i, j in enumerate(self._data['get']):
# 去除自己
if k == i:
continue
if v['end'] + 1 == j['start']:
get_data = self._data['get']
get_data[k]['content'] += get_data[i]['content']
get_data[k]['end'] = get_data[i]['end']
get_data.pop(i)
#print('\n合并视频-'+str(v['start'])+' << '+str(j['start']))
self._data['get'] = get_data
break
# 双层循环退出
else:
continue
break
time.sleep(1)
写入到文件部分
# 将段视频内容写入
def _write_segment_content(self):
while True:
if (self._data['write_count'] is not None) and (self._data['write_count'] + 1 >= self._data['max_count']):
break
if len(self._data['get']) <= 0:
continue
content = None
with self._lock:
# 每次只写入一个,因为需要修改正在遍历的数据,防止出错
for k, v in enumerate(self._data['get']):
if (self._data['write_count'] is None) or (self._data['write_count'] + 1 == v['start']):
content = v['content']
self._data['write_count'] = v['end']
get_list = self._data['get']
get_list.pop(k)
#print('\n写入视频-' + str(v['start']))
self._data['get'] = get_list
break
# 将写入数据放到锁外面执行,不要占用锁
if content:
with open(os.path.join(self._save_file_dir, self._save_file_name), 'ab') as f:
f.write(content)
time.sleep(0.05)
全部代码
注意: 关于请求头部分需要修改成自己的
import asyncio
import os.path
import re
import sys
import time
import zipfile
from multiprocessing import Lock, Manager, Process

import aiohttp
import requests
from Crypto.Cipher import AES
class M3u8(object):
    """m3u8 stream downloader.

    Parameters
    ----------
    m3u8_url : str
        URL of the .m3u8 playlist.
    run_count : int
        Maximum number of concurrent segment requests.
    save_file_dir : str, optional
        Directory to save the output file into (default './').
    save_file_name : str, optional
        Name of the output file (default 'mv.mp4').
    """

    def __init__(self, m3u8_url, run_count, save_file_dir=None, save_file_name=None) -> None:
        # fix: apply the defaults BEFORE storing on self — the original
        # assigned the raw arguments first, leaving self._save_file_dir
        # and self._save_file_name as None when the defaults were needed.
        if save_file_dir is None:
            save_file_dir = './'
        if save_file_name is None:
            save_file_name = 'mv.mp4'
        self._m3u8_url = m3u8_url
        self._save_file_dir = save_file_dir
        self._save_file_name = save_file_name
        self._run_count = run_count
        # Lazily-computed caches filled in by the _get_* helpers.
        self._m3u8_content = None
        self._head_url = None
        self._is_encrypted = None
        self._encrypted_line = None
        self._encrypted_key = None
        self._encrypted_iv = None
        self._urls = None
        if not os.path.exists(save_file_dir):
            os.mkdir(save_file_dir)
        # NOTE: an existing file with the same name is deleted here.
        if os.path.exists(os.path.join(save_file_dir, save_file_name)):
            os.remove(os.path.join(save_file_dir, save_file_name))
# Fetch the m3u8 playlist text.
def _get_m3u8_content(self):
    """Fetch and cache the raw m3u8 playlist text.

    Raises RuntimeError when the request returns nothing.
    """
    if self._m3u8_content is None:
        # NOTE: replace these request headers with your own.
        headers = {
            # fix: the string literal was unterminated (syntax error)
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)',
        }
        page = requests.get(self._m3u8_url, headers=headers)
        if page is None:
            raise RuntimeError("can't get url's content")
        self._m3u8_content = page.text
    return self._m3u8_content
# 获取链接的头,也就是去除xxx.m3u8后缀
def _get_head_url(self):
if self._head_url is None:
find = re.findall(r'(.*/).*\.m3u8?', self._m3u8_url)
if find:
self._head_url = find[0]
else:
raise RuntimeError("can't get head url")
return self._head_url
# 获取是否加密 注意:这里默认为AES的ECB或CBC加密,如果是其他方式,或者不适用,则需要另行修改
def _get_is_encrypted(self):
if self._is_encrypted is None:
if re.match(r'(#EXT-X-KEY:METHOD.*\n)', self._get_m3u8_content()):
self._is_encrypted = True
else:
self._is_encrypted = False
return self._is_encrypted
# 获取m3u8文件中,关于加密信息对应的那一行
def _get_encrypted_line(self):
if self._encrypted_line is None:
find = re.findall(r'(#EXT-X-KEY:METHOD.*\n)', self._get_m3u8_content())
if find:
self._encrypted_line = find[0]
else:
raise RuntimeError("can't get encrypted line")
return self._encrypted_line
# Fetch the AES key.  No 16-byte padding is attempted here.
def _get_encrypted_key(self):
    """Download and cache the AES key referenced by the #EXT-X-KEY line.

    Raises RuntimeError when the key cannot be fetched.
    """
    if self._encrypted_key is None:
        # NOTE: the pattern assumes the URI ends with 'y' (e.g. 'key', 'xxx.key').
        find = re.findall(r'URI="?(.*y)"?.*\n', self._get_encrypted_line(), re.IGNORECASE)
        head_url = self._get_head_url()
        if find:
            key_url = find[0]
            # fix: the original called re.match(head_url, find) — passing the
            # LIST to re.match (TypeError) and using a URL as a regex pattern.
            # Prepend head_url only when the URI is not already absolute.
            if re.match(r'https?://', key_url) is None:
                key_url = head_url + key_url
        else:
            # No URI found in the playlist: try the common default location.
            key_url = head_url + 'key.key'
        req = requests.get(key_url)
        if req:
            self._encrypted_key = req.content
        else:
            raise RuntimeError("can't get encrypted key")
    return self._encrypted_key
# 获取加密的偏移值 未考虑填充
def _get_encrypted_iv(self):
if self._encrypted_iv is None:
find = re.findall(r'IV="?(\w*)"?.*\n', self._get_encrypted_line(), re.IGNORECASE)
if find:
self._encrypted_iv = find[0]
else:
raise RuntimeError("can't get encrypted iv")
return self._encrypted_iv
# AES decryption: ECB when no IV is supplied, CBC otherwise.
def _get_decrypt_content(self, content, key, iv=None):
    """Return `content` decrypted with AES using `key` (and `iv` for CBC)."""
    if iv is None:
        cipher = AES.new(key, AES.MODE_ECB)
    else:
        cipher = AES.new(key, AES.MODE_CBC, iv)
    return cipher.decrypt(content)
# 进行解密处理
def _decrypt_content(self, content):
# 如果加密了就将其解密
if self._get_is_encrypted():
key = self._encrypted_key()
iv = self._encrypted_iv()
content = self._get_decrypt_content(content, key, iv)
return content
# 返回链接列表
def _get_urls(self):
if self._urls is None:
urls = re.findall(r'(h.*\.ts)', self._get_m3u8_content())
if urls:
# 如果ts不是完整的链接,需要补上head_url
if not (re.match(r'^http', urls[0]) or re.match(r'^https', urls[0])):
head_url = self._get_head_url()
urls = list(map(lambda x: head_url + x, urls))
else:
raise RuntimeError("can't find urls")
self._urls = urls
return self._urls
# 显示当前下载进度条
def _process_bar(self, cur, end):
print("\r", end='')
print("download file ({}) {}%:".format((str(cur) + '/' + str(end)), int((cur / end) * 100)),
'▋' * int((cur / end) * 100),
end='')
sys.stdout.flush()
# Async download of one segment (depends on aiohttp).
async def _async_get_segment_content(self, segment_url, semaphore, index):
    """Download one .ts segment, decrypt it if needed, and record it.

    Appends {'start': index, 'end': index, 'content': ...} to
    self._data['get'] under self._lock; a failed request records empty
    content so the merge/write processes do not stall on this index.
    """
    content = b''  # fix: was referenced unbound at `return` on the error path
    async with semaphore:
        async with aiohttp.ClientSession() as session:
            try:
                async with session.get(segment_url) as rep:
                    if rep:
                        content = await rep.read()
                        content = self._decrypt_content(content)
            except Exception:
                # Best-effort: keep the empty placeholder on any failure.
                content = b''
            # Manager.dict nested values cannot be mutated in place:
            # copy out, append, assign back.
            with self._lock:
                data = self._data['get']
                data.append({'start': index, 'end': index, 'content': content})
                self._data['get'] = data
            # Update the progress bar once per finished segment.
            self._process_bar(index + 1, self._data['max_count'])
    return content
# 获取段视频数据
def _get_all_contents(self):
loop = asyncio.new_event_loop()
semaphore = asyncio.Semaphore(self._run_count)
urls = self._get_urls()
tasks = []
# 很想写成列表生成式 但是报错
for k, v in enumerate(urls):
task = asyncio.ensure_future(self._async_get_segment_content(v, semaphore, k), loop=loop)
tasks.append(task)
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
# 合并视频段
def _merge_segment_content(self):
while True:
if (self._data['write_count'] is not None) and (self._data['write_count'] + 1 >= self._data['max_count']):
break
if len(self._data['get']) <= 1:
time.sleep(2)
continue
# 必须在遍历前获取锁,防止在遍历的时候数据被修改了
with self._lock:
for k, v in enumerate(self._data['get']):
for i, j in enumerate(self._data['get']):
# 去除自己
if k == i:
continue
if v['end'] + 1 == j['start']:
get_data = self._data['get']
get_data[k]['content'] += get_data[i]['content']
get_data[k]['end'] = get_data[i]['end']
get_data.pop(i)
print('\n合并视频-'+str(v['start'])+'<<'+str(j['start']))
self._data['get'] = get_data
break
# 双层循环退出
else:
continue
break
time.sleep(1)
# 将段视频内容写入
def _write_segment_content(self):
while True:
if (self._data['write_count'] is not None) and (self._data['write_count'] + 1 >= self._data['max_count']):
break
if len(self._data['get']) <= 0:
continue
content = None
with self._lock:
for k, v in enumerate(self._data['get']):
if (self._data['write_count'] is None) or (self._data['write_count'] + 1 == v['start']):
content = v['content']
self._data['write_count'] = v['end']
get_list = self._data['get']
get_list.pop(k)
print('\n写入视频-' + str(v['start']))
self._data['get'] = get_list
break
# 将写入数据放到锁外面执行,不要占用锁
if content:
with open(os.path.join(self._save_file_dir, self._save_file_name), 'ab') as f:
f.write(content)
time.sleep(0.05)
# Entry point: download, merge and write the stream.
def run(self):
    """Start the three cooperating worker processes and wait for them.

    The processes share a Manager dict:
      - _get_all_contents      downloads segments
      - _merge_segment_content merges adjacent segments
      - _write_segment_content appends them to the output file
    """
    self._lock = Lock()
    with Manager() as m:
        self._data = m.dict({
            # list of downloaded segment dicts: {'start', 'end', 'content'}
            'get': [],
            # total number of segments
            'max_count': len(self._get_urls()),
            # 'end' index of the last segment written to disk (None = none yet)
            'write_count': None,
        })
        workers = (self._get_all_contents,
                   self._merge_segment_content,
                   self._write_segment_content)
        processes = [Process(target=w) for w in workers]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
if __name__ == '__main__':
    # Replace with a real playlist URL before running.
    target = r'xxx/index.m3u8'
    downloader = M3u8(target, 3, './mv', 'mv.mp4')
    downloader.run()
# 合并后删除ts文件
def _del_all_ts_files(self, del_files_dir):
files = [x for x in os.listdir(del_files_dir) if os.path.isfile(del_files_dir + '/' + x) and os.path.splitext(del_files_dir + '/' + x)[1] == '.ts']
for file in files:
os.remove(del_files_dir + '/' + file)
另一种合并方式: 使用标准库 zipfile 按文件名顺序拼接(需要 import zipfile)
# zipfile 合并文件 不使用压缩
def _zipfile_merge(self, save_file_dir, save_file_name, is_del=True):
# 可能需要处理下合并顺序
files = os.listdir(save_file_dir)
with zipfile.ZipFile(os.path.join(save_file_dir, save_file_name), 'a') as z:
for file in files:
z.write(save_file_dir + '/' + file)
if is_del is True:
self._del_all_ts_files(save_file_dir)