示例
标准线程多进程,生产者/消费者示例:
Worker越多,问题越大
# -*- coding: utf8 -*-import os import time import Queue import threading from PIL import Image
def create_thumbnail(filename, size=(128, 128)): try: fp, fmt = filename.rsplit('.', 1) im = Image.open(filename) im.thumbnail(size, Image.ANTIALIAS) im.save((fp + '_'+'x'.join(str(i) for i in size) + '.'+fmt), im.format) return '%s thumbnail success!' % filename except Exception: return '%s thumbnail failed!' % filename
def get_image_paths(folder): return [os.path.join(folder, f) for f in os.listdir(folder) if 'png' in f]
class Consumer(threading.Thread): def __init__(self, queue): threading.Thread.__init__(self) self._queue = queue
def run(self): while True: content = self._queue.get() if isinstance(content, str) and content == 'quit': break respone = create_thumbnail(content) print 'Bye bye!'
def Producer(): filenames = get_image_paths('images') queue = Queue.Queue() worker_threads = build_worker_pool(queue, 4) start_time = time.time()
for filename in filenames: queue.put(filename) for worker in worker_threads: queue.put('quit') for worker in worker_threads: worker.join()
print time.time() - start_time
def build_worker_pool(queue, size): workers = [] for _ in range(size): worker = Consumer(queue) worker.start() workers.append(worker) return workers
if __name__ == '__main__': Producer()
map
Map能够处理集合按顺序遍历,最终将调用产生的结果保存在一个简单的集合当中。
# -*- coding: utf8 -*-import os import time from multiprocessing import Pool from PIL import Image
def create_thumbnail(filename, size=(128, 128)): try: fp, fmt = filename.rsplit('.', 1) im = Image.open(filename) im.thumbnail(size, Image.ANTIALIAS) im.save((fp + '_'+'x'.join(str(i) for i in size) + '.'+fmt), im.format) return '%s thumbnail success!' % filename except Exception: return '%s thumbnail failed!' % filename
def get_image_paths(folder): return [os.path.join(folder, f) for f in os.listdir(folder) if 'png' in f]
def main(): filenames = get_image_paths('images') start_time = time.time() pool = Pool(4) pool.map(create_thumbnail, filenames) pool.close() pool.join()
print time.time() - start_time
if __name__ == '__main__': main()
问题内容: 我正在使用python 2.7,我有一些看起来像这样的代码: 此处唯一的依赖项如下:dependent1需要等待任务1-3,Dependent2需要等待任务4-6,而dependent3需要等待依赖项1-2 …以下是可以的:首先运行全部6个任务并行,然后是前两个从属。 我希望尽可能多的任务并行运行,我已经在Google上搜索了一些模块,但是我希望避免使用外部库,并且不确定队列线程技术如
问题内容: 我正在编写一个新的Jenkins管道,并具有一组最终要并行运行的步骤。但是,在开发此管道时,我想强制其顺序运行。我没有看到任何指定并行步骤使用的线程数或类似方法的方法。这是到目前为止的基本代码: 我希望能够依次运行这些Shell脚本而无需更改很多代码。 问题答案: 而不是您可以这样使用:
我有一大堆Scalaz任务。创建方式如下: 我希望这些任务并行运行。以随机顺序打印数字,不要花5秒钟(每个任务有50个任务和100毫升睡眠)。 但是,很明显,每个任务需要100毫秒,所有任务都需要5秒钟,并且创建的列表是有序的。 如何并行运行它们?任务在哪里运行线程?
问题内容: 我有以下使用类的课程。所以我想做的是,在运行cp1实例处理方法的同时,我要并行运行。 但是,我要按顺序cp1,所以我要它运行并完成,如果cp2没有完成或失败,那就很好。如果确实失败,我想加入结果。该示例中未返回任何内容,但我想返回结果。 为此,应该使用TaskExecutor吗?还是线程? 我只希望cp2与cp1并行运行。或者,如果我添加更多内容,例如说cp3,我希望它也可以与cp1并
我是JavaFx/并发的新手,所以我在JavaFX中阅读了并发教程,但是我仍然对JavaFX Gui中后台线程的实现有点困惑。 我试图编写一个与一些串行设备(使用JSSC-2.8)接口的小图形用户界面,并根据这些设备的响应更新图形用户界面。但是,在写入消息和设备响应之间有一个延迟,在任意的时间内使用Thread.sleep()对我来说不是一个可靠的编程方式。因此,我想使用并发包中的等待()和通知(
是否可以在gradle.properties文件中定义test.maxParallelForks=runtime.runtime.availableProcessors(),而不是在test任务下的每个build.gradle文件中定义它?