当前位置: 首页 > 工具软件 > m3w > 使用案例 >

m3u8加密视频使用python下载

黄扬
2023-12-01

要问我是哪个网站,嘿嘿,你猜。里面有几个变量名是随手写的,凑合看吧,本程序参考了

Python实现m3u8下载mp4视频原理及源码_呆呆的机器人儿~的博客-CSDN博客_python下载m3u8

自动获取m3u8连接内的key进行解密,若有错误望大家评论指出

# m3u8视频下载

import os

import re

import time

import shutil

import requests

import json

import base64

from concurrent.futures import ThreadPoolExecutor, wait

from Crypto.Cipher import AES


 

# UA伪装

header = {

        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.87 Safari/537.36 SE 2.X MetaSr 1.0",

        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",

        #"Accept-Encoding": "gzip, deflate, br",

        "Accept-Language": "zh-CN,zh;q=0.9",

        "Cookie": "" #此处需要填写

        }


 

def download_mp4(mp4_file_path, ts_url_list, ts_url_title):

    '''下载ts文件并写入mp4文件

    :param mp4_file_path: mp4文件名

    :param ts_url_list: ts请求链接列表

    :return:

    '''

    # 判断文件是否存在,存在则先清空

    if os.path.exists(mp4_file_path):

        with open(mp4_file_path, 'w') as fp:

            fp.write('')

    # 创建存放ts的文件夹

    if not os.path.exists('ts'):

        os.mkdir('ts')

    print('开始下载{}...'.format(mp4_file_path))

    excutor = ThreadPoolExecutor(max_workers=20)  # 线程池

    len_list = len(ts_url_list)  # ts链接总数

    all_tasks = [excutor.submit(lambda args: download_ts(*args), (ts_url_id, len_list, ts_url_list, ts_url_title))

                 for ts_url_id in range(len_list)]  # 创建任务

    wait(all_tasks)  # 等待所有任务执行完成

    # 检测ts数目是否正确

    if len(os.listdir('ts')) == len_list:

        pass

    else:

        print('ts文件部分缺失...')

        # 删除存放ts的临时文件

        #shutil.rmtree('ts')

        return ''

    # ts合并为mp4文件

    print('ts文件下载完成,正在合并ts文件...')

    for ts_url_id in range(len_list):

        ts_file_name = 'ts/{}.ts'.format(ts_url_id)

        with open(ts_file_name, 'rb') as fp:

            ts_content = fp.read()  # 读取ts数据

        with open(mp4_file_path, 'ab') as fp:

            fp.write(ts_content)  # 将ts数据追加写入文件

    print('ts文件合并成功!')

    # 删除存放ts的临时文件

    shutil.rmtree('ts')

    return 1


 

def download_ts(ts_url_id, len_list, ts_url_list, ts_url_title):

    ''' 请求下载ts文件

    :param ts_url_id: 分区ts的id

    :param len_list: ts个数

    :param ts_url_list: 存放ts的列表

    :param ts_url_title: ts链接拼接的头部

    :return:

    '''

    print('{}/{}开始下载'.format(ts_url_id, len_list - 1))

    # 请求不成功补发请求,最大补发次数为

    max_request = 5  # 最大补发请求次数

    for i in range(max_request):

        try:

            response = requests.get(url=ts_url_title + ts_url_list[ts_url_id],

                                    headers=header, timeout=(5, 20))  # 请求获取ts数据

            if response.status_code == 200:

                ts_content = response.content

                break

        except:

            if i == max_request - 1:

                print('{}/{}下载失败'.format(ts_url_id, len_list - 1))

                return ''

            else:

                print('{}/{}下载失败,正在补发请求...'.format(ts_url_id, len_list - 1))

    ts_file_name = 'ts/{}.ts'.format(ts_url_id)

    with open(ts_file_name, 'wb') as fp:

        fp.write(ts_content)  # 将ts数据写入文件

    print('{}/{}下载完成'.format(ts_url_id, len_list - 1))


 

