当前位置: 首页 > 面试题库 >

报告长期运行的Celery任务的结果

孙海
2023-03-14
问题内容

问题
我已经将一个长期运行的任务划分为多个逻辑子任务,因此我可以在每个子任务完成时报告结果。但是,我正在尝试报告将永远无法完成的任务的结果(而不是不断产生价值),并且正在使用现有的解决方案来做到这一点。

背景
我正在为我编写的某些Python程序构建Web界面。用户可以通过Web表单提交作业,然后返回查看该作业的进度。

假设我有两个函数,每个函数都可以通过单独的形式进行访问:

  • med_func:执行大约需要1分钟,结果传递给render(),产生了其他数据。
  • long_func:返回一个生成器。每次yield大约需要30分钟,应向用户报告。产量很多,我们可以认为此迭代器是无

限的(仅在被吊销时终止)。

代码,当前实现

使用med_func,我报告结果如下:

在表单提交时,我将保存AsyncResult到Django会话:

    task_result = med_func.apply_async([form], link=render.s())
    request.session["task_result"] = task_result

结果页面的Django视图访问this AsyncResult。任务完成后,结果将保存到作为上下文传递给Django模板的对象中。

def results(request):
    """ Serve (possibly incomplete) results of a session's latest run. """
    session = request.session

    try:  # Load most recent task
        task_result = session["task_result"]
    except KeyError:  # Already cleared, or doesn't exist
        if "results" not in session:
            session["status"] = "No job submitted"
    else:  # Extract data from Asynchronous Tasks
        session["status"] = task_result.status
        if task_result.ready():
            session["results"] = task_result.get()
            render_task = task_result.children[0]

            # Decorate with rendering results
            session["render_status"] = render_task.status
            if render_task.ready():
                session["results"].render_output = render_task.get()
                del(request.session["task_result"])  # Don't need any more

    return render_to_response('results.html', request.session)

仅当函数实际终止时,此解决方案才有效。我无法将的逻辑子任务链接在一起long_func,因为存在未知数量的yields(每个循环的迭代long_func可能不会产生结果)。


有什么明智的方法可以从极其长时间运行的Celery任务访问生成的对象,以便可以在生成器用尽之前显示它们?


问题答案:

为了让Celery知道任务的当前状态是什么,它将在你拥有的任何结果后端中设置一些元数据。你可以搭载它来存储其他类型的元数据。

def yielder():
    for i in range(2**100):
        yield i

@task
def report_progress():
    for progress in yielder():
        # set current progress on the task
        report_progress.backend.mark_as_started(
            report_progress.request.id,
            progress=progress)

def view_function(request):
    task_id = request.session['task_id']
    task = AsyncResult(task_id)
    progress = task.info['progress']
    # do something with your current progress

我不会扔吨的数据在那里,但它运作良好,跟踪长时间运行任务的进度。



 类似资料:
  • 问题内容: 我有一个python celery-redis队列处理一次上传和下载值得一次演出和大量演出的数据。 很少有上传内容需要花费几个小时的时间。但是,一旦完成了这样的任务,我就目睹了这种奇怪的芹菜行为:芹菜调度程序通过将其再次发送给工作人员来重新运行刚刚完成的任务(我正在运行一个工作人员),并且它在同一时间发生了2次任务! 有人可以帮助我知道为什么会发生这种情况以及如何预防吗? 这些任务肯定

  • 问题内容: 我使用celery更新新闻聚合站点中的RSS feed。我为每个提要使用一个@task,看起来一切正常。 有一个细节我不确定如何处理:所有提要每分钟都使用@periodic_task更新一次,但是如果提要仍在启动新任务时从上一个定期任务更新,该怎么办?(例如,如果Feed确实很慢或离线,并且任务在重试循环中进行) 目前,我存储任务结果并按以下方式检查其状态: 也许我错过了一些使用芹菜机

  • 在我的spring boot应用程序中,我必须实现一个导入服务。用户可以提交一组JSON文件,应用程序将尝试从这些文件中导入数据。根据JSON文件中的数据量,单个导入过程可能需要1或2个小时。 我不想在导入过程中阻止用户,因此我计划接受导入任务,并通知用户此数据已排定处理时间。我将把数据放入队列,另一端的空闲队列使用者将启动导入过程。此外,我需要有一个可能性来监视队列中的一个作业,如果需要的话终止

  • 问题内容: 我有一个Django站点,当用户请求时会发生刮擦,并且我的代码在新过程中启动了Scrapy Spider独立脚本。自然,这与增加用户数量无关。 像这样: 我决定使用Celery并使用工作人员将爬网请求排队。 但是,我遇到了无法重新启动龙卷风反应堆的问题。第一个蜘蛛和第二个蜘蛛成功运行,但随后的蜘蛛将引发ReactorNotRestartable错误。 任何人都可以在Celery框架中运

  • 我使用的是Activti版本6.0.0。 我想同步执行活动中的任务,但我有一个长时间运行的任务,可能需要20分钟来执行。我想在我的控制器中快速获得过程实例ID,而不考虑在bpmn中完成任务,所以我在bpmn文件中添加了一个中间计时器事件“0分钟”。 我已经开始执行一个进程,比如说processInstanceid p1。当时只有我再次启动这个进程,比如说使用processInstanceid p2

  • 问题内容: 我一直在阅读文档并进行搜索,但似乎找不到直接的答案: 你可以取消已经执行的任务吗?(由于任务已开始,需要一段时间,因此需要取消一半) 我是从Celery FAQ的文档中找到的 但是我不清楚这是否会取消排队的任务,或者是否会杀死工作程序上正在运行的进程。感谢你能摆脱的光芒! 问题答案: 撤销将取消任务执行。如果任务被吊销,工人将忽略该任务并且不执行它。如果你不使用持久撤销,则可以在wor