当前位置: 首页 > 工具软件 > M3O > 使用案例 >

m3u8视频下载工具

冯招
2023-12-01

参考了这位仁兄的博客:《有加密的m3u8视频下载以及解码方法》(baidu_41902768 的博客,CSDN 博客,关键词:m3u8加密视频怎么解密)。

在其基础上做了进一步封装:支持直接输入视频所在网页的 URL 来下载视频(只会匹配页面中出现的第一个 m3u8 地址)。

分享给大家

此 Python 脚本基于 Linux 环境编写;如需在 Windows 上使用,需先开启 WSL,并在 Windows 应用商店安装 Ubuntu,然后在 Ubuntu 中运行。

废话不多说,直接上代码

#!/usr/bin/python

import requests
import os
import re
from Crypto.Cipher import AES
import time
import tempfile
import sys
import optparse

class DownloadError(RuntimeError):
    """Raised when a resource still cannot be fetched after all retries."""

    def __init__(self, arg):
        # Assigning a bare string to ``self.args`` makes BaseException split
        # it into a tuple of single characters, which garbles ``str(exc)``.
        # Delegating to the base constructor stores the message as one item.
        super(DownloadError, self).__init__(arg)

class UrlError(RuntimeError):
    """Raised when a URL cannot be resolved against its base URL."""

    def __init__(self, arg):
        # Assigning a bare string to ``self.args`` makes BaseException split
        # it into a tuple of single characters, which garbles ``str(exc)``.
        # Delegating to the base constructor stores the message as one item.
        super(UrlError, self).__init__(arg)

def progressbar(tot, pre, content = None):
  """Draw a 30-character text progress bar on stdout.

  tot is the total number of tasks and pre the number already finished.
  content is an optional label appended after the percentage.

  While pre < tot the line ends with a carriage return so the next call
  overwrites it; otherwise it ends with a newline.
  """
  bar_width = 30
  filled = int(pre*bar_width/tot)
  bar = "[" + "=" * filled + " " * (bar_width - filled) + "]"
  counts = " (%d/%d) " % (pre, tot)
  percent = str(int(pre * 100/tot)) + "%"
  label = "" if content is None else " " + content
  line_end = "\r" if pre < tot else "\n"

  sys.stdout.write(bar + counts + percent + label + line_end)
  sys.stdout.flush()

def del_path(path):
  """Recursively delete path, which may be a file, a symlink or a directory.

  Symbolic links are removed themselves and never followed. os.path.isdir()
  follows links, so without the islink() check a link to a directory would
  have its target emptied and os.rmdir() would then fail on the link.

  Raises OSError (from os.remove / os.rmdir) if something cannot be removed.
  """
  if os.path.isdir(path) and not os.path.islink(path):
    for entry in os.listdir(path):
      del_path(os.path.join(path, entry))
    os.rmdir(path)
  else:
    os.remove(path)

def cal_time(fun):
  """Decorator that prints how long each successful call to fun took.

  The wrapper forwards positional and keyword arguments, returns whatever
  fun returns, and keeps fun's name and docstring. The elapsed time is
  printed in seconds, rounded to one decimal place. Nothing is printed when
  fun raises.
  """
  import functools

  @functools.wraps(fun)
  def inner_wrapper(*args, **kwargs):
    start = time.time()
    result = fun(*args, **kwargs)
    end = time.time()
    print('Time spent is ' + str(round(end - start,1)))
    return result
  return inner_wrapper

def request_net_content(url):
  """GET url with a browser-like user agent, retrying on request errors.

  Up to 10 attempts are made, each with a 5 s connect / 15 s read timeout.
  Returns the requests response object of the first attempt that does not
  raise. Raises DownloadError when all 10 attempts raise a
  requests RequestException.
  """
  headers = {'user-agent': 'Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_8; en-us) AppleWebKit/534.50'}

  for attempt in range(1, 11):
    try:
      return requests.get(url, timeout=(5, 15), headers = headers)
    except requests.exceptions.RequestException:
      print("\nrequest timeout for: '%s' and retry again (%d/10)" % (url, attempt))

  # Only reached when every attempt above raised.
  raise DownloadError("download retry 10 times, and always no response!")

def get_real_url(last_url, url):
  """Resolve url, as found in the document at last_url, to an absolute URL.

  last_url is the absolute URL of the playlist (or page) that contained url.
  url may be absolute (http/https), protocol-relative ("//host/path"),
  root-relative ("/path") or relative to the directory of last_url.

  Resolution is delegated to urljoin so that a base URL ending in "/" or
  carrying a query string (possibly containing "/") is handled correctly.

  Raises UrlError if url is empty, or if url is root-relative and last_url
  is not an http(s) URL.
  """
  try:
    from urllib.parse import urljoin  # Python 3
  except ImportError:
    from urlparse import urljoin  # Python 2

  url = url.strip()
  if not url:
    # An empty reference would resolve to last_url itself; reject it instead.
    raise UrlError("can not resolve an empty url against '%s'" % (last_url,))

  if re.match(r'https?://', url) is not None:
    return url

  if url.startswith('/') and re.match(r'https?://[^/]+', last_url) is None:
    raise UrlError("last_url '%s' is not valid !!!" % (last_url,))

  return urljoin(last_url, url)