def deciphering(key, fileName):

    '''对aes加密视频进行解密

    :param key: aes解密密钥

    :param fileName: 需要解密的文件

    :return:

    '''

    # 读取原文件

    try:

        with open(fileName, 'rb') as fp:

            part = fp.read()

        # aes解密需要的偏移量,我也不知道为啥这样,反正可以正常使用

        iv = b'0000000000000000'

        # 解密数据

        plain_data = AES.new(key, AES.MODE_CBC,iv).decrypt(part).rstrip(b'\0')

        # 将解密数据写入文件

        with open(fileName, 'wb') as fp:

            fp.write(plain_data)

        print('视频解密完成!')

    except Exception as e:

        print(e)


 

def timer(start_time, end_time, mp4_file_name):

    '''计时器

    :param start_time: 开始时间

    :param end_time: 结束时间

    :return:

    '''

    spend_second = end_time - start_time

    hour = str(int(spend_second / (60 * 60)))

    minute = str(int(spend_second / 60))

    second = str(int(spend_second % 60))

    spend_time = '{}h{}m{}s'.format(hour, minute, second)

    print('{}下载完成!用时:{}'.format(mp4_file_name, spend_time))


 

def start(m3u8_url, mp4_file_name, ts_url_title,kds_token):

    '''开始

    :param m3u8_url: m3u8链接

    :param mp4_file_path: 下载后的视频名称

    :return:

    '''

    # 开始计时

    start_time = time.time()

    # 创建目录文件

    if not os.path.exists('mv'):

        os.mkdir('mv')

    # 视频保存路径

    mp4_file_path = 'mv/' + mp4_file_name+'.mp4'

    # 获取m3u8内容

    m3u8_file =  requests.get(url=m3u8_url, headers=header).text

    print('m3u8file='+m3u8_file)

    # 整理ts列表

    ts_url_list = re.findall(',\n(.*?)\n#', m3u8_file)

    print(ts_url_list)

    # 下载ts,并拼接为mp4文件

    mp4 = download_mp4(mp4_file_path, ts_url_list, ts_url_title)

    # 判断是否存在加密

    if  re.search('#EXT-X-KEY', m3u8_file):

        print('{}视频存在加密,正在对其进行解密,请稍后...'.format(mp4_file_path))

        # 获取key

        ext_x_key=re.findall('#EXT-X-KEY:(.*)\n',m3u8_file)

        key_base_url=re.findall('URI="(.*)"',ext_x_key[0])

        key_url = key_base_url[0]+'&token='+kds_token

        #re.search('#EXT-X-KEY:(.*URI="(.*)")\n', m3u8_file)[2]+'HMAC-SHA1'  # 获取key的url

        key_req= requests.get(url=key_url, headers=header).text  # 请求获取key

        print('key_url='+key_url)

        print(key_req)

        key_res=json.loads(key_req)

        print(key_res["data"])

        # 解密视频

        deciphering(base64.b64decode(key_res["data"]), mp4_file_path)

    # 计时结束

    end_time = time.time()

    # 耗时统计

    timer(start_time, end_time, mp4_file_name)


 

if __name__ == '__main__':

    # m3u8 链接

    m3u8_url ='https://v3-default.ixigua.com/**/main.m3u8?***'

    # ts链接头

    ts_url_title = ''

    # mp4 名称,不含.MP4

    mp4_file_name = '123'

    kds_token=input('请输入kds_token:')

    while 1==1:

        m3u8_url =input('请输入m3u8链接(输入0退出):')

        if m3u8_url =='0':

            break

        ts_url_title=m3u8_url.split('main.m3u8')[0]

        mp4_file_name = input('请输入视频名称:')

        # 执行下载

        start(m3u8_url, mp4_file_name, ts_url_title,kds_token)


 

 类似资料: