huey, a little task queue.
轻量级异步任务队列。
下载安装huey。
$ pip install huey
下载安装redis依赖(huey暂时只支持redis)。
$ pip install redis
利用huey定义并执行一些任务时,可以分成这几个文件。
The first step is to configure your queue. The consumer needs to be pointed at a Huey
instance, which specifies which backend to use.
实际上就是利用redis创建consumer所指向的huey实例,huey实际上包含了一个队列queue,用来存储和取出消息。
from huey import RedisHuey
huey = RedisHuey('base_app', host='127.0.0.1')
或者
from huey import RedisHuey
from redis import ConnectionPool
import settings
redis_pool = ConnectionPool(host=settings.REDIS_ADDRESS, port=settings.REDIS_PORT, db=0)
huey = RedisHuey('base_app', connection_pool=redis_pool)
利用config.py所创建的huey来修饰普通函数使之成为huey任务。
这样就定义了一个最基本的异步任务。(base_huey.py 及上述的 config.py)
from base.base_huey import huey
@huey.task()
def count_beans(num):
print('-- counted %s beans --' % num)
for n in range(num):
print(n)
return 'Counted %s beans' % num
之前习惯把huey包里的huey_consumer.py文件直接拿出来到主目录然后执行,新版的的huey_consumer.py与旧版的稍微有点区别。
新版本增加了一个consumer_options.py,用来定义封装了一些consumer相关的配置和命令行解析的处理类;在旧版中,这些都是直接定义在huey_consumer.py中。
查看OptionParserHandler
源码可知,huey_consumer可以包含很多参数,主要分为三个group(Logging日志记录, Workers任务worker相关, Scheduler计划任务相关)。
def get_option_parser(self):
parser = optparse.OptionParser('Usage: %prog [options] '
'path.to.huey_instance')
def add_group(name, description, options):
group = parser.add_option_group(name, description)
for abbrev, name, kwargs in options:
group.add_option(abbrev, name, **kwargs)
add_group('Logging', 'The following options pertain to logging.',
self.get_logging_options())
add_group('Workers', (
'By default huey uses a single worker thread. To specify a '
'different number of workers, or a different execution model (such'
' as multiple processes or greenlets), use the options below.'),
self.get_worker_options())
add_group('Scheduler', (
'By default Huey will run the scheduler once every second to check'
' for tasks scheduled in the future, or tasks set to run at '
'specfic intervals (periodic tasks). Use the options below to '
'configure the scheduler or to disable periodic task scheduling.'),
self.get_scheduler_options())
return parser
最常用的一些参数:
ConsumerConfig
的setup_logger()
来定义logger)。定义需要执行huey任务的方法。
from tasks.huey_task import count_beans
# base test
def test_1():
count_beans(10) # no block
count_beans.schedule(args=(5, ), delay=5) # delay 5s
res = count_beans.schedule(args=(5, ), delay=5)
# res.get(blocking=True)
res(blocking=True) # block
if __name__ == '__main__':
test_1()
print('end')
python huey_consumer_new.py tasks.huey_task.huey -l logs/base_huey.log -w 1
pyton huey_main.py
ps:官方文档中是将 huey
实例和task任务都引入到main.py中。
from config import huey # import our "huey" object
from tasks import count_beans # import our task
if name == 'main':
beans = raw_input('How many beans? ')
count_beans(int(beans))
print('Enqueued job to count %s beans' % beans)
To run these scripts, follow these steps:
1. Ensure you have [Redis](http://redis.io/) running locally
2. Ensure you have [installed huey](http://huey.readthedocs.io/en/latest/installation.html#installation)
3. Start the consumer: huey_consumer.py main.huey
(notice this is “main.huey” and not “config.huey”).
4. Run the main program: python main.py
---
#####huey task简单api介绍
利用```@huey.task()```能来定义一些基本异步任务,当然还有其他延时任务,周期性任务等。
1. 延时执行:下例展示了两种延时执行的方法:第一种时直接执行延时时间n秒并传入参数;第二种是指定了eta参数,即estimated time of arrival,传入未来的某个时间点,使其在计划时间点执行。
import datetime
from tasks.huey_task import count_beans
count_beans(3) # normal
count_beans.schedule(args=(3, ), delay=5) # delay 5s
in_a_minute = datetime.datetime.now() + datetime.timedelta(seconds=60)
count_beans.schedule(args=(100,), eta=in_a_minute)
2. 阻塞:利用block参数能够使其阻塞。
res = count_beans(100)
res(blocking=True) # block
3. 异常重试:当任务出现异常时进行retry,并且可以指定重试延时时间。
from base.base_huey import huey
@huey.task(retries=3, retry_delay=5)
def try_reties_by_delay():
print('trying %s' % datetime.now())
raise Exception('try_reties_by_delay')
4. 周期性任务:利用```@huey.periodic_task()```来定义一个周期性任务。
from base.base_huey import huey
from huey import crontab
@huey.periodic_task(crontab(minute='*'))
def print_time():
print(datetime.now())