当前位置: 首页 > 面试题库 >

Django-每x秒运行一个函数

傅阳
2023-03-14
问题内容

我正在开发Django应用。我有一个API端点,如果需要,它必须执行必须重复几次的功能(直到满足特定条件为止)。我现在如何处理-

def shut_down(request):
  # Do some stuff
  while True:
    result = some_fn()
    if result:
      break
    time.sleep(2)

  return True

虽然我知道这是一种可怕的方法,并且我不应该阻塞2秒钟,但是我不知道该如何解决。
等待4秒后,此方法起作用。但是我想要使循环在后台运行并在some_fn返回True时停止的东西。(此外,可以肯定some_fn将返回True)

编辑-
阅读Oz123的响应给了我一个似乎可行的想法。这是我所做的-

def shut_down(params):
    # Do some stuff
    # Offload the blocking job to a new thread

    t = threading.Thread(target=some_fn, args=(id, ), kwargs={})
    t.setDaemon(True)
    t.start()

    return True

def some_fn(id):
    while True:
        # Do the job, get result in res
        # If the job is done, return. Or sleep the thread for 2 seconds before trying again.

        if res:
            return
        else:
            time.sleep(2)

这为我完成了工作。很简单,但我不知道与Django结合使用多线程的效率如何。
如果有人能指出这一点的缺陷,请多多指教。


问题答案:

对于许多小型项目来说,芹菜实在是太过分了。对于那些项目,您可以使用schedule,它非常易于使用。

使用此库,您可以使任何函数定期执行任务:

import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

该示例以阻塞方式运行,但是如果您查看FAQ,您会发现您还可以在并行线程中运行任务,以使您不会阻塞,并且一旦不再需要就删除该任务:

from schedule import Scheduler

def run_continuously(self, interval=1):
    """Continuously run, while executing pending jobs at each elapsed
    time interval.
    @return cease_continuous_run: threading.Event which can be set to
    cease continuous run.
    Please note that it is *intended behavior that run_continuously()
    does not run missed jobs*. For example, if you've registered a job
    that should run every minute and you set a continuous run interval
    of one hour then your job won't be run 60 times at each interval but
    only once.
    """

    cease_continuous_run = threading.Event()

    class ScheduleThread(threading.Thread):

        @classmethod
        def run(cls):
            while not cease_continuous_run.is_set():
                self.run_pending()
                time.sleep(interval)

    continuous_thread = ScheduleThread()
    continuous_thread.setDaemon(True)
    continuous_thread.start()
    return cease_continuous_run


Scheduler.run_continuously = run_continuously

这是在类方法中使用的示例:

    def foo(self):
        ...
        if some_condition():
           return schedule.CancelJob  # a job can dequeue it

    # can be put in __enter__ or __init__
    self._job_stop = self.scheduler.run_continuously()

    logger.debug("doing foo"...)
    self.foo() # call foo
    self.scheduler.every(5).seconds.do(
        self.foo) # schedule foo for running every 5 seconds

    ...
    # later on foo is not needed any more:
    self._job_stop.set()

    ...

    def __exit__(self, exec_type, exc_value, traceback):
        # if the jobs are not stop, you can stop them
        self._job_stop.set()


 类似资料:
  • 我正在开发一个Android 2.3.3应用程序,我需要每X秒运行一个方法。 在iOS,我有NSTmer,但在Android我不知道用什么。 有人向我推荐了Handler;另一个人向我推荐了AramManager,但我不知道哪种方法更适合NSTmer。 这是我想在Android中实现的代码: 我需要像NSTmer这样的东西。 你给我推荐什么?

  • 问题内容: 我正在使用Python和PyGTK。我对运行某个功能感兴趣,该功能每隔几分钟从串行端口获取数据并保存一次。 当前,我正在时间库中使用sleep()函数。为了能够进行处理,我将系统设置如下: 这种设置使我从串口读取数据的间隔为5分钟。我的问题是我希望能够让我的readserial()函数每隔5分钟暂停一次,并且能够一直执行操作,而不是使用time.sleep()函数。 关于如何解决这个问

  • 我有一个函数,使API调用服务器和更新用户界面与新数据。我想使,使每30秒我使API调用不同的url每次?这些API调用应该是不间断的,只要应用程序正在运行。

  • 问题内容: 请查找每10秒钟重复一次的更新代码。但是,问题在于它每10秒在屏幕上创建一个新的GUI,而不是仅每10秒更新一次数据。请您指教 问题答案: 您只有一门课,Learningfromscrach。在其中,您具有主要功能。在主要功能内,您将创建 另一个 Learningfromscrach 实例 ,该 实例 现在传递给计时器,每10秒运行一次。当它在10秒内运行时,它会创建另一个Learni

  • 问题内容: 抱歉,我有点菜鸟,我只想知道我如何让此javascript每秒钟运行一次? 源代码: 问题答案: 使用setInterval()每x毫秒运行一段代码。 您可以将要每秒运行的代码包装到一个名为的函数中。 因此它将是: 要停止它,可以运行:

  • 问题内容: 我试图刷新一个表,因为其中的变量不断更新,我想每隔几秒钟重新更新一次。我已经通过为表指定一个ID并为其创建一个div来完成代码。该代码将解释我的实际情况。提前致谢 ! 编辑:添加了变量表和变量刷新器。 但是代码仍然不会重新加载我的表!有什么想法吗?? Test.php JS.php 问题答案: 我想用就是你要找的人 或缩短 如果您想停止刷新数据,例如,说用户长时间使页面处于打开状态 如