Python 定时任务(apscheduler模块)

  • Post author:
  • Post category:python


APScheduler (advanceded python scheduler)是一款Python开发的定时任务工具。支持异步执行、后台执行调度任务。


APScheduler 支持三种调度任务:


  • 固定时间间隔

  • 固定时间点(日期)

  • Linux 下的 Crontab 命令


安装

pip install apscheduler

使用方式

新建一个 schedulers (调度器) 。
添加一个调度任务(job stores)。
运行调度任务。

schedulers(调度器)

APScheduler 非常好用的原因。它提供 7 种调度器,能够满足我们各种场景的需要。例如:后台执行某个操作,异步执行操作等。调度器分别是:


  • BlockingScheduler

    : 调度器在当前进程的主线程中运行,也就是会阻塞当前线程。

  • BackgroundScheduler

    : 调度器在后台线程中运行,

    不会阻塞当前线程

    。在框架程序(如Django、Flask)中使用
  • AsyncIOScheduler : 结合 asyncio 模块(一个异步框架)一起使用。
  • GeventScheduler : 程序中使用 gevent(高性能的Python并发框架)作为IO模型,和 GeventExecutor 配合使用。
  • TornadoScheduler : 程序中使用 Tornado(一个web框架)的IO模型,用 ioloop.add_timeout 完成定时唤醒。
  • TwistedScheduler : 配合 TwistedExecutor,用 reactor.callLater 完成定时唤醒。
  • QtScheduler : 你的应用是一个 Qt 应用,需使用QTimer完成定时唤醒。

执行器 executors

在定时任务该执行时,以进程或线程方式执行任务

  • ThreadPoolExecutor
  from apscheduler.executors.pool import ThreadPoolExecutor
  ThreadPoolExecutor(max_workers)  
  ThreadPoolExecutor(20) # 最多20个线程同时执行

使用方法

  executors = {
      'default': ThreadPoolExecutor(20)
  }
  scheduler = BackgroundScheduler(executors=executors)
  • ProcessPoolExecutor
  from apscheduler.executors.pool import ProcessPoolExecutor
  ProcessPoolExecutor(max_workers)
  ProcessPoolExecutor(5) # 最多5个进程同时执行

使用方法

  executors = {
      'default': ProcessPoolExecutor(3)
  }
  scheduler = BackgroundScheduler(executors=executors)

触发器 Trigger

APScheduler 有三种内建的 trigger:


date 触发器

date 是最基本的一种调度,作业任务只会执行一次。它表示特定的时间点触发。它的参数如下:

参数 说明
run_date (datetime 或 str) 作业的运行日期或时间
timezone (datetime.tzinfo 或 str) 指定时区


interval 触发器

固定时间间隔触发。interval 间隔调度,参数如下:

参数 说明
weeks (int) 间隔几周
days (int) 间隔几天
hours (int) 间隔几小时
minutes (int) 间隔几分钟
seconds (int) 间隔多少秒
start_date (datetime 或 str) 开始日期
end_date (datetime 或 str) 结束日期
timezone (datetime.tzinfo 或str) 时区


cron 触发器


在特定时间周期性地触发,和Linux crontab格式兼容。它是功能最强大的触发器。

我们先了解 cron 参数:

参数 说明
year (int 或 str) 年,4位数字
month (int 或 str) 月 (范围1-12)
day (int 或 str) 日 (范围1-31
week (int 或 str) 周 (范围1-53)
day_of_week (int 或 str) 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun)
hour (int 或 str) 时 (范围0-23)
minute (int 或 str) 分 (范围0-59)
second (int 或 str) 秒 (范围0-59)
start_date (datetime 或 str) 最早开始日期(包含)
end_date (datetime 或 str) 最晚结束时间(包含)
timezone (datetime.tzinfo 或str) 指定时区

这些参数是支持算数表达式,取值格式有如下:

作业存储(job store)

有两种添加方法,其中一种上述代码用到的

add_job()

, 另一种则是

scheduled_job()

修饰器来修饰函数。

这个两种办法的区别是:第一种方法返回一个

apscheduler.job.Job

的实例,可以用来改变或者移除 job。第二种方法只适用于应用运行期间不会改变的 job。

第二种添加任务方式的例子:

import datetime
from apscheduler.schedulers.background import BackgroundScheduler
 
@scheduler.scheduled_job(job_func, 'interval', minutes=2)
def job_func(text):
    print(datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3])
 
scheduler = BackgroundScheduler()
scheduler.start()

移除 job

移除 job 也有两种方法:

remove_job()



job.remove()




remove_job()

是根据 job 的 id 来移除,所以要在 job 创建的时候指定一个 id。


job.remove()

则是对 job 执行 remove 方法即可

scheduler.add_job(job_func, 'interval', minutes=2, id='job_one')
scheduler.remove_job(job_one)
 
job = add_job(job_func, 'interval', minutes=2, id='job_one')
job.remvoe()


获取 job 列表


通过

scheduler.get_jobs()

方法能够获取当前调度器中的所有 job 的列表


修改 job


如果你因计划改变要对 job 进行修改,可以使用

Job.modify()

或者

modify_job()

方法来修改 job 的属性。但是值得注意的是,job 的 id 是无法被修改的。

scheduler.add_job(my_job, 'interval', minutes=10, id='one')
scheduler.start()
# 将触发时间间隔修改成 5分钟
scheduler.modify_job('one', minutes=5)


关闭 job


默认情况下调度器会等待所有正在运行的作业完成后,关闭所有的调度器和作业存储。如果你不想等待,可以将 wait 选项设置为 False。

scheduler.shutdown()
scheduler.shutdown(wait=false)

持久化存储

存要调度的任务,其中除了默认的作业存储是把作业保存在内存中,其他的作业存储是将作业保存在数据库中。一个作业的数据将在保存在持久化的作业存储之前,会对作业执行序列化操作,当重新读取作业时,再执行反序列化操作。(

下面第二部分代码就是用的redis存储

)

目前APScheduler支持的Jobstore:

  • MemoryJobStore
  • MongoDBJobStore
  • RedisJobStore
  • RethinkDBJobStore
  • SQLAlchemyJobStore
  • ZooKeeperJobStore

使用:

from apscheduler.schedulers.background import BackgroundScheduler
 
# 创建定时任务的调度器对象
scheduler = BackgroundScheduler()
 
# 定义定时任务
def my_job(p1):
    pass
 
# 向调度器中添加定时任务
scheduler.add_job(my_job, 'date', args=[10])
 
# 在 2020-12-13 时刻运行一次 job_func 方法
scheduler .add_job(my_job, 'date', run_date=date(2020, 12, 13), args=['text'])
 
# 每隔两分钟执行一次 job_func 方法
scheduler .add_job(my_job, 'interval', minutes=2)
 
# 在 2020-12-13 14:00:01 ~ 2020-12-13 14:00:10 之间, 每隔两分钟执行一次 job_func 方法
scheduler .add_job(my_job, 'interval', minutes=2, start_date='2020-12-13 14:00:01' , end_date='2020-12-13 14:00:10')
 
# 在每年 1-37-9 月份中的每个星期一、二中的 00:00, 01:00, 02:0003:00 执行 job_func 任务
scheduler .add_job(job_func, 'cron', month='1-3,7-9',day='0, tue', hour='0-3')
# 启动定时任务调度器工作
scheduler.start()
 
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask, make_response
 
app = Flask(__name__)
 
jobstores = {
    # 用redis作backend
    'redis': RedisJobStore(),
}
executors = {
    'default': ThreadPoolExecutor(10),#默认线程数
    'processpool': ProcessPoolExecutor(3)#默认进程
}
conf = {
        "host": "127.0.0.1",
        "port": 6379,
        "db": 0,
        "max_connections": 10
      }
job_defaults = {
        'coalesce': False,
        'max_instances': 3
        }
#coalesce:累计的 任务是否执行。True不执行,False,执行。
 #同上,由于某种原因,比如进场挂了,导致任务多次没有调用,则前几次的累计任务的任务是否执行的策略。
# max_instances:同一个任务在线程池中最多跑的实例数。
def my_job():
    print('定时任务')
 
sched = BackgroundScheduler(timezone='MST', jobstores=jobstores, executors=executors)
# 添加redis为作业存储
sched.add_jobstore(jobstore="redis", **conf)
sched.add_job(my_job, 'interval', id='3_second_job', seconds=3,misfire_grace_time=60)
#misfire_grace_time:超过用户设定的时间范围外,该任务依旧执行的时间(单位时间s)。比如用户设置#misfire_grace_time=60,于12:00触发任务。由于某种原因在12:00没有触发,被延时了。如果时间在12:01内,该任务仍能触发,超过3:01任务不执行
 
@app.route('/start')
def ds():
    sched.start()
    sched.remove_job('3_second_job')  # 删除任务
    sched.pause()  # 暂定任务
    sched.resume()  # 恢复任务
    return 'ok'
 
 
@app.route('/ssst')
def st():
    sched.start()
    return 'ok'
 
if __name__ == '__main__':
 
    app.run(host="0.0.0.0", port=5000)