当前位置: 首页 > 面试题库 >

如何并行化文件下载?

宋弘壮
2023-03-14
问题内容

我可以一次下载一个文件:

import urllib.request

urls = ['foo.com/bar.gz', 'foobar.com/barfoo.gz', 'bar.com/foo.gz']

for u in urls:
  urllib.request.urlretrieve(u)

我可以这样尝试subprocess

import subprocess
import os

def parallelized_commandline(command, files, max_processes=2):
    processes = set()
    for name in files:
        processes.add(subprocess.Popen([command, name]))
        if len(processes) >= max_processes:
            os.wait()
            processes.difference_update(
                [p for p in processes if p.poll() is not None])

    #Check if all the child processes were closed
    for p in processes:
        if p.poll() is None:
            p.wait()

urls = ['http://www.statmt.org/wmt15/training-monolingual-nc-v10/news-commentary-v10.en.gz',
'http://www.statmt.org/wmt15/training-monolingual-nc-v10/news-commentary-v10.cs.gz', 
'http://www.statmt.org/wmt15/training-monolingual-nc-v10/news-commentary-v10.de.gz']

parallelized_commandline('wget', urls)

urlretrieve没有不使用os.systemsubprocess作弊的并行化方法?

鉴于我现在必须诉诸“作弊”,是否是subprocess.Popen下载数据的正确方法?

使用parallelized_commandline()上述方法时,它使用的是多线程而不是多核的wget,是否正常?有没有办法使它成为多核而不是多线程?


问题答案:

您可以使用线程池并行下载文件:

#!/usr/bin/env python3
from multiprocessing.dummy import Pool # use threads for I/O bound tasks
from urllib.request import urlretrieve

urls = [...]
result = Pool(4).map(urlretrieve, urls) # download 4 files at a time

您还可以使用asyncio以下命令在一个线程中一次下载多个文件:

#!/usr/bin/env python3
import asyncio
import logging
from contextlib import closing
import aiohttp # $ pip install aiohttp

@asyncio.coroutine
def download(url, session, semaphore, chunk_size=1<<15):
    with (yield from semaphore): # limit number of concurrent downloads
        filename = url2filename(url)
        logging.info('downloading %s', filename)
        response = yield from session.get(url)
        with closing(response), open(filename, 'wb') as file:
            while True: # save file
                chunk = yield from response.content.read(chunk_size)
                if not chunk:
                    break
                file.write(chunk)
        logging.info('done %s', filename)
    return filename, (response.status, tuple(response.headers.items()))

urls = [...]
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
with closing(asyncio.get_event_loop()) as loop, \
     closing(aiohttp.ClientSession()) as session:
    semaphore = asyncio.Semaphore(4)
    download_tasks = (download(url, session, semaphore) for url in urls)
    result = loop.run_until_complete(asyncio.gather(*download_tasks))

这里url2filename()定义在哪里。



 类似资料:
  • null 问题1:Spark如何并行处理? 我想大部分的执行时间(99%?)上面的解决方案是从USB驱动器中读取1TB文件到Spark集群中。从USB驱动器读取文件是不可并行的。但是在读取整个文件之后,Spark在底层做了什么来并行处理呢? > 有多少节点用于创建DataFrame?(也许只有一个?) 假设Snappy压缩的Parquet文件小10倍,大小=100GB,HDFS块大小=128 MB

  • 我将一个jenkinsfile放到项目的根目录中,并希望为我的管道引入一个groovy文件并执行它。我能够实现这一点的唯一方法是创建一个单独的项目并使用文件加载器。fromGit命令。我想做

  • 问题内容: 我在src / test / resources / feature /中有以下功能文件(单独的功能文件),我想并行运行它们。就像:一个功能文件必须在chrome中执行,而另一个功能文件必须在firefox中执行,如@Tags名称所述。 有人可以帮助我实现这一点吗?我使用的是cumul-java 1.2.2版本,并且使用AbstractTestNGCucumberTests作为运行程序

  • 问题内容: 是否可以使用goroutine并行下载和保存文件? 以下是我的代码,可从我的保管箱下载文件: 当我在命令前给download_file函数添加前缀时,它不起作用。 问题答案: 那是因为您的主要goroutine正在退出。您需要添加一个WaitGroup以等待所有goroutines退出。例如,

  • 我有一堆功能文件(大约15个),其中每个功能文件都有一个线程,需要睡眠至少3分钟(对于一些复杂的后台应用程序,需要执行一些功能)。我需要一种并行执行它们的方法。 我有一个简单的CucumberRunnerTestCLass。 非常感谢您的帮助。谢谢