当前位置: 首页 > 知识库问答 >
问题:

计划Python脚本每小时准确运行一次

单于庆
2023-03-14

在我问之前,Cron作业和任务调度程序将是我的最后选择,这个脚本将在Windows和Linux中使用,我更喜欢有一个编码的方法来完成这个任务,而不是把它留给最终用户来完成。

有没有Python的库可以用来安排任务?我需要每小时运行一次函数,但是,随着时间的推移,如果我每小时运行一次脚本并使用. chat,“每小时一次”将在不同的时间运行由于执行/运行脚本和/或函数固有的延迟,一小时与前一天不同。

在不使用Cron作业或使用任务计划程序安排函数在一天中的特定时间(多次)运行的最佳方式是什么?

或者,如果这是不可能的,我也希望你的意见。

import datetime
import time
from apscheduler.scheduler import Scheduler

# Start the scheduler
sched = Scheduler()
sched.daemonic = False
sched.start()

def job_function():
    print("Hello World")
    print(datetime.datetime.now())
    time.sleep(20)

# Schedules job_function to be run once each minute
sched.add_cron_job(job_function,  minute='0-59')

出:

>Hello World
>2014-03-28 09:44:00.016.492
>Hello World
>2014-03-28 09:45:00.0.14110

(摘自Animesh Pandey下面的回答)

from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()

@sched.scheduled_job('interval', seconds=10)
def timed_job():
    print('This job is run every 10 seconds.')

@sched.scheduled_job('cron', day_of_week='mon-fri', hour=10)
def scheduled_job():
    print('This job is run every weekday at 10am.')

sched.configure(options_from_ini_file)
sched.start()

共有3个答案

柳项明
2023-03-14

对于apscheduler

对于apscheduler

from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()

@sched.scheduled_job('interval', seconds=10)
def timed_job():
    print('This job is run every 10 seconds.')

@sched.scheduled_job('cron', day_of_week='mon-fri', hour=10)
def scheduled_job():
    print('This job is run every weekday at 10am.')

sched.configure(options_from_ini_file)
sched.start()

apscheduler文档。

这适用于Python 3.6.2上的apscheduler-3.3.1。

"""
Following configurations are set for the scheduler:

 - a MongoDBJobStore named “mongo”
 - an SQLAlchemyJobStore named “default” (using SQLite)
 - a ThreadPoolExecutor named “default”, with a worker count of 20
 - a ProcessPoolExecutor named “processpool”, with a worker count of 5
 - UTC as the scheduler’s timezone
 - coalescing turned off for new jobs by default
 - a default maximum instance limit of 3 for new jobs
"""

from pytz import utc
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor

"""
Method 1:
"""
jobstores = {
    'mongo': {'type': 'mongodb'},
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': {'type': 'threadpool', 'max_workers': 20},
    'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}

"""
Method 2 (ini format):
"""
gconfig = {
    'apscheduler.jobstores.mongo': {
        'type': 'mongodb'
    },
    'apscheduler.jobstores.default': {
        'type': 'sqlalchemy',
        'url': 'sqlite:///jobs.sqlite'
    },
    'apscheduler.executors.default': {
        'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
        'max_workers': '20'
    },
    'apscheduler.executors.processpool': {
        'type': 'processpool',
        'max_workers': '5'
    },
    'apscheduler.job_defaults.coalesce': 'false',
    'apscheduler.job_defaults.max_instances': '3',
    'apscheduler.timezone': 'UTC',
}

sched_method1 = BlockingScheduler() # uses overrides from Method1
sched_method2 = BlockingScheduler() # uses same overrides from Method2 but in an ini format


@sched_method1.scheduled_job('interval', seconds=10)
def timed_job():
    print('This job is run every 10 seconds.')


@sched_method2.scheduled_job('cron', day_of_week='mon-fri', hour=10)
def scheduled_job():
    print('This job is run every weekday at 10am.')


sched_method1.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
sched_method1.start()

sched_method2.configure(gconfig=gconfig)
sched_method2.start()
姬存
2023-03-14

每隔10分钟运行一次。

from datetime import datetime, timedelta

while 1:
    print 'Run something..'

    dt = datetime.now() + timedelta(hours=1)
    dt = dt.replace(minute=10)

    while datetime.now() < dt:
        time.sleep(1)
益富
2023-03-14

也许这可以帮助:高级Python调度程序

下面是他们文档中的一小段代码:

from apscheduler.schedulers.blocking import BlockingScheduler

def some_job():
    print "Decorated job"

scheduler = BlockingScheduler()
scheduler.add_job(some_job, 'interval', hours=1)
scheduler.start()
 类似资料:
  • 假设我想在2020年2月6日13:50运行一次作业。我该怎么做? 通过使用下面的表达式,我可以得出它将在今天13:50运行。但它明年也会运行。但我希望它只运行一次。

  • 我在python上写了一个小脚本,它从控制台调用命令行来Hibernatelinux机器(或者在一个单词被更改的情况下关闭自己),然后过一段时间醒来。该命令通过watch命令一次又一次地调用。 因此,在PC再次炒起20秒后再次调用rtcwake命令。我希望每次计算机唤醒时都运行另一个脚本。我已经有这个其他脚本,这是一个倒计时。我想这样做是为了向用户显示计算机再次关闭之前还剩下多少时间,但是每次计算

  • 我的域控制器上有一个小脚本,它被设置为通过SMTP向我发送有关最新安全事件4740的电子邮件。 手动执行时,脚本将按预期运行;但是,当设置为通过计划任务运行时,尽管它显示已执行,但没有任何反应(没有电子邮件)。 脚本如下: 计划任务设置如下: 我还尝试了以下方法: 根据任务历史记录:任务开始、行动开始、创建任务流程、行动完成、任务完成。我浏览了网站上一些有着相同“问题”的不同链接,但它们似乎都有一

  • 我有windows 2008任务调度程序,我设置了一个PHP脚本,以这样运行 C:\php\php。exe-f等。。。 在windows任务计划程序中,我只能每天或每小时计划一次如何将其配置为每4小时运行一次?

  • 问题内容: 如何运行多个python脚本?此刻我像这样跑一个。 我已经尝试过了,但是不起作用:仅运行第一个脚本。另外,我尝试使用这样的单个文件; 但是,这也不起作用。 问题答案: 使用Bash: 这就是整个脚本。它将同时运行两个Python脚本。 Python本身可以做同样的事情,但要花更多的时间输入,对于眼前的问题来说是一个不好的选择。 我认为尽管您采取错误的方法来解决问题很可能,但我想听听您的

  • 问题内容: 我有运行带有并行PLESK面板的Cent OS的专用服务器。我需要每秒运行一个PHP脚本来更新数据库。这些是时间上的替代方法,它需要每秒进行更新。 我可以使用URL找到我的脚本。 每秒可以在本地执行该文件吗?喜欢吗? 更新: 自我添加此问题以来已经过了几个月。我最终使用了以下代码: 我的cronjob设置为每分钟。我已经在测试环境中运行了一段时间,它运行良好。这真的非常快,而且我看不到