当前位置: 首页 > 面试题库 >

celery任务和自定义装饰器

梁丘翔
2023-03-14
问题内容

我正在使用django和celery(django-celery)进行项目。我们的团队决定将所有数据访问代码(app-name)/manager.py包装在其中(不要像django这样包装到Manager中),而将代码放入(应用程序名称)/task.py中,仅处理用celery组装和执行任务(因此我们没有django在这一层的ORM依赖性)。

在我的manager.py,我有这样的事情:

def get_tag(tag_name):
    ctype = ContentType.objects.get_for_model(Photo)
    try:
        tag = Tag.objects.get(name=tag_name)
    except ObjectDoesNotExist:
        return Tag.objects.none()
    return tag

def get_tagged_photos(tag):
    ctype = ContentType.objects.get_for_model(Photo)
    return TaggedItem.objects.filter(content_type__pk=ctype.pk, tag__pk=tag.pk)

def get_tagged_photos_count(tag):
    return get_tagged_photos(tag).count()

在我的task.py中,我喜欢将它们包装成任务(然后可以使用这些任务来完成更复杂的任务),因此我编写了这个装饰器:

import manager #the module within same app containing data access functions

class mfunc_to_task(object):
    def __init__(mfunc_type='get'):
        self.mfunc_type = mfunc_type

    def __call__(self, f):
        def wrapper_f(*args, **kwargs):
            callback = kwargs.pop('callback', None)

            mfunc = getattr(manager, f.__name__)

            result = mfunc(*args, **kwargs)
            if callback:
                if self.mfunc_type == 'get':
                    subtask(callback).delay(result)
                elif self.mfunc_type == 'get_or_create':
                    subtask(callback).delay(result[0])
                else:
                    subtask(callback).delay()
            return result            

        return wrapper_f

然后(仍在task.py):

#@task
@mfunc_to_task()
def get_tag():
    pass

#@task
@mfunc_to_task()
def get_tagged_photos():
    pass

#@task
@mfunc_to_task()
def get_tagged_photos_count():
    pass

一切正常@task。但是,在应用了该@task装饰器之后(按照celery文档的指示放到顶部),事情就开始崩溃了。显然,每次mfunc_to_task.__call__调用时,task.get_tag都会传递与相同的函数f。所以我wrapper_f每次都得到相同的结果,现在我唯一要做的就是得到一个标签。


问题答案:

不太确定为什么传递参数无效?

如果使用此示例:

@task()
def add(x, y):
    return x + y

让我们向MyCoolTask​​添加一些日志记录:

from celery import task
from celery.registry import tasks

import logging
import celery

logger = logging.getLogger(__name__)

class MyCoolTask(celery.Task):

    def __call__(self, *args, **kwargs):
        """In celery task this function call the run method, here you can
        set some environment variable before the run of the task"""
        logger.info("Starting to run")
        return self.run(*args, **kwargs)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        #exit point of the task whatever is the state
        logger.info("Ending run")
        pass

并创建一个扩展类(扩展MyCoolTask​​,但现在带有参数):

class AddTask(MyCoolTask):

    def run(self,x,y):
        if x and y:
            result=add(x,y)
            logger.info('result = %d' % result)
            return result
        else:
            logger.error('No x or y in arguments')

tasks.register(AddTask)

并确保将kwargs作为json数据传递:

{"x":8,"y":9}

我得到结果:

[2019-03-05 17:30:25,853: INFO/MainProcess] Starting to run
[2019-03-05 17:30:25,855: INFO/MainProcess] result = 17
[2019-03-05 17:30:26,739: INFO/MainProcess] Ending run
[2019-03-05 17:30:26,741: INFO/MainProcess] Task iamscheduler.tasks.AddTask[6a62641d-16a6-44b6-a1cf-7d4bdc8ea9e0] succeeded in 0.888684988022s: 17


 类似资料:
  • Nest 是基于 装饰器 这种语言特性而创建的。在很多常用的编程语言中 装饰器 都是一个很大众的概念,但在 JavaScript 语言中这个概念却比较新。所以为了更好地理解装饰器是如何工作的,你应该看看 这篇 文章。下面给出一个简单的定义: ES2016 的装饰器是一个可以将目标对象,名称和属性描述符作为被修饰方法(returns function)的参数的表达式。你可以通过装饰器前缀 @ 来使用

  • 定义一个配置 在配置文件 .sbt这章已经将了如何定义个配置,大部分配置定义在Default中 配置有三种类型,其中SettingKey和TaskKey已经在配置文件 .sbt介绍了,InputKey在任务配置的输入章节介绍。 对于配置的一些例子: val scalaVersion = settingKey[String]("The version of Scala used for buildi

  • 问题 你想写一个装饰器来包装一个函数,并且允许用户提供参数在运行时控制装饰器行为。 解决方案 引入一个访问函数,使用 nonlocal 来修改内部变量。 然后这个访问函数被作为一个属性赋值给包装函数。 from functools import wraps, partial import logging # Utility decorator to attach a function as an

  • 问题内容: 我受够了在函数中不断重复输入相同的重复命令。我想知道我是否可以写一个装饰器为我做这项工作。这是我的问题的一个示例: 有什么方法可以自动将所有传递给函数的参数变成具有相同名称的实例变量?例如: 哪里会自动设置和。我该怎么办? 谢谢! 编辑:我应该提到我使用CPython 2.7。 问题答案: 这是我第一次尝试装饰器: 编辑第二尝试:我添加了处理变量的默认值和检查有效的关键字。 [编辑3:

  • 首先,我是卡蒙达的新手。。我在camunda中创建了一个自定义任务列表。我可以申请/取消申请任务等,这是可行的。 现在我想完成一项任务,但是当我打电话的时候: 似乎camunda希望在此上下文中进入下一步,而不是部署BPMN模式以及部署执行下一步所需的一切的上下文。所以我得到了“classNotFound”异常,因为我的customTasklist中没有相同的类。战争就像卡蒙达处理器一样。战争 我

  • 问题内容: 问题 我希望某些观点仅适用于网站的高级用户。 在项目中的各种应用程序中如何使用此装饰器? 问题答案: 你不必为此编写自己的装饰器,就像中已经包含的那样。 还有一个扩展此装饰器的代码段(),该代码段非常适合你的用例。 而且,要(重新)使用装饰器,只需将装饰器放在路径中的模块中,即可从任何其他模块导入它。