def write_split_to_out_fd(split_content, out_fd, cryptor):
  """Append one segment to out_fd, decrypting it first when cryptor is set.

  split_content is the raw segment as downloaded. out_fd is a writable
  binary file object. cryptor is None for plain segments, otherwise an
  object with a decrypt(data) method.

  HLS AES-128 segments are PKCS7-padded (RFC 8216, section 4.3.2.4), so
  after decryption a well-formed padding trailer is removed. Data whose
  trailer is not valid PKCS7 padding is written unchanged.

  Raises ValueError("no value") if split_content is empty.
  """
  if len(split_content) == 0:
    raise ValueError("no value")

  if cryptor is not None:
    split_content = cryptor.decrypt(split_content)
    # bytearray() makes the last byte an int on both Python 2 and 3.
    pad_len = bytearray(split_content[-1:])[0] if len(split_content) > 0 else 0
    if 1 <= pad_len <= 16 and len(split_content) >= pad_len:
      if split_content[-pad_len:] == bytes(bytearray([pad_len]) * pad_len):
        split_content = split_content[:-pad_len]

  out_fd.write(split_content)

def download_split(index, count, name, path, out_fd, url, cryptor):
  """Append segment name to out_fd, using the copy at path when possible.

  index and count are only used for the progress bar. path is where the raw
  (still encrypted) segment is cached. url is where it is downloaded from.
  cryptor is passed through to write_split_to_out_fd.

  If path already exists its content is used. When that raises ValueError
  (e.g. the file is empty) the segment is downloaded again. At most 5
  download attempts are made; after that the segment is skipped with a
  message and nothing is raised.
  """
  if os.path.exists(path):
    cached_ok = True
    try:
      progressbar(count, index, "proc '" + name + "'")
      with open(path, 'rb') as cached_fd:
        write_split_to_out_fd(cached_fd.read(), out_fd, cryptor)
    except ValueError:
      cached_ok = False
      print("\n'%s' zero content, redownload from '%s'" % (name, url,))

    if cached_ok:
      return

  for attempt in range(1, 6):
    try:
      progressbar(count, index, "download '" + name + "'")
      payload = request_net_content(url).content
      # The raw payload is cached before it is validated and decrypted.
      with open(path, 'wb+') as split_fd:
        split_fd.write(payload)
        write_split_to_out_fd(payload, out_fd, cryptor)
      return
    except ValueError:
      print("\n'%s' zero content, %dth redownload" % (name, attempt,))
    except DownloadError:
      print("\n'%s' download error, %dth redownload" % (name, attempt,))

  print("\ndownload '%s' from '%s' always failed, ignore" % (name, url,))

def download_m3u8(m3u8_url, temp_dir, out_fd):
  """Download every segment referenced by the playlist at m3u8_url.

  Segments are cached in temp_dir and appended, decrypted, to out_fd in
  playlist order. An entry whose path ends in "m3u8" is treated as a nested
  playlist and processed recursively.

  Encryption handling, following RFC 8216:
  - each EXT-X-KEY tag applies to the segments after it; METHOD=NONE
    switches decryption off again;
  - the key is fetched as raw bytes;
  - the IV is the tag's IV attribute when present, otherwise the segment's
    media sequence number as a 128-bit big-endian integer (section 5.2);
  - a fresh CBC cipher is built per segment, because each segment is
    encrypted independently.
  Any method other than NONE is decrypted as whole-segment AES-CBC.

  Returns None. Prints a message and returns early if the response is not
  an m3u8 playlist. Exits the program with status 1 if an encrypting
  EXT-X-KEY tag has no URI.
  """
  import binascii

  m3u8_res = request_net_content(m3u8_url)
  m3u8_text = m3u8_res.text

  print("download m3u8: '%s'" % (m3u8_url,))

  # Strip a BOM and drop blank lines so they are neither counted as
  # segments nor resolved as URLs.
  stripped = (line.strip() for line in m3u8_text.lstrip(u'\ufeff').splitlines())
  lines = [line for line in stripped if line]
  if not lines or lines[0] != "#EXTM3U":
    print("not a valid m3u8 file, first line is '%s'\n" % (lines[0] if lines else '',))
    return

  # Every non-comment line is a URI; used as the progress bar total.
  item_count = sum(1 for line in lines if not line.startswith('#'))

  key_bytes = None      # key currently in effect, None while unencrypted
  explicit_iv = None    # IV from the current EXT-X-KEY tag, if it had one
  media_sequence = 0    # sequence number of the first segment
  item_index = 0

  for line in lines:
    if line.startswith('#'):
      tag = re.match(r'#(EXT[^:]*):(.*)', line)
      if tag is None:
        continue
      info_key = tag.group(1)
      info_val = tag.group(2)

      if info_key == "EXT-X-MEDIA-SEQUENCE":
        seq_match = re.match(r'\s*(\d+)', info_val)
        if seq_match is not None:
          media_sequence = int(seq_match.group(1))
      elif info_key == "EXT-X-KEY":
        method_search = re.search(r'METHOD=([^,\s]+)', info_val)
        if method_search is not None and method_search.group(1) == "NONE":
          key_bytes = None
          explicit_iv = None
          continue

        # Non-greedy on purpose: later quoted attributes such as
        # KEYFORMAT="identity" must not be swallowed into the URI.
        key_url_search = re.search(r'URI="([^"]*)"', info_val)
        if key_url_search is None:
          print("error! can not get url for key")
          sys.exit(1)
        key_url = get_real_url(m3u8_url, key_url_search.group(1))
        # The key is binary; decoding it as text would corrupt it.
        key_bytes = request_net_content(key_url).content
        print('key (hex) is ' + binascii.hexlify(key_bytes).decode('ascii'))

        iv_search = re.search(r'IV=0[xX]([0-9a-fA-F]+)', info_val)
        if iv_search is not None:
          explicit_iv = binascii.unhexlify(iv_search.group(1).zfill(32))
        else:
          explicit_iv = None
      continue

    itemUrl = get_real_url(m3u8_url, line)
    # Ignore the query string when deciding what kind of resource this is.
    url_no_para = itemUrl.split('?', 1)[0]

    if url_no_para.endswith("m3u8"):
      print("redirect to: '%s'" % (itemUrl,))
      download_m3u8(itemUrl, temp_dir, out_fd)
      continue

    item_index += 1

    m3u8_split_cryptor = None
    if key_bytes is not None:
      iv = explicit_iv
      if iv is None:
        iv = binascii.unhexlify('%032x' % (media_sequence + item_index - 1,))
      m3u8_split_cryptor = AES.new(key_bytes, AES.MODE_CBC, iv)

    split_name = os.path.basename(url_no_para)
    split_path = os.path.join(temp_dir, split_name)
    # download ts file
    download_split(item_index, item_count, split_name, split_path, out_fd,
      itemUrl, m3u8_split_cryptor)

