本文参考了这位仁兄的博客:《有加密的m3u8视频下载以及解码方法》(baidu_41902768 的 CSDN 博客,原页面标题含"m3u8加密视频怎么解密")。
在其基础上做了进一步封装:支持直接输入视频所在网页的 URL 来下载视频(只会匹配页面中出现的第一个 m3u8 地址)。
分享给大家
此 Python 脚本基于 Linux 环境编写;如需在 Windows 上使用,需要先开启 WSL,再从 Windows 应用商店安装 Ubuntu,然后在其中运行。
废话不多说,直接上代码
#!/usr/bin/python
import requests
import os
import re
from Crypto.Cipher import AES
import time
import tempfile
import sys
import optparse
class DownloadError(RuntimeError):
    """Raised when a network resource cannot be fetched after all retries."""

    def __init__(self, arg):
        # Delegate to the base class so that ``args`` becomes ``(arg,)``.
        # Assigning a string directly to ``self.args`` makes Python convert it
        # to a tuple of single characters, which garbles ``str(error)``.
        super(DownloadError, self).__init__(arg)
class UrlError(RuntimeError):
    """Raised when a URL cannot be resolved into an absolute address."""

    def __init__(self, arg):
        # Delegate to the base class so that ``args`` becomes ``(arg,)``.
        # Assigning a string directly to ``self.args`` makes Python convert it
        # to a tuple of single characters, which garbles ``str(error)``.
        super(UrlError, self).__init__(arg)
def progressbar(tot, pre, content=None):
    """Draw a one-line text progress bar on stdout.

    Args:
        tot: total number of tasks.
        pre: number of tasks finished so far.
        content: optional text appended after the percentage.

    While ``pre < tot`` the line ends with a carriage return so the next call
    overwrites it; once the work is complete it ends with a newline.
    """
    max_bar = 30
    if tot <= 0:
        # Nothing to do counts as "complete"; this also avoids dividing by 0.
        finish = max_bar
        percent = "100%"
    else:
        # Clamp so an out-of-range ``pre`` cannot overflow the bar width.
        finish = min(max(int(pre * max_bar / tot), 0), max_bar)
        percent = str(int(pre * 100 / tot)) + "%"
    bar = "[{}{}]".format(finish * "=", (max_bar - finish) * " ")
    counts = " (%d/%d) " % (pre, tot)
    con_str = "" if content is None else " " + content
    line_end = "\r" if pre < tot else "\n"
    sys.stdout.write(bar + counts + percent + con_str + line_end)
    sys.stdout.flush()
def del_path(path):
    """Recursively delete ``path``, which may be a file or a directory tree.

    Symbolic links are removed as links and never followed, so data that
    lives outside the tree being deleted is left untouched.
    """
    # ``os.path.isdir`` follows symlinks; without the ``islink`` check a link
    # to a directory would have its target emptied and ``os.rmdir`` would then
    # fail on the link itself.
    if os.path.isdir(path) and not os.path.islink(path):
        for entry in os.listdir(path):
            del_path(os.path.join(path, entry))
        os.rmdir(path)
    else:
        os.remove(path)
def cal_time(fun):
    """Decorator that prints how many seconds the wrapped call took.

    The wrapper forwards positional and keyword arguments, returns whatever
    ``fun`` returns, and keeps ``fun``'s name and docstring. The elapsed time
    is printed only when ``fun`` returns normally.
    """
    import functools

    @functools.wraps(fun)
    def inner_wrapper(*args, **kwargs):
        start = time.time()
        result = fun(*args, **kwargs)
        end = time.time()
        print('Time spent is ' + str(round(end - start, 1)))
        return result

    return inner_wrapper
def request_net_content(url):
    """GET ``url`` and return the ``requests`` response object.

    The request is attempted up to 10 times; every attempt that raises a
    ``requests`` exception is reported on stdout. If all attempts fail a
    ``DownloadError`` is raised.
    """
    max_attempts = 10
    ua_headers = {'user-agent': 'Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_8; en-us) AppleWebKit/534.50'}
    for attempt in range(1, max_attempts + 1):
        try:
            # (connect timeout, read timeout) in seconds
            return requests.get(url, timeout=(5, 15), headers=ua_headers)
        except requests.exceptions.RequestException:
            print("\nrequest timeout for: '%s' and retry again (%d/10)" % (url, attempt))
    raise DownloadError("download retry 10 times, and always no response!")
def get_real_url(last_url, url):
    """Resolve ``url`` found inside the document at ``last_url``.

    Args:
        last_url: absolute http(s) URL of the document that referenced ``url``.
        url: absolute, host-relative ("/a/b"), protocol-relative ("//h/a") or
            document-relative ("a/b") reference.

    Returns:
        The absolute URL as a string.

    Raises:
        UrlError: if ``url`` is empty, or it is relative and ``last_url`` is
            not an absolute http(s) URL.
    """
    try:
        from urllib.parse import urljoin
    except ImportError:  # Python 2
        from urlparse import urljoin

    if not url:
        raise UrlError("url is empty, can not resolve it against '%s'" % (last_url,))
    if re.match(r'http[s]?://', url) is not None:
        return url
    if re.match(r'http[s]?://[^/]', last_url) is None:
        raise UrlError("last_url '%s' is not valid !!!" % (last_url,))
    # urljoin ignores the base's query string and copes with a base that ends
    # in "/", both of which broke the previous basename-slicing approach.
    return urljoin(last_url, url)
def write_split_to_out_fd(split_content, out_fd, cryptor):
    """Append one segment's bytes to ``out_fd``, decrypting them first if needed.

    Args:
        split_content: raw segment data.
        out_fd: writable object receiving the data.
        cryptor: object with a ``decrypt`` method, or ``None`` to write as-is.

    Raises:
        ValueError: if ``split_content`` is empty.
    """
    # Guard clause: an empty segment is reported to the caller, nothing is written.
    if not len(split_content) > 0:
        raise ValueError("no value")
    payload = split_content if cryptor is None else cryptor.decrypt(split_content)
    out_fd.write(payload)
def download_split(index, count, name, path, out_fd, url, cryptor):
    """Append one segment to ``out_fd``, reusing a cached copy when possible.

    Args:
        index: 1-based position of this segment, used for the progress bar.
        count: total number of segments, used for the progress bar.
        name: display name of the segment.
        path: file in the temp dir where the raw segment is cached.
        out_fd: writable binary file receiving the (decrypted) segment.
        url: absolute URL the segment is downloaded from.
        cryptor: object with a ``decrypt`` method, or ``None``.

    If ``path`` already exists its content is used. An empty cached file, or
    no cached file at all, triggers up to 5 download attempts. A segment that
    still cannot be fetched is reported on stdout and skipped.
    """
    if os.path.exists(path):
        progressbar(count, index, "proc '" + name + "'")
        try:
            # ``with`` closes the cache file even when reading fails.
            with open(path, 'rb') as split_fd:
                split_content = split_fd.read()
            write_split_to_out_fd(split_content, out_fd, cryptor)
            return
        except ValueError:
            print("\n'%s' zero content, redownload from '%s'" % (name, url,))

    max_attempts = 5
    for attempt in range(1, max_attempts + 1):
        try:
            progressbar(count, index, "download '" + name + "'")
            split_content = request_net_content(url).content
            # Cache the raw (still encrypted) bytes so a later run can resume.
            with open(path, 'wb') as split_fd:
                split_fd.write(split_content)
            write_split_to_out_fd(split_content, out_fd, cryptor)
            return
        except ValueError:
            print("\n'%s' zero content, %dth redownload" % (name, attempt,))
        except DownloadError:
            print("\n'%s' download error, %dth redownload" % (name, attempt,))
    print("\ndownload '%s' from '%s' always failed, ignore" % (name, url,))
def _m3u8_key_from_tag(m3u8_url, attrs):
    """Fetch the key described by the attribute list of an EXT-X-KEY tag.

    Returns a ``(key, iv)`` pair. ``key`` is the raw key bytes, or ``None``
    when the tag says ``METHOD=NONE``. ``iv`` is the explicit IV as bytes, or
    ``None`` when the tag carries no IV attribute.
    """
    import binascii

    method = re.search(r'METHOD=([^,]*)', attrs)
    if method is not None and method.group(1).strip() == "NONE":
        return None, None
    # Stop at the closing quote; a greedy match would swallow later attributes.
    uri = re.search(r'URI="([^"]*)"', attrs)
    if uri is None:
        print("error! can not get url for key")
        sys.exit(1)
    key_url = get_real_url(m3u8_url, uri.group(1))
    # The key is binary data, so take the raw bytes rather than decoded text.
    key = request_net_content(key_url).content
    print('key text is %r' % (key,))
    iv = None
    iv_attr = re.search(r'IV=0[xX]([0-9a-fA-F]+)', attrs)
    if iv_attr is not None:
        iv = binascii.unhexlify(iv_attr.group(1).zfill(32))
    return key, iv


def download_m3u8(m3u8_url, temp_dir, out_fd):
    """Download every media segment listed in an m3u8 playlist into ``out_fd``.

    Args:
        m3u8_url: absolute URL of the playlist.
        temp_dir: directory where raw segments are cached.
        out_fd: writable binary file receiving the segments in playlist order.

    Entries whose path ends in "m3u8" are treated as nested playlists and are
    downloaded recursively, in place. Blank lines and comment lines are
    ignored. When an EXT-X-KEY tag is in effect, each following segment is
    decrypted with AES-CBC using its own cipher object; the IV is the tag's IV
    attribute or, if absent, the segment's media sequence number.
    """
    import binascii

    m3u8_text = request_net_content(m3u8_url).text
    print("download m3u8: '%s'" % (m3u8_url,))
    lines = [line.strip() for line in m3u8_text.splitlines()]
    first_line = lines[0] if lines else ""
    if first_line != "#EXTM3U":
        print("not a valid m3u8 file, first line is '%s'\n" % (first_line,))
        return

    key = None          # key bytes for the following segments, None = plain
    explicit_iv = None  # IV attribute of the current EXT-X-KEY tag, if any
    sequence = 0        # media sequence number of the next segment
    entries = []        # ("playlist", url) or ("segment", url, name, key, iv)
    segment_total = 0
    for line in lines:
        if not line:
            continue
        if line.startswith("#"):
            tag = re.match(r'#(EXT[^:]*):(.*)', line)
            if tag is None:
                continue
            tag_name, tag_val = tag.group(1), tag.group(2)
            if tag_name == "EXT-X-KEY":
                key, explicit_iv = _m3u8_key_from_tag(m3u8_url, tag_val)
            elif tag_name == "EXT-X-MEDIA-SEQUENCE" and tag_val.strip().isdigit():
                sequence = int(tag_val)
            continue

        item_url = get_real_url(m3u8_url, line)
        # Drop the query string before looking at the file name / extension.
        no_query = re.match(r'(http[s]?://[^\?]*)[\?]?.*', item_url)
        url_no_para = item_url if no_query is None else no_query.group(1)
        if url_no_para.endswith("m3u8"):
            entries.append(("playlist", item_url))
            continue

        seg_iv = None
        if key is not None:
            seg_iv = explicit_iv
            if seg_iv is None:
                # Default IV: the sequence number as a 16-byte big-endian value.
                seg_iv = binascii.unhexlify('%032x' % sequence)
        entries.append(("segment", item_url, os.path.basename(url_no_para), key, seg_iv))
        segment_total += 1
        sequence += 1

    item_index = 0
    for entry in entries:
        if entry[0] == "playlist":
            print("redirect to: '%s'" % (entry[1],))
            download_m3u8(entry[1], temp_dir, out_fd)
            continue
        _, item_url, split_name, seg_key, seg_iv = entry
        item_index += 1
        cryptor = None
        if seg_key is not None:
            # A fresh cipher per segment: CBC state must not carry over from
            # one segment to the next.
            cryptor = AES.new(seg_key, AES.MODE_CBC, seg_iv)
        split_path = os.path.join(temp_dir, split_name)
        download_split(item_index, segment_total, split_name, split_path, out_fd,
                       item_url, cryptor)
@cal_time
def video_downlowner(options):
    """Download the video described by the parsed command-line ``options``.

    Reads ``options.url``, ``options.out_file``, ``options.temp`` and
    ``options.m3u8``. When ``options.m3u8`` is set, ``options.url`` is used as
    the playlist URL; otherwise the page at ``options.url`` is fetched and the
    first ``"url":"...m3u8"`` occurrence in it is used. Segments are cached in
    ``options.temp`` or, if that is not given, in a fresh temp dir that is
    removed once the download has finished.
    """
    url = options.url
    out_file = options.out_file
    if options.temp is None:
        temp_dir = tempfile.mkdtemp(suffix='', prefix='m3u8_', dir='/tmp')
        print("temp file save to dir: " + temp_dir)
    else:
        temp_dir = options.temp
        # A user-supplied cache dir may not exist yet.
        if not os.path.isdir(temp_dir):
            os.makedirs(temp_dir)

    m3u8_url = None
    if options.m3u8 is not None:
        m3u8_url = url
    else:
        result = request_net_content(url)
        # Pages embed the address JSON-escaped ("http:\/\/..."); unescape it.
        html_txt = result.text.replace("\\/", "/")
        searchObj = re.search(r'"url":"(http[s]?://.*?\.m3u8)"', html_txt)
        if searchObj is not None:
            m3u8_url = searchObj.group(1)
        else:
            print("no m3u8 in given url")

    if m3u8_url is not None:
        # ``with`` closes the output file even if the download raises.
        with open(out_file, "wb") as out_fd:
            download_m3u8(m3u8_url, temp_dir, out_fd)

    # Only reached on success: after a failure the cache is kept so the run
    # can be resumed by passing the printed directory with -t.
    if options.temp is None:
        del_path(temp_dir)
def main():
    """Parse the command line and start the download.

    ``-u`` (page or playlist URL) and ``-o`` (output file) are required.
    ``-m`` is a flag saying that the URL is already an m3u8 playlist, and
    ``-t`` names a directory for cached segments. Exits with status 1 when a
    required option is missing.
    """
    usage = ("usage: %prog <-u url> <-o out_file> [-t temp_dir]\n"
             "       %prog <-u m3u8_url> -m <-o out_file> [-t temp_dir]")
    parse = optparse.OptionParser(usage)
    parse.add_option('-u', '--url', dest='url', action='store',
                     type='str', help='the url of webpage play this video')
    parse.add_option('-o', '--output-file', dest='out_file', action='store',
                     type='str', help='name of output video file')
    parse.add_option('-t', '--temp', dest='temp', action='store',
                     type='str', help='give the temp dir which save split video')
    parse.add_option('-m', '--m3u8', dest='m3u8', action='store_true',
                     help='if <url> is m3u8, add this option')
    options, args = parse.parse_args()
    # -m is only a flag describing the URL, so the URL itself is always needed.
    if options.url is None:
        print("please give url or m3u8 to download\n")
        sys.exit(1)
    if options.out_file is None:
        print("please give out file\n")
        sys.exit(1)
    video_downlowner(options)
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == '__main__':
    main()