当前位置: 首页 > 工具软件 > huey > 使用案例 >

【PYthon分布式】huey:python轻量级异步任务队列简介

东门彬
2023-12-01

简介并安装

huey, a little task queue.
轻量级异步任务队列。

下载安装huey。

$ pip install huey

下载安装redis依赖(huey暂时只支持redis)。

$ pip install redis

利用huey定义并执行一些任务时,可以分成这几个文件。

  • config.py: 定义使用huey的一些配置,任务的redis存储。

The first step is to configure your queue. The consumer needs to be pointed at a Huey
instance, which specifies which backend to use.

  • task.py: 定义你需要执行的一些异步任务。
  • huey_consumer.py: 开启huey consumer的入口(huey提供)。
  • huey_main.py: 执行异步任务。

config.py

实际上就是利用redis创建consumer所指向的huey实例,huey实际上包含了一个队列queue,用来存储和取出消息。

from huey import RedisHuey

huey = RedisHuey('base_app', host='127.0.0.1')

或者

from huey import RedisHuey
from redis import ConnectionPool

import settings

redis_pool = ConnectionPool(host=settings.REDIS_ADDRESS, port=settings.REDIS_PORT, db=0)

huey = RedisHuey('base_app', connection_pool=redis_pool)
task.py

利用config.py所创建的huey来修饰普通函数使之成为huey任务。
这样就定义了一个最基本的异步任务。(base_huey.py 及上述的 config.py)

from base.base_huey import huey

@huey.task()
def count_beans(num):
    print('-- counted %s beans --' % num)
    for n in range(num):
        print(n)
    return 'Counted %s beans' % num
huey_consumer.py

之前习惯把huey包里的huey_consumer.py文件直接拿出来到主目录然后执行,新版的的huey_consumer.py与旧版的稍微有点区别。
新版本增加了一个consumer_options.py,用来定义封装了一些consumer相关的配置和命令行解析的处理类;在旧版中,这些都是直接定义在huey_consumer.py中。
查看OptionParserHandler源码可知,huey_consumer可以包含很多参数,主要分为三个group(Logging日志记录, Workers任务worker相关, Scheduler计划任务相关)。

    def get_option_parser(self):
        parser = optparse.OptionParser('Usage: %prog [options] '
                                       'path.to.huey_instance')

        def add_group(name, description, options):
            group = parser.add_option_group(name, description)
            for abbrev, name, kwargs in options:
                group.add_option(abbrev, name, **kwargs)

        add_group('Logging', 'The following options pertain to logging.',
                  self.get_logging_options())

        add_group('Workers', (
            'By default huey uses a single worker thread. To specify a '
            'different number of workers, or a different execution model (such'
            ' as multiple processes or greenlets), use the options below.'),
            self.get_worker_options())

        add_group('Scheduler', (
            'By default Huey will run the scheduler once every second to check'
            ' for tasks scheduled in the future, or tasks set to run at '
            'specfic intervals (periodic tasks). Use the options below to '
            'configure the scheduler or to disable periodic task scheduling.'),
            self.get_scheduler_options())

        return parser

最常用的一些参数:

  • -l 指定huey异步任务执行时的日志文件(也可以通过ConsumerConfigsetup_logger()来定义logger)。
  • -w 执行器worker队列的数量
  • -k worker的类型(process, thread, greenlet,默认是thread)
  • -d 轮询队列的最短时间间隔
huey_main.py

定义需要执行huey任务的方法。

from tasks.huey_task import count_beans

# base test
def test_1():
    count_beans(10)  # no block

    count_beans.schedule(args=(5, ), delay=5)  # delay 5s

    res = count_beans.schedule(args=(5, ), delay=5)
    # res.get(blocking=True)
    res(blocking=True)  # block

if __name__ == '__main__':

    test_1()

    print('end')
执行脚本
  1. 开启consumer轮询:python huey_consumer_new.py tasks.huey_task.huey -l logs/base_huey.log -w 1
    tasks.task.huey即上述的task.py,在此时后缀名需要替换成 .huey。
  2. 执行异步方法:pyton huey_main.py

ps:官方文档中是将 huey 实例和task任务都引入到main.py中。

main.py

from config import huey # import our "huey" object
from tasks import count_beans # import our task
if name == 'main':
beans = raw_input('How many beans? ')
count_beans(int(beans))
print('Enqueued job to count %s beans' % beans)

To run these scripts, follow these steps:
1. Ensure you have [Redis](http://redis.io/) running locally
2. Ensure you have [installed huey](http://huey.readthedocs.io/en/latest/installation.html#installation)
3. Start the consumer: huey_consumer.py main.huey
 (notice this is “main.huey” and not “config.huey”).
4. Run the main program: python main.py

---
#####huey task简单api介绍
利用```@huey.task()```能来定义一些基本异步任务,当然还有其他延时任务,周期性任务等。
1. 延时执行:下例展示了两种延时执行的方法:第一种时直接执行延时时间n秒并传入参数;第二种是指定了eta参数,即estimated time of arrival,传入未来的某个时间点,使其在计划时间点执行。

import datetime

from tasks.huey_task import count_beans

count_beans(3) # normal

count_beans.schedule(args=(3, ), delay=5) # delay 5s

in_a_minute = datetime.datetime.now() + datetime.timedelta(seconds=60)
count_beans.schedule(args=(100,), eta=in_a_minute)

2. 阻塞:利用block参数能够使其阻塞。

res = count_beans(100)

res.get(blocking=True)

res(blocking=True) # block

3. 异常重试:当任务出现异常时进行retry,并且可以指定重试延时时间。

from base.base_huey import huey

retry 3 times delay 5s

@huey.task(retries=3, retry_delay=5)
def try_reties_by_delay():
print('trying %s' % datetime.now())
raise Exception('try_reties_by_delay')

4. 周期性任务:利用```@huey.periodic_task()```来定义一个周期性任务。

from base.base_huey import huey
from huey import crontab

@huey.periodic_task(crontab(minute='*'))
def print_time():
print(datetime.now())



作者:蒋狗
链接:https://www.jianshu.com/p/e5801cec92e1
來源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。
 类似资料: