APScheduler调度框架

  • Post author:
  • Post category:其他


1、安装APScheduler

1.1 pip 安装

​ 首选的安装方法是使用

pip

$ pip install apscheduler

1.2 源码安装

$ python setup.py install

2、代码示例

​ 源代码分发包含的

examples

目录中可以找到许多以不同方式使用APScheduler的任务示例。

https://github.com/agronholm/apscheduler/tree/master/examples/?at=master

3、基本概念

3.1 触发器:triggers

​ 包含调度的逻辑。每项任务都有自己的触发器,它决定了接下来应该运行的任务。除了最初的配置之外,触发器是完全无状态的。

3.2 作业存储:job stores

​ 安排预定的任务。默认的作业存储只是将任务保存在内存中,但其他存储在各种数据库中。任务数据在保存到持久性任务存储时被序列化,并在从任务存储加载时反序列化。作业存储(不是默认存储)不会将任务数据保存在内存中,而是作为中间人在后端保存,加载,更新和搜索任务。作业存储绝不能在调度程序之间共享。

3.3 执行器:executors

​ 是处理任务运行的东西。他们通常通过将任务中指定的可调用对象提交给线程或进程池来完成此操作。任务完成后,执行程序通知调度程序,然后发出适当的事件。

3.4 调度器:schedulers

​ 将其余的组合在一起。您通常只有一个调度程序在您的应用程序中运行。应用程序开发人员通常不直接处理任务存储,执行程序或触发器。相反,调度程序提供了适当的接口来处理所有这些。配置任务存储和执行程序是通过调度程序完成的,就像添加,修改和删除任务一样。

4、如何选择?

​ 如何选择正确(合适)的调度器,作业存储,执行器和触发器?

​ 调度程序的选择主要取决于编程环境以及使用APScheduler进行的操作。以下是选择调度程序的快速指南:

调度方式 备注
BlockingScheduler 仅可用在当前你的进程之内,与当前的进行共享计算资源
BackgroundScheduler 在后台运行调度,不影响当前的系统计算运行
AsyncIOScheduler 如果当前系统中使用了async module,则需要使用异步的调度器
GeventScheduler 如果使用了gevent,则需要使用该调度
TornadoScheduler 如果使用了Tornado, 则使用当前的调度器
TwistedScheduler Twister应用的调度器
QtScheduler Qt的调度器

​ 要选择适当的任务存储,您需要确定是否需要任务持久化。如果您始终在应用程序开始时重新创建任务,则可以使用默认值(


MemoryJobStore


)。但是,如果您需要自己的任务坚持调度程序重新启动或应用程序崩溃,那么您的选择通常归结为编程环境中使用的工具。但是,如果你在位置自由选择,然后


SQLAlchemyJobStore




PostgreSQL的

后端是推荐的选择,因为它强大的数据完整性保护。

​ 同样,如果您使用上述框架之一,则通常会为您选择执行器。否则,


ThreadPoolExecutor


对于大多数目的而言,默认值应该足够好。如果您的任务负载涉及CPU密集型操作,则应考虑使用


ProcessPoolExecutor


多个CPU内核。您甚至可以同时使用这两种方法,将进程池执行程序添加为辅助执行程序。

​ 当您安排任务时,您需要为其选择一个

触发器

,确定任务运行时计算日期/时间的逻辑。

4.1 三种内置触发器

4.1.1 date

  • 在某个特定时间只运行一次时使用

参数 类型 备注
run_date datetime|str the date/time to run the job at
timezone datetime.tzinfo|str time zone for run_date if it doesn’t have one already
  • date 样例

# The job will be executed on November 6th, 2009
sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text'])
​
# The job will be executed on November 6th, 2009 at 16:30:05
sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text'])

4.1.2 interval

  • 固定的时间间隔运行任务时使用

参数 类型 备注
seconds int number of seconds to wait
minutes int number of minutes to wait
hours int number of hours to wait
days int number of days to wait
weeks int number of weeks to wait
start_date datetime|str starting point for the interval calculation
end_date datetime|str latest possible date/time to trigger on
timezone datetime.tzinfo|str time zone to use for the date/time calculations
  • interval 样例

# Schedule job_function to be called every two hours
sched.add_job(job_function, 'interval', hours=2)
​
# Schedule job_function to be called every one weeks
sched.add_job(job_function, 'interval', weeks=1)

4.1.3 cron

​ 当您想要在某个特定时间定期运行任务时使用,也可以将多个触发器组合成一个触发器,或者在所有参与触发器同意的时间触发,或者触发器中的任何触发器触发。

  • 参数说明

参数 类型 备注

year
int|str 4-digit year

month
int|str month (1-12)

day
int|str day of the (1-31)

week
int|str ISO week (1-53)

day_of_week
int|str number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)

hour
int|str hour (0-23)

minute
int|str minute (0-59)

second
int|str second (0-59)

start_date
datetime|str earliest possible date/time to trigger on (inclusive)

end_date
datetime|str latest possible date/time to trigger on (inclusive)

timezone
datetime.tzinfo|str time zone to use for the date/time calculations (defaults to scheduler timezone)

