最近需要完成一个多线程下载的工具,对其中的多线程下载进行了一个抽象,可以对所有需要使用到多线程编程的地方统一使用这个模型来进行编写。
主要结构:
1、基于Queue标准库实现了一个类似线程池的工具,用户指定提交任务线程submitter与工作线程worker数目,所有线程分别设置为后台运行,提供等待线程运行完成的接口。
2、所有需要完成的任务抽象成task,提供单独的无参数调用方式,供worker线程调用;task以生成器的方式作为参数提供,供submitter调用。
3、所有需要进行线程交互的信息放在context类中。
主要实现代码如下:
#Submitter线程类实现,主要是`task_generator`调用 class SubmitterThread(threading.Thread): _DEFAULT_WAIT_TIMEOUT = 2 #seconds def __init__(self, queue, task_gen, timeout=2): super(SubmitterThread, self).__init__() self.queue = queue if not isinstance(timeout, int): _logger.error('Thread wait timeout value error: %s, ' 'use default instead.' % timeout) self.timeout = self._DEFAULT_WAIT_TIMEOUT self.timeout = timeout self.task_generator = task_gen def run(self): while True: try: task = self.task_generator.next() self.queue.put(task, True, self.timeout) except Queue.Full: _logger.debug('Task queue is full. %s wait %d second%s timeout' % (self.name, self.timeout, 's' if (self.timeout > 1) else '')) break except (StopIteration, ValueError) as e: _logger.debug('Task finished') break
#Worker线程实现,主要就是try块内的func调用 class WorkerThread(threading.Thread): _DEFAULT_WAIT_TIMEOUT = 2 #seconds def __init__(self, queue, timeout=2): super(WorkerThread, self).__init__() self.queue = queue if not isinstance(timeout, int): _logger.error('Thread wait timeout value error: %s, ' 'use default instead.' % timeout) self.timeout = self._DEFAULT_WAIT_TIMEOUT self.timeout = timeout def run(self): while True: try: func = self.queue.get(True, self.timeout) except Queue.Empty: _logger.debug('Task queue is empty. %s wait %d second%s timeout' % (self.name, self.timeout, 's' if (self.timeout > 1) else '')) break if not callable(func): time.sleep(1) try: func() except Exception as e: _logger.error('Thread %s running occurs error: %s' % (self.name, e)) print('Thread running error: %s' % e)
class Executor(object): """ The really place to execute executor """ thread_list = [] submitters = 0 workers = 0 queue = None task_generator = None timeout = 0 def __init__(self, task_gen, submitters=1, workers=1 , timeout=2): if len(self.thread_list) != 0: raise RuntimeError('Executor can only instance once.') self.queue = Queue.Queue(maxsize=submitters * 2 + workers * 2) self.submitters = submitters self.workers = workers self.task_generator = task_gen self.timeout = timeout def start(self): for i in range(self.submitters): submitter = SubmitterThread(self.queue, self.task_generator, self.timeout) self.thread_list.append(submitter) submitter.setName('Submitter-%d' % i) submitter.setDaemon(True) submitter.start() for i in range(self.workers): worker = WorkerThread(self.queue, self.timeout) self.thread_list.append(worker) worker.setName('Worker-%d' % i) worker.setDaemon(True) worker.start() def is_alive(self): alive = False for t in self.thread_list: if t.isAlive(): alive = True break return alive def wait_to_shutdown(self): _logger.debug('Start to wait to shutdown') for t in self.thread_list: t.join() _logger.debug('Shutdown thread : %s' % t.name)
Executor类保存了线程池,提供相应接口。有了这个抽象之后,只需要实例化Executor类的对象,然后调用start方法进行多线程任务的运行。并可以用is_alive等接口再主线程内进行其他处理。
后续再使用这个抽象进行实际多线程任务的实现。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持小牛知识库。
本文向大家介绍详解Python中的多线程编程,包括了详解Python中的多线程编程的使用技巧和注意事项,需要的朋友参考一下 一、简介 多线程编程技术可以实现代码并行性,优化处理能力,同时功能的更小划分可以使代码的可重用性更好。Python中threading和Queue模块可以用来实现多线程编程。 二、详解 1、线程和进程 进程(有时被称为重量级进程)是程序的一次执行。
本文向大家介绍详解Python多线程,包括了详解Python多线程的使用技巧和注意事项,需要的朋友参考一下 本文实例为大家解析了Python多线程,供大家参考,具体内容如下 1、多线程的理解 多进程和多线程都可以执行多个任务,线程是进程的一部分。线程的特点是线程之间可以共享内存和变量,资源消耗少(不过在Unix环境中,多进程和多线程资源调度消耗差距不明显,Unix调度较快),缺点是线程之间的同步和
9.3.4 Python 多线程编程 很多编程语言都支持多线程编程,Python 语言亦然。与其他编程语言相比,Python 的 多线程编程是非常简单的。 Python 提供了两个支持线程的模块,一个是较老的 thread 模块,另一个是较新的 threading 模块。其中 threading 采用了面向对象实现,功能更强,建议读者使用。 thread 模块的用法 任何程序一旦开始执行,就构成了
本文向大家介绍python 并发编程 多路复用IO模型详解,包括了python 并发编程 多路复用IO模型详解的使用技巧和注意事项,需要的朋友参考一下 多路复用IO(IO multiplexing) 这种IO方式为事件驱动IO(event driven IO)。 我们都知道,select/epoll的好处就在于单个进程process就可以同时处理多个网络连接的IO。它的基本原理就是select/e
一、简介 下图为 Strom 的运行流程图,在开发 Storm 流处理程序时,我们需要采用内置或自定义实现 spout(数据源) 和 bolt(处理单元),并通过 TopologyBuilder 将它们之间进行关联,形成 Topology。 二、IComponent接口 IComponent 接口定义了 Topology 中所有组件 (spout/bolt) 的公共方法,自定义的 spout 或
本文向大家介绍python并发编程之多进程、多线程、异步和协程详解,包括了python并发编程之多进程、多线程、异步和协程详解的使用技巧和注意事项,需要的朋友参考一下 最近学习python并发,于是对多进程、多线程、异步和协程做了个总结。 一、多线程 多线程就是允许一个进程内存在多个控制权,以便让多个函数同时处于激活状态,从而让多个函数的操作同时运行。即使是单CPU的计算机,也可以通过不停地在不同