当我有以下内容时
group1 = group(task1.si(), task1.si(), task1.si())
group2 = group(task2.si(), task2.si(), task2.si())
workflow = chain(group1, group2, task3.si())
直观的解释是,task3应该仅在第2组中的所有任务完成后才执行。
实际上,任务3在group1开始但尚未完成时执行。
我究竟做错了什么?
事实证明,在芹菜中,不能将两个组链接在一起。
我怀疑这是因为与任务链接的组自动变成和弦
-> Celery文档:http : //docs.celeryproject.org/en/latest/userguide/canvas.html
将组与另一个任务链接在一起将自动将其升级为和弦:
组返回父任务。将两个组链接在一起时,我怀疑当第一组完成时,和弦会启动回调“任务”。我怀疑这个“任务”实际上是第二组的“父母任务”。我进一步怀疑该父任务在启动该组中的所有子任务后立即完成,因此执行了第二个组之后的下一项。
为了证明这一点,这里是一些示例代码。您需要已经有一个正在运行的celery实例。
# celery_experiment.py
from celery import task, group, chain, chord
from celery.signals import task_sent, task_postrun, task_prerun
import time
import logging
import random
random.seed()
logging.basicConfig(level=logging.DEBUG)
### HANDLERS ###
@task_prerun.connect()
def task_starting_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):
try:
logging.info('[%s] starting' % kwargs['id'])
except KeyError:
pass
@task_postrun.connect()
def task_finished_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **kwds):
try:
logging.info('[%s] finished' % kwargs['id'])
except KeyError:
pass
def random_sleep(id):
slp = random.randint(1, 3)
logging.info('[%s] sleep for %ssecs' % (id, slp))
time.sleep(slp)
@task()
def thing(id):
logging.info('[%s] begin' % id)
random_sleep(id)
logging.info('[%s] end' % id)
def exec_exp():
st = thing.si(id='st')
st_arr = [thing.si(id='st_arr1_a'), thing.si(id='st_arr1_b'), thing.si(id='st_arr1_c'),]
st_arr2 = [thing.si(id='st_arr2_a'), thing.si(id='st_arr2_b'),]
st2 = thing.si(id='st2')
st3 = thing.si(id='st3')
st4 = thing.si(id='st4')
grp1 = group(st_arr)
grp2 = group(st_arr2)
# chn can chain two groups together because they are seperated by a single subtask
chn = (st | grp1 | st2 | grp2 | st3 | st4)
# in chn2 you can't chain two groups together. what will happen is st3 will start before grp2 finishes
#chn2 = (st | st2 | grp1 | grp2 | st3 | st4)
r = chn()
#r2 = chn2()
问题内容: 我研究了,但是示例仅涉及使其重现。我正在寻找一种类似的功能,例如说“每个星期一的凌晨1点执行此任务”的功能。 问题答案: 感谢Patrick Altman,最近发布的1.0.3版现在支持此功能。 例: 请参阅更改日志以获取更多信息: http://celeryproject.org/docs/changelog.html
问题内容: 我正在尝试将类的方法用作django-celery任务,并使用@task装饰器对其进行标记。阿南德·杰亚哈(Anand Jeyahar)询问,这里也描述了同样的情况。是这样的 问题是,即使我使用这样的类实例,也需要至少两个参数,这意味着指针未命中。 更多信息: 由于继承,我无法将类转换为 模块 方法在很大程度上依赖于类成员,所以我不能使它们成为 静态的 标记 类 与@task装饰任务,
现在,我想将< code>register事件发布到某个特殊的交换,我可以使用celery远程检索和处理它。 实际上,我已经使用了函数来实现这一点,但是它必须传递来指示应该执行哪个任务并消费它。所以它似乎不太适合我的目标。 我想要的就是这样: 向某些发布消息; 远程机器1订阅此或并捕获消息,用于执行任务; 远程机器2-与机器1相同但执行另一个任务-接收(可能需要回复某些) 例如,就像这个工作流一样
问题内容: 我在task.py中有一个任务,如下所示: 我试图将请求对象直接从几个传递给任务,如下所示: 我收到一个无法序列化的错误,我猜是吗?我该如何解决?麻烦的是我也有文件上传对象..它不是所有简单的数据类型。 问题答案: 因为请求对象包含对不实际序列化的内容的引用(例如上载的文件或与请求关联的套接字),所以没有通用的方法来对其进行序列化。 相反,您应该拔出并传递需要的部分。例如,类似:
我觉得这是一个非常简单的用例,但不是一个我从文档中成功解决的用例。 以下是我的问题的人为版本: 我有一个任务,它接受一个参数并返回一个列表。我想对列表中的每个元素应用另一个任务。然后(视情况而定)我可能希望继续处理其他任务。 或者 地图不是那样的,但我觉得应该是这样的!我只想从一个参数链接到一个列表,然后再返回。。这当然不难做到吗? 这里和这里的“解决方案”似乎过于复杂,我不敢相信这是最佳实践。
问题内容: 我正在使用芹菜和django-celery。我定义了一个我想测试的定期任务。是否可以从外壳程序手动运行定期任务,以便查看控制台输出? 问题答案: 您是否尝试过仅从Django Shell运行任务?您可以使用任务的方法来确保它在本地积极运行。 假设在子模块的Django应用中调用了该任务: 结果实例具有与通常类型相同的API ,不同之处在于,始终要在本地迅速地评估结果,并且该方法将阻塞,