和Linux的Crontab一样,它的值格式为

  • cron使用样例

# Schedules job_function to be run on the third Friday
# of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00
sched.add_job(job_function,'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
​
# Runs from Monday to Friday at 5:30 (am) until 2014-05-30 00:00:00
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')

5、配置调度程序

​ APScheduler提供了许多不同的方式来配置调度程序。您可以使用配置字典,也可以传递选项作为关键字参数。您也可以先实例化调度程序,然后添加任务并配置调度程序。这样你可以在任何环境下获得最大的灵活性。

​ 调度程序级别配置选项的完整列表可以在


BaseScheduler


该类的API参考中找到 。调度程序子类也可能有其他选项,这些选项记录在其各自的API参考中。各个任务存储和执行程序的配置选项同样可以在其API参考页面上找到。

​ 比方说,您希望在应用程序中使用默认任务存储和默认执行程序运行BackgroundScheduler:

from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
# Initialize the rest of the application here, or before the scheduler initialization

​ 这将为您带来一个BackgroundScheduler,其中一个名为“default”的MemoryJobStore和一个名为“default”的ThreadPoolExecutor的默认最大线程数为10。

5.1 样例

​ 现在,假设你想要更多,您希望有两个使用两个执行器的作业存储,并且您还想调整新任务的默认值并设置不同的时区。以下三个例子完全相同,并会为您提供:

  • 一个名为“mongo”的MongoDBJobStore

  • 一个名为“default”的SQLAlchemyJobStore(使用SQLite)

  • 一个名为“default”的ThreadPoolExecutor,进程数为20

  • 一个名为“processpool”的ProcessPoolExecutor,进程数为5

  • UTC作为调度程序的时区

  • 默认情况下,合并关闭以用于新任务

  • 新任务的默认最大实例限制为3

5.1.1 样例1

from pytz import utc
​
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
​
jobstores = {
    'mongo': MongoDBJobStore(),
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

5.1.1 样例2

from apscheduler.schedulers.background import BackgroundScheduler
​
# The "apscheduler." prefix is hard coded
scheduler = BackgroundScheduler({
    'apscheduler.jobstores.mongo': {
         'type': 'mongodb'
    },
    'apscheduler.jobstores.default': {
        'type': 'sqlalchemy',
        'url': 'sqlite:///jobs.sqlite'
    },
    'apscheduler.executors.default': {
        'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
        'max_workers': '20'
    },
    'apscheduler.executors.processpool': {
        'type': 'processpool',
        'max_workers': '5'
    },
    'apscheduler.job_defaults.coalesce': 'false',
    'apscheduler.job_defaults.max_instances': '3',
    'apscheduler.timezone': 'UTC',
})

5.1.1 样例3

from pytz import utc
​
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor
​
jobstores = {
    'mongo': {'type': 'mongodb'},
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': {'type': 'threadpool', 'max_workers': 20},
    'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BackgroundScheduler()
​
# .. do something else here, maybe add jobs etc.
​
scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

6、启动调度程序

​ 通过调用调度程序来启动调度


start()


程序。

对于除

apscheduler.schedulers.blocking.BlockingScheduler

以外的调度程序,此调用将立即返回,您可以继续应用程序的初始化过程,可能会将任务添加到调度程序。

​ 对于BlockingScheduler,您只需


start()


在完成任何初始化步骤后再执行 。

  • 注意

​调度程序启动后,您不能再更改其设置。

7、添加任务

有两种方法可以将任务添加到调度程序中:

  1. 通过调用


    add_job()

  2. 通过装饰函数


    scheduled_job()

    第一种方法是最常用的方法。

    第二种方式主要是方便地声明在应用程序运行时不会更改的任务。该


    add_job()


    方法返回一个


    apscheduler.job.Job


    实例,您可以稍后使用该 实例修改或删除该任务。

    您可以**在任何时候**安排调度任务。如果在添加任务时调度程序尚未运行,则会*暂时*安排任务,并且仅在调度程序启动时计算其第一个运行时间。需要注意的是,如果您使用连续执行任务的执行程序或任务存储,它会在您的任务中添加一些要求:
  • 目标可调用必须是全局可访问的

  • 可调用的任何参数都必须是可序列化的

    内建任务存储中,只有MemoryJobStore不会序列化任务。内置执行程序中,只有ProcessPoolExecutor会序列化任务。


重要

​ 如果您在应用程序初始化过程中将任务安排在持久任务存储区中,则

必须

为该任务定义一个明确的ID并使用,

replace_existing=True

或者每次重新启动应用程序时都会得到新任务副本!


建议

​ 要立即运行任务,请

trigger

在添加任务时省略参数。

8、删除任务

​ 当您从调度程序中删除任务时,该任务将从相关的任务存储中删除,并且不会再执行。有两种方法可以做到这一点:

  • 通过


    remove_job()


    与任务的ID和任务存储别名调用

  • 通过调用


    remove()


    你从中获得的Job实例


    add_job()

    后一种方法可能更方便,但它要求


    Job


    您在添加任务时将您收到的实例存储在某个地方 。对于通过该计划安排的任务


    scheduled_job()


    ,第一种方法是唯一的方法。

    如果任务的时间表结束(即其触发器不会产生任何进一步的运行时间),它会自动删除。

8.1 样例1

job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()
​
# 使用指定的任务ID(添加任务时,自定义)
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')

9、暂停和恢复任务

​ 您可以通过


Job


实例或调度程序本身暂停和恢复任务。任务暂停时,下一次运行时间将被清除,并且不会再计算运行时间,直到任务恢复。要暂停任务,请使用以下任一方法

9.1 样例1

# 暂停任务
apscheduler.job.Job.pause()
apscheduler.schedulers.base.BaseScheduler.pause_job()
​
# 恢复
apscheduler.job.Job.resume()
apscheduler.schedulers.base.BaseScheduler.resume_job()

10、获取计划任务的列表

​ 要获得计划任务的机器可处理列表,您可以使用


get_jobs()


方法。它会返回一个


Job


实例列表 。如果您只对特定作业存储中包含的任务感兴趣,那么请提供作业存储别名作为第二个参数。

​ 为了方便起见,您可以使用


print_jobs()


打印格式化的任务列表,触发器和下次运行时间的方法。

11、修改任务

11.1 样例1

# 您可以通过调用 apscheduler.job.Job.modify()或modify_job()
# 来修改任意的(除了任务属性id的)任务属性。
job.modify(max_instances=6, name='Alternate name')

11.2 样例2

# 如果您想重新计划任务 - 即更改其触发器,则可以使用 
# apscheduler.job.Job.reschedule() 或 reschedule_job()。
# 这些方法为任务构建新的触发器,并基于新的触发器重新计算其下一次运行时间。
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')

12、关闭调度程序

12.1 关闭调度程序

scheduler.shutdown()

​ 默认情况下,调度程序关闭其任务存储和执行程序,并等待所有当前正在执行的任务完成。如果你不想等,你可以这样做:

scheduler.shutdown(wait=False)

​ 这仍将关闭任务存储和执行程序,但不会等待任何正在运行的任务完成。

13、暂停/恢复任务处理

13.1 暂停预定任务

scheduler.pause()

13.2 恢复

​ 这将导致调度程序在处理恢复之前不会唤醒

scheduler.resume()

13.3 启动暂停的调度程序

​ 也可以在暂停状态下启动调度程序,也就是说,不需要第一次唤醒调用:

scheduler.start(paused=True)

​ 当您需要在有机会运行之前修剪不需要的任务时,这非常有用。

14、同时执行任务的实例的数量限制

14.1 max_instances

​ 默认情况下,每个任务只允许同时运行一个实例。这意味着如果任务即将开始,但之前的运行尚未完成,那么最新的运行被认为是失火。

​ 可以通过

max_instances

在添加任务时使用关键字参数来设置调度程序允许并发运行的特定任务的最大实例数。

15、错过的任务执行和合并

​ 有时调度程序可能无法在计划运行时执行预定任务。最常见的情况是在持久任务存储中安排任务,并且计划程序在任务执行后关闭并重新启动。发生这种情况时,该任务被认为是“失误”。然后,调度程序将根据任务的

misfire_grace_time

选项(可以在每个任务上设置或在调度程序中全局设置)检查每个错过的执行时间,以查看是否仍应触发执行。这可能导致连续几次执行的任务。

​ 如果这种行为不适合您的特定用例,则可以使用合并将所有这些错过的执行合并为一个。换句话说,如果为任务启用了合并,并且调度程序看到该任务的一个或多个排队执行,则只会触发一次。“绕过”运行不会发生失火事件。

注意

​ 如果由于池中没有线程或进程可用而导致任务执行延迟,执行程序可能会由于运行太迟(与其原先指定的运行时间相比)而跳过它。如果这可能发生在您的应用程序中,您可能希望增加执行程序中的线程/进程数,或者将

misfire_grace_time

设置调整为更高的值。

16、调度程序事件

​ 可以将事件监听器附加到调度程序。调度程序事件在某些情况下被触发,并且可能会在其中携带关于该特定事件的详细信息的额外信息。通过给出适当的

mask

参数


add_listener()


,可以只听特定类型的事件 ,或者将不同的常量放在一起。可调用的监听器使用一个参数即事件对象进行调用。

​ 有关


events


可用事件及其属性的详细信息,请参阅模块的文档。

http://apscheduler.readthedocs.io/en/v3.5.1/modules/events.html#module-apscheduler.events

16.1 样例

def my_listener(event):
    if event.exception:
        print('The job crashed :(')
    else:
        print('The job worked :)')
​
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

17、故障排除

​ 如果调度程序未按预期任务,则将记录

apscheduler

器的日志记录级别提高到该

DEBUG

级别将会有所帮助 。

​ 如果您还没有首先启用日志记录,则可以这样做:

import logging
​
logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)

​ 这应该提供很多关于调度程序内部正在进行的有用信息。



版权声明:本文为a_jie_2016_05原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。