当前位置: 首页 > 知识库问答 >
问题:

Python-动态收缩线程池/停止线程

杜焕
2023-03-14

我正在写一个小的多线程超文本传输协议文件下载程序,并希望能够缩小可用的线程,因为代码遇到错误

这些错误将特定于在web服务器不允许任何更多连接的情况下返回的http错误

eg.如果我设置了一个由5个线程组成的池,每个线程都试图打开自己的连接并下载文件块。服务器可能只允许2个连接,我相信会返回503个错误,我想检测到这一点并关闭一个线程,最终限制池的大小,大概只有服务器允许的2个

我能让线自动停止吗?

是自我。线程停止()是否足够?

我还需要加入()吗?

这是我的worker类,它进行下载,从队列中抓取数据进行处理,下载后将结果转储到resultQ中,由主线程保存到文件中

在这里,我想检测http 503并从可用池中停止/杀死/删除一个线程——当然,还要将失败的块重新添加回队列,以便其余线程将处理它

class Downloader(threading.Thread):
    def __init__(self, queue, resultQ, file_name):
        threading.Thread.__init__(self)
        self.workQ = queue
        self.resultQ = resultQ
        self.file_name = file_name

    def run(self):
        while True:
            block_num, url, start, length = self.workQ.get()
            print 'Starting Queue #: %s' % block_num
            print start
            print length

            #Download the file
            self.download_file(url, start, length)

            #Tell queue that this task is done
            print 'Queue #: %s finished' % block_num
            self.workQ.task_done()


    def download_file(self, url, start, length):        

        request = urllib2.Request(url, None, headers)
        if length == 0:
            return None
        request.add_header('Range', 'bytes=%d-%d' % (start, start + length))

        while 1:
            try:
                data = urllib2.urlopen(request)
            except urllib2.URLError, u:
                print "Connection did not start with", u
            else:
                break

        chunk = ''
        block_size = 1024
        remaining_blocks = length

        while remaining_blocks > 0:

            if remaining_blocks >= block_size:
                fetch_size = block_size
            else:
                fetch_size = int(remaining_blocks)

            try:
                data_block = data.read(fetch_size)
                if len(data_block) == 0:
                    print "Connection: [TESTING]: 0 sized block" + \
                        " fetched."
                if len(data_block) != fetch_size:
                    print "Connection: len(data_block) != length" + \
                        ", but continuing anyway."
                    self.run()
                    return

            except socket.timeout, s:
                print "Connection timed out with", s
                self.run()
                return

            remaining_blocks -= fetch_size
            chunk += data_block

        resultQ.put([start, chunk])

下面是我初始化线程池的地方,接下来我将项目放入队列

# create a thread pool and give them a queue
for i in range(num_threads):
    t = Downloader(workQ, resultQ, file_name)
    t.setDaemon(True)
    t.start()

共有3个答案

汪兴为
2023-03-14

Thread对象通过从run方法返回来终止线程——它不调用stop。如果将线程设置为守护程序模式,则不需要加入,但主线程需要加入。线程通常使用resultq报告它正在退出,而主线程则使用该信息进行连接。这有助于有序地终止流程。若python仍在处理多个线程,并且最好避开这一步,那个么在系统退出期间可能会出现奇怪的错误。

刘和昶
2023-03-14

您应该使用线程池控制线程的生命周期:

  • http://www.inductiveload.com/posts/easy-thread-pools-in-python-with-threadpool/

然后,当线程存在时,您可以向主线程(处理线程池)发送消息,然后更改线程池的大小,并在将清空的堆栈中延迟新请求或失败的请求。

Tedelay对于您为线程提供的守护进程状态是绝对正确的。不需要将它们设置为守护进程。

基本上,您可以简化代码,您可以执行以下操作:

import threadpool

def process_tasks():
    pool = threadpool.ThreadPool(4)

    requests = threadpool.makeRequests(download_file, arguments)

    for req in requests:
        pool.putRequest(req) 

    #wait for them to finish (or you could go and do something else)
    pool.wait()

if __name__ == '__main__': 
    process_tasks()

其中参数取决于您的策略。要么给线程一个队列作为参数,然后清空队列。或者您可以在process_tasks中处理队列,在池已满时阻塞,并在线程完成时打开一个新线程,但队列不是空的。这完全取决于您的需求和下载器的上下文。

资源:

  • http://chrisarndt.de/projects/threadpool/
  • http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/203871
  • http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/196618
  • http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/302746
  • http://lethain.com/using-threadpools-in-python/
程冥夜
2023-03-14

我能让线自动停止吗?

不要使用自己。_Thread__stop()。退出线程的run()方法就足够了(您可以检查标志或从队列中读取哨兵值以知道何时退出)。

在这里,我想检测http 503并从可用池中停止/杀死/删除一个线程——当然,还要将失败的块重新添加回队列,以便其余线程将处理它

您可以通过分离责任来简化代码:

  • download_file()不应尝试在无限循环中重新连接。如果有错误;让我们看看调用download_file()的代码,如果需要,请重新提交它
  • 关于并发连接数的控制可以封装在信号量对象中。在这种情况下,线程数可能与并发连接数不同
import concurrent.futures # on Python 2.x: pip install futures 
from threading import BoundedSemaphore

def download_file(args):
    nconcurrent.acquire(timeout=args['timeout']) # block if too many connections
    # ...
    nconcurrent.release() #NOTE: don't release it on exception,
                          #      allow the caller to handle it

# you can put it into a dictionary: server -> semaphore instead of the global
nconcurrent = BoundedSemaphore(5) # start with at most 5 concurrent connections
with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
    future_to_args = dict((executor.submit(download_file, args), args)
                           for args in generate_initial_download_tasks())

    while future_to_args:
        for future in concurrent.futures.as_completed(dict(**future_to_args)):
            args = future_to_args.pop(future)
            try: 
                result = future.result()
            except Exception as e:
                print('%r generated an exception: %s' % (args, e))
                if getattr(e, 'code') != 503:
                   # don't decrease number of concurrent connections
                   nconcurrent.release() 
                # resubmit
                args['timeout'] *= 2                    
                future_to_args[executor.submit(download_file, args)] = args
            else: # successfully downloaded `args`
                print('f%r returned %r' % (args, result))

请参见ThreadPoolExec导师()示例。

 类似资料:
  • 我需要在Java(Java.util.concurrent)中实现一个线程池,它的线程数在空闲时处于某个最小值,当作业提交到线程池中时,线程数会增长到一个上限(但不会再增长),而当所有作业完成且不再提交作业时,线程数会收缩到下限。 您将如何实现这样的功能?我想这将是一个相当常见的使用场景,但显然是factory方法只能创建固定大小的池以及在提交多个作业时无限增长的池。类提供了和参数,但其文档似乎暗

  • 问题内容: 我有一个运行时间很长的过程,可以监听事件并进行一些激烈的处理。 目前,我通常用于限制并发运行的作业数量,但是根据一天中的时间以及其他各种因素,我希望能够动态地增加或减少并发线程的数量。 如果我减少了并发线程的数量,那么我希望当前正在运行的作业能够很好地完成。 是否有Java库可以让我控制并动态增加或减少线程池中运行的并发线程数?(该类必须实现ExecutorService)。 我必须自

  • 问题内容: 我需要在Java(java.util.concurrent)中实现一个线程池,该线程池在空闲时处于最小数量,在作业提交到其完成速度快于完成时会增长到一个上限(但永远不会超过上限) ,并且在完成所有作业且不再提交任何作业时,缩小到下限。 您将如何实现这样的目标?我想这将是一个相当普遍的使用场景,但是显然工厂方法只能创建固定大小的池,并且当提交许多作业时池会无限增长。本类提供和参数,但它的

  • 最近,我一直在尝试寻找一个用于线程并发任务的库。理想情况下,一个简单的接口,调用线程上的函数。任何时候都有n个线程,有些线程完成得比其他线程快,并且在不同的时间到达。 首先我在试Rx,这在c语言中很好。我也研究了区块和TBB,但它们都依赖于平台。对于我的原型,我需要保持平台独立性,因为我们还不知道它将在什么平台上运行,并且在做出决定时可以更改。 C 11有很多关于线程和并发的东西,我发现了很多关于

  • 我试图创建一个简单的pythongui(使用Tkinter),其中包含start按钮,在线程中运行while循环,以及停止while循环的stop按钮。 我对停止按钮有问题,它不会停止任何东西,一旦单击开始按钮,就会冻结GUI。 见下面的代码: 知道怎么解决这个问题吗?我肯定这是微不足道的,一定做了成千上万次,但我自己找不到解决办法。 谢谢大卫

  • 问题 你要为需要并发执行的代码创建/销毁线程 解决方案 threading 库可以在单独的线程中执行任何的在 Python 中可以调用的对象。你可以创建一个 Thread 对象并将你要执行的对象以 target 参数的形式提供给该对象。 下面是一个简单的例子: # Code to execute in an independent thread import time def countdown(