要问我是哪个网站,嘿嘿,你猜。里面有几个变量名是随手写的,凑合看吧,本程序参考了
Python实现m3u8下载mp4视频原理及源码_呆呆的机器人儿~的博客-CSDN博客_python下载m3u8
自动获取m3u8连接内的key进行解密,若有错误望大家评论指出
# m3u8视频下载
import os
import re
import time
import shutil
import requests
import json
import base64
from concurrent.futures import ThreadPoolExecutor, wait
from Crypto.Cipher import AES
# UA伪装
header = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.87 Safari/537.36 SE 2.X MetaSr 1.0",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
#"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.9",
"Cookie": "" #此处需要填写
}
def download_mp4(mp4_file_path, ts_url_list, ts_url_title):
'''下载ts文件并写入mp4文件
:param mp4_file_path: mp4文件名
:param ts_url_list: ts请求链接列表
:return:
'''
# 判断文件是否存在,存在则先清空
if os.path.exists(mp4_file_path):
with open(mp4_file_path, 'w') as fp:
fp.write('')
# 创建存放ts的文件夹
if not os.path.exists('ts'):
os.mkdir('ts')
print('开始下载{}...'.format(mp4_file_path))
excutor = ThreadPoolExecutor(max_workers=20) # 线程池
len_list = len(ts_url_list) # ts链接总数
all_tasks = [excutor.submit(lambda args: download_ts(*args), (ts_url_id, len_list, ts_url_list, ts_url_title))
for ts_url_id in range(len_list)] # 创建任务
wait(all_tasks) # 等待所有任务执行完成
# 检测ts数目是否正确
if len(os.listdir('ts')) == len_list:
pass
else:
print('ts文件部分缺失...')
# 删除存放ts的临时文件
#shutil.rmtree('ts')
return ''
# ts合并为mp4文件
print('ts文件下载完成,正在合并ts文件...')
for ts_url_id in range(len_list):
ts_file_name = 'ts/{}.ts'.format(ts_url_id)
with open(ts_file_name, 'rb') as fp:
ts_content = fp.read() # 读取ts数据
with open(mp4_file_path, 'ab') as fp:
fp.write(ts_content) # 将ts数据追加写入文件
print('ts文件合并成功!')
# 删除存放ts的临时文件
shutil.rmtree('ts')
return 1
def download_ts(ts_url_id, len_list, ts_url_list, ts_url_title):
''' 请求下载ts文件
:param ts_url_id: 分区ts的id
:param len_list: ts个数
:param ts_url_list: 存放ts的列表
:param ts_url_title: ts链接拼接的头部
:return:
'''
print('{}/{}开始下载'.format(ts_url_id, len_list - 1))
# 请求不成功补发请求,最大补发次数为
max_request = 5 # 最大补发请求次数
for i in range(max_request):
try:
response = requests.get(url=ts_url_title + ts_url_list[ts_url_id],
headers=header, timeout=(5, 20)) # 请求获取ts数据
if response.status_code == 200:
ts_content = response.content
break
except:
if i == max_request - 1:
print('{}/{}下载失败'.format(ts_url_id, len_list - 1))
return ''
else:
print('{}/{}下载失败,正在补发请求...'.format(ts_url_id, len_list - 1))
ts_file_name = 'ts/{}.ts'.format(ts_url_id)
with open(ts_file_name, 'wb') as fp:
fp.write(ts_content) # 将ts数据写入文件
print('{}/{}下载完成'.format(ts_url_id, len_list - 1))
def deciphering(key, fileName):
'''对aes加密视频进行解密
:param key: aes解密密钥
:param fileName: 需要解密的文件
:return:
'''
# 读取原文件
try:
with open(fileName, 'rb') as fp:
part = fp.read()
# aes解密需要的偏移量,我也不知道为啥这样,反正可以正常使用
iv = b'0000000000000000'
# 解密数据
plain_data = AES.new(key, AES.MODE_CBC,iv).decrypt(part).rstrip(b'\0')
# 将解密数据写入文件
with open(fileName, 'wb') as fp:
fp.write(plain_data)
print('视频解密完成!')
except Exception as e:
print(e)
def timer(start_time, end_time, mp4_file_name):
'''计时器
:param start_time: 开始时间
:param end_time: 结束时间
:return:
'''
spend_second = end_time - start_time
hour = str(int(spend_second / (60 * 60)))
minute = str(int(spend_second / 60))
second = str(int(spend_second % 60))
spend_time = '{}h{}m{}s'.format(hour, minute, second)
print('{}下载完成!用时:{}'.format(mp4_file_name, spend_time))
def start(m3u8_url, mp4_file_name, ts_url_title,kds_token):
'''开始
:param m3u8_url: m3u8链接
:param mp4_file_path: 下载后的视频名称
:return:
'''
# 开始计时
start_time = time.time()
# 创建目录文件
if not os.path.exists('mv'):
os.mkdir('mv')
# 视频保存路径
mp4_file_path = 'mv/' + mp4_file_name+'.mp4'
# 获取m3u8内容
m3u8_file = requests.get(url=m3u8_url, headers=header).text
print('m3u8file='+m3u8_file)
# 整理ts列表
ts_url_list = re.findall(',\n(.*?)\n#', m3u8_file)
print(ts_url_list)
# 下载ts,并拼接为mp4文件
mp4 = download_mp4(mp4_file_path, ts_url_list, ts_url_title)
# 判断是否存在加密
if re.search('#EXT-X-KEY', m3u8_file):
print('{}视频存在加密,正在对其进行解密,请稍后...'.format(mp4_file_path))
# 获取key
ext_x_key=re.findall('#EXT-X-KEY:(.*)\n',m3u8_file)
key_base_url=re.findall('URI="(.*)"',ext_x_key[0])
key_url = key_base_url[0]+'&token='+kds_token
#re.search('#EXT-X-KEY:(.*URI="(.*)")\n', m3u8_file)[2]+'HMAC-SHA1' # 获取key的url
key_req= requests.get(url=key_url, headers=header).text # 请求获取key
print('key_url='+key_url)
print(key_req)
key_res=json.loads(key_req)
print(key_res["data"])
# 解密视频
deciphering(base64.b64decode(key_res["data"]), mp4_file_path)
# 计时结束
end_time = time.time()
# 耗时统计
timer(start_time, end_time, mp4_file_name)
if __name__ == '__main__':
# m3u8 链接
m3u8_url ='https://v3-default.ixigua.com/**/main.m3u8?***'
# ts链接头
ts_url_title = ''
# mp4 名称,不含.MP4
mp4_file_name = '123'
kds_token=input('请输入kds_token:')
while 1==1:
m3u8_url =input('请输入m3u8链接(输入0退出):')
if m3u8_url =='0':
break
ts_url_title=m3u8_url.split('main.m3u8')[0]
mp4_file_name = input('请输入视频名称:')
# 执行下载
start(m3u8_url, mp4_file_name, ts_url_title,kds_token)