当前位置: 首页 > 知识库问答 >
问题:

将任务结果输入芹菜中的地图

宫俊远
2023-03-14

我觉得这是一个非常简单的用例,但不是一个我从文档中成功解决的用例。

以下是我的问题的人为版本:

我有一个任务,它接受一个参数并返回一个列表。我想对列表中的每个元素应用另一个任务。然后(视情况而定)我可能希望继续处理其他任务。

chain(
    my.tasks.divide.s(data),
    my.tasks.conquer.map(),
    ).apply_async().get()

或者

chain(
    my.tasks.divide.s(data),
    my.tasks.conquer.map(),
    my.tasks.unite.s(),
    my.tasks.rule.s(),
    ).apply_async().get()

地图不是那样的,但我觉得应该是这样的!我只想从一个参数链接到一个列表,然后再返回。。这当然不难做到吗?

这里和这里的“解决方案”似乎过于复杂,我不敢相信这是最佳实践。

关于和弦、贴图和链的文档对这种特殊情况没有帮助。

编辑:到目前为止我有什么。这里的问题是,我必须调用。apply_async(). get()两次以获得实际结果,因为中间步骤返回一个和弦而不是实际结果:

@app.task()
def divide(item):
    return [x for x in item]

@app.task()
def conquer(item):
    return item.upper()

@app.task()
def rule(items):
    return "".join(items)

@app.task()
def conquer_group(items):
    return group([conquer.s(x) for x in items])

@app.task()
def rule_group(items):
    return chord([conquer.s(x) for x in items], rule.s())

def main():
    print chain(
        divide.s("foobarbaz"),
        rule_group.s(),
        ).apply_async().get().apply_async().get()

共有1个答案

张丰
2023-03-14

这里的问题是map()需要一个迭代来实例化。apply_async()但不能实例化接受链中最后一个任务结果的map()签名。我认为实现这一点的简单方法是在任务中实例化map()签名。

@app.task()
def divide(item):
    return [x for x in item]

@app.task()
def conquer(item):
    return item.upper()

@app.task()
def process(items):
    return conquer.map(items)

def run():
    (divide.s("foobar") | process.s()).apply_async()
 类似资料:
  • 问题内容: 我将向芹菜队列添加多个任务,然后等待结果。我有各种各样的想法,我将如何利用某种形式的共享存储(memcached,redis,db等)来实现这一目标,但是,我本以为Celery可以自动处理,但我无法在线找到任何资源。 代码示例 问题答案: 对于 芹菜 > = 3.0 ,使用taskset被弃用赞成组。 在后台启动组: 等待:

  • 问题内容: 我在task.py中有一个任务,如下所示: 我试图将请求对象直接从几个传递给任务,如下所示: 我收到一个无法序列化的错误,我猜是吗?我该如何解决?麻烦的是我也有文件上传对象..它不是所有简单的数据类型。 问题答案: 因为请求对象包含对不实际序列化的内容的引用(例如上载的文件或与请求关联的套接字),所以没有通用的方法来对其进行序列化。 相反,您应该拔出并传递需要的部分。例如,类似:

  • 问题内容: 我正在尝试将类的方法用作django-celery任务,并使用@task装饰器对其进行标记。阿南德·杰亚哈(Anand Jeyahar)询问,这里也描述了同样的情况。是这样的 问题是,即使我使用这样的类实例,也需要至少两个参数,这意味着指针未命中。 更多信息: 由于继承,我无法将类转换为 模块 方法在很大程度上依赖于类成员,所以我不能使它们成为 静态的 标记 类 与@task装饰任务,

  • 现在,我想将< code>register事件发布到某个特殊的交换,我可以使用celery远程检索和处理它。 实际上,我已经使用了函数来实现这一点,但是它必须传递来指示应该执行哪个任务并消费它。所以它似乎不太适合我的目标。 我想要的就是这样: 向某些发布消息; 远程机器1订阅此或并捕获消息,用于执行任务; 远程机器2-与机器1相同但执行另一个任务-接收(可能需要回复某些) 例如,就像这个工作流一样

  • 当我运行批处理文件以启用芹菜时: 出现以下错误: 回溯(最后一次调用): 文件“C:\Users\Chernov.a\Desktop\Projects\Roscosmos\Python36-32\lib\runpy.py”,第193行,作为主“main”,mod\u spec) 文件“C:\Users\Chernov.a\Desktop\Projects\Roscosmos\Python36-32

  • 问题内容: 当我有以下内容时 直观的解释是,task3应该仅在第2组中的所有任务完成后才执行。 实际上,任务3在group1开始但尚未完成时执行。 我究竟做错了什么? 问题答案: 事实证明,在芹菜中,不能将两个组链接在一起。 我怀疑这是因为与任务链接的组自动变成和弦 -> Celery文档:http : //docs.celeryproject.org/en/latest/userguide/ca