当前位置: 首页 > 面试题库 >

RuntimeError:切勿在任务Celery中调用result.get()

笪烨
2023-03-14
问题内容

我正在使用celery将任务发送到远程服务器,并试图将结果恢复。使用远程服务器上的update_state方法不断更新任务状态。

我正在使用发送任务

app.send_task('task_name')

获得celery任务的结果是一个阻塞的呼叫,我不希望我的django应用程序等待结果和超时。

所以我尝试运行另一个celery任务以获取结果。

@app.task(ignore_result=True)
def catpure_res(task_id):
    task_obj = AsyncResult(task_id)
    task_obj.get(on_message=on_msg)

但这会导致以下错误。

Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 367, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 622, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/arpit/project/appname/tasks/results.py", line 42, in catpure_res
    task_obj.get(on_message=on_msg)
  File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 168, in get
    assert_will_not_block()
  File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 44, in assert_will_not_block
    raise RuntimeError(E_WOULDBLOCK)
RuntimeError: Never call result.get() within a task!
See http://docs.celeryq.org/en/latest/userguide/tasks.html#task-synchronous-subtasks

有没有解决此错误的方法。是否需要运行守护进程来获取结果?


问题答案:

使用allow_join_result。请参见下面的代码段。

@app.task(ignore_result=True)
def catpure_res(task_id):
    task_obj = AsyncResult(task_id)
    with allow_join_result():
        task_obj.get(on_message=on_msg)

注意:如其他答案中所述,它可能导致性能问题甚至死锁,但是如果您的任务写得好并且没有引起意外的错误,则它应该像魅力一样工作。



 类似资料:
  • 问题内容: 我有一个Django站点,当用户请求时会发生刮擦,并且我的代码在新过程中启动了Scrapy Spider独立脚本。自然,这与增加用户数量无关。 像这样: 我决定使用Celery并使用工作人员将爬网请求排队。 但是,我遇到了无法重新启动龙卷风反应堆的问题。第一个蜘蛛和第二个蜘蛛成功运行,但随后的蜘蛛将引发ReactorNotRestartable错误。 任何人都可以在Celery框架中运

  • 问题内容: 我使用celery更新新闻聚合站点中的RSS feed。我为每个提要使用一个@task,看起来一切正常。 有一个细节我不确定如何处理:所有提要每分钟都使用@periodic_task更新一次,但是如果提要仍在启动新任务时从上一个定期任务更新,该怎么办?(例如,如果Feed确实很慢或离线,并且任务在重试循环中进行) 目前,我存储任务结果并按以下方式检查其状态: 也许我错过了一些使用芹菜机

  • 问题内容: 如何检查一项任务是否在celery中运行(特别是我在使用celery-django)? 我已经阅读了文档,并且已经在Google上进行了搜索,但是看不到类似以下的呼叫: 我的用例是我有一个外部(java)服务来进行代码转换。当我发送要进行代码转换的文档时,我想检查运行该服务的任务是否正在运行,如果没有运行,请(重新)启动它。 我相信我使用的是当前的稳定版本2.4。 问题答案: 每个对象

  • 问题内容: 如何从任务中获取任务的task_id值?这是我的代码: 这个想法是,当我创建任务的新实例时,我从任务对象中检索。然后,我使用任务ID来确定任务是否已完成。我 不想 按值跟踪任务,因为在任务完成后文件将被“清理”,并且可能存在也可能不存在。 在上面的示例中,我将如何获取值? 问题答案: 如果任务接受,Celery会设置一些默认关键字参数。(您可以使用** kwargs接受它们,也可以专门

  • 问题内容: 我刚刚发现了有关配置选项(docs)的信息。默认值为4,但是(我相信)我希望预取尽可能少。我现在将其设置为1,这与我要查找的值足够接近,但是仍有一些我不理解的地方: 为什么这样预取一个好主意?我并没有真正找到原因,除非消息队列和工作线程之间存在大量延迟(就我而言,它们当前正在同一主机上运行,​​最糟糕的是最终可能在同一数据中的不同主机上运行)中央)。该文档仅提到了缺点,但没有解释优点是

  • 问题内容: 我目前正在将celery与django结合使用,并且一切正常。 但是,如果服务器超载,我希望能够通过检查当前计划了多少个任务,使用户有机会取消任务。 我怎样才能做到这一点? 我正在使用Redis作为代理。 这在某种程度上与我的问题有关,但是我不需要列出任务,只需数一下它们即可:) 问题答案: 如果您的代理配置为,并且您的任务已提交到常规队列,则可以通过以下方式获得长度: 或者,从she