@cal_time
def video_downlowner(options):
  """Download the video described by the parsed command-line options.

  Reads these attributes of options:
  - url: the m3u8 URL when options.m3u8 is set, otherwise a web page that
    is searched for the first '"url":"...m3u8"' entry;
  - out_file: path of the output video file;
  - temp: directory for cached segments. When None, a temporary directory
    is created and removed again afterwards.
  - m3u8: not None when url is itself the playlist.

  The output file is closed and an internally created temp directory is
  removed even if the download raises.
  """
  url = options.url
  out_file = options.out_file
  own_temp_dir = options.temp is None
  if own_temp_dir:
    # Use the platform's temp location instead of a hard-coded /tmp.
    temp_dir = tempfile.mkdtemp(suffix='', prefix='m3u8_')
    print("temp file save to dir: " + temp_dir)
  else:
    temp_dir = options.temp
    if not os.path.isdir(temp_dir):
      os.makedirs(temp_dir)

  try:
    m3u8_url = None
    if options.m3u8 is not None:
      m3u8_url = url
    else:
      result = request_net_content(url)

      # Pages embed the URL JSON-escaped ("http:\/\/..."); undo that.
      html_txt = result.text.replace("\\/", "/")
      searchObj = re.search(r'"url":"(http[s]?://.*?\.m3u8)"', html_txt)
      if searchObj is not None:
        m3u8_url = searchObj.group(1)
      else:
        print("no m3u8 in given url")

    if m3u8_url is not None:
      with open(out_file, "wb+") as out_fd:
        download_m3u8(m3u8_url, temp_dir, out_fd)
  finally:
    if own_temp_dir:
      del_path(temp_dir)

def main(argv=None):
  """Parse the command line and start the download.

  argv is the list of arguments without the program name; when None,
  sys.argv[1:] is used.

  -u/--url and -o/--output-file are both required. -m/--m3u8 is a flag
  saying that the url is the m3u8 playlist itself rather than a web page.
  Exits with status 1 when a required option is missing.
  """
  usage =  "usage: %prog -u <url> -o <out_file> [-t <temp_dir>]\n"
  usage += "       %prog -m -u <m3u8_url> -o <out_file> [-t <temp_dir>]"
  parse = optparse.OptionParser(usage)
  parse.add_option('-u', '--url', dest='url', action='store', \
    type='str', help='the url of webpage play this video')
  parse.add_option('-o', '--output-file', dest='out_file', action='store', \
    type='str', help='name of output video file')
  parse.add_option('-t', '--temp', dest='temp', action='store', \
    type='str', help='give the temp dir which save split video')
  parse.add_option('-m', '--m3u8', dest='m3u8', action='store_true', \
    help='if <url> is m3u8, add this option')

  options,args=parse.parse_args(argv)

  # -m is only a flag, so the url is needed in both modes. Checking it alone
  # stops "-m" without "-u" from passing validation and then doing nothing.
  if options.url is None:
    print("please give url or m3u8 to download\n")
    sys.exit(1)
  if options.out_file is None:
    print("please give out file\n")
    sys.exit(1)

  video_downlowner(options)


# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
  main()

 类似资料: