python-定时任务-apschelduer

  • Post author:
  • Post category:python


python-定时任务-apschelduer

1.      apscheduler

1.1.    install

pip install apscheduler

1.2.    basic concepts

APScheduler has four kinds of components:

  • triggers
  • job stores
  • executors
  • schedulers

Your choice of scheduler depends mostly on your programming environment and what you’ll be using APScheduler for. Here’s a quick guide for choosing a scheduler:

需要注意的是前两种方式,一为阻塞型,一为非阻塞型。具体差别见后文。

job stores主要影响作业持久化,一般情况下使用默认方式default (

MemoryJobStore

)足够了,如果需要持久化,可能就得需要数据库支持了,例如SQLAlchemyJobStore。

exeutors主要有

ThreadPoolExecutor



ProcessPoolExecutor

,默认是前者,一般够用,除非是cpu密集型作业。

APScheduler comes with three built-in trigger types:



  • date


    : use when you want to run the job just once at a certain point of time


  • interval


    : use when you want to run the job at fixed intervals of time


  • cron


    : use when you want to run the job periodically at certain time(s) of day

1.2.1.   示例

schedule.

add_job

(func

=

job1,trigger

=’interval’

, seconds

=

1)

schedule.

add_job

(func

=

job2, args

=

(

‘lierl’

,), trigger

=’date’

, next_run_time

=

datetime.datetime.

now

()

+

datetime.

timedelta

(seconds

=

5))

schedule.

add_job

(func

=

job1, trigger

=’cron’

, month

=’1,3,5,7-9′

, day

=’*’

, hour

=’14’

, minute

=’*’

)

1.3.    scheduler manage

scheduler.start()

scheduler.shutdown()

scheduler.pause()

This will cause the scheduler to not wake up until processing is resumed:

scheduler.resume()

It is also possible to start the scheduler in paused state, that is, without the first wakeup call:

scheduler.start(paused=True)

1.4.    job administer add /remove/pause/resume/list/modify

There are two ways to add jobs to a scheduler:

  1. by calling


    add_job()

  2. by decorating a function with


    scheduled_job()

When you remove a job from the scheduler, it is removed from its associated job store and will not be executed anymore. There are two ways to make this happen:

  1. by calling


    remove_job()


    with the job’s ID and job store alias
  2. by calling


    remove()


    on the Job instance you got from


    add_job()

You can easily pause and resume jobs through either the

Job

instance or the scheduler itself. When a job is paused, its next run time is cleared and no further run times will be calculated for it until the job is resumed. To pause a job, use either method:

To resume:

获取当前任务列表

  1. get_jobs():It will return a list of Job instances. If you’re only interested in the jobs contained in a particular job store, then give a job store alias as the second argument.
  2. print_jobs():will print out a formatted list of jobs, their triggers and next run times.

2.      基本使用

2.1.    阻塞/非阻塞

下面是一个非阻塞型的定时任务调度。


#sys



import

threading


import

apscheduler


import

time, datetime


from

apscheduler.schedulers.blocking

import

BlockingScheduler


from

apscheduler.schedulers.background

import

BackgroundScheduler

para = [1,2,3,4]

continue_run =

True

def

job1():

print(

‘job1’

)

print(threading.current_thread())

print(time.strftime(

‘%Y-%m-%d %H:%M:%S’

, time.localtime()))


def

job2(*args):


global

para

print(

‘job2’

, para)

print(threading.current_thread())

print(time.strftime(

‘%Y-%m-%d %H:%M:%S’

, time.localtime()))


def

stop_runing():


global

continue_run

continue_run =

False



#sch = BlockingScheduler()


sch = BackgroundScheduler()

sch.add_job(job1,

‘interval’

, seconds=5)

sch.add_job(job2,

‘interval’

, seconds=8)

sch.add_job(stop_runing,

‘date’

, run_date=

‘2019-6-16 12:25:00’

,)

jl = sch.get_jobs()

#[<Job (id=66307271d51f451491fd7bf8e8ebfc47 name=job1)>, <Job (id=af6ddd2c0ed94df58d889637cb7b816d name=job2)>]


sch.print_jobs()

print(

‘main thread:’

, threading.current_thread())

print(

‘before scheduler:’

, time.strftime(

‘%Y-%m-%d %H:%M:%S’

, time.localtime()))

sch.start()


while

continue_run:

print(

‘main’

)

time.sleep(10)

print(

‘program ending.’

)

2.2.    执行时间问题


import

threading


import

apscheduler


import

time, datetime


from

apscheduler.schedulers.blocking

import

BlockingScheduler


from

apscheduler.schedulers.background

import

BackgroundScheduler

continue_run =

True

def

aps_schduler():

para = [1,2,3,4]


def

job1():

print(

‘job1’

)

print(threading.current_thread())

print(time.strftime(

‘%Y-%m-%d %H:%M:%S’

, time.localtime()))


def

job2(*args, ):

print(

‘job2’

, args)

print(threading.current_thread())

print(time.strftime(

‘%Y-%m-%d %H:%M:%S’

, time.localtime()))


def

stop_runing():


global

continue_run

continue_run =

False



#sch = BlockingScheduler()


sch = BackgroundScheduler()


#


添加定时任务


sch.add_job(job1,

‘interval’

, seconds=5)

sch.add_job(job2,

‘interval’

, seconds=8, args=[

‘text’

])


# 1


分钟后执行stop_running


sch.add_job(stop_runing,

‘date’

, run_date=time.strftime(

‘%Y-%m-%d %H:%M:%S’

, time.localtime(time.time()+60)))

jl = sch.get_jobs()

#[<Job (id=66307271d51f451491fd7bf8e8ebfc47 name=job1)>, <Job (id=af6ddd2c0ed94df58d889637cb7b816d name=job2)>]


sch.print_jobs()

print(

‘main thread:’

, threading.current_thread())

print(

‘before scheduler:’

, time.strftime(

‘%Y-%m-%d %H:%M:%S’

, time.localtime()))

sch.start()


while

continue_run:

print(

‘main’

)

time.sleep(10)

print(

‘program ending.’

)

aps_schduler()

2.3.    超时问题

如果使用非阻塞模式,不存在超时问题,每次都会启一个新线程,直到达到max_instances给出的限制。

这里把实例限制设为1:

Execution of job “aps_schduler.<locals>.job1 (trigger: interval[0:00:03], next run at: 2019-06-16 16:19:35 CST)” skipped: maximum number of running instances reached (1)

每次调度启动任务时冲突,会抛出一个异常信息,但不会终止执行;否则正常进行。

示例代码:


import

threading


import

apscheduler


import

time, datetime


from

apscheduler.schedulers.blocking

import

BlockingScheduler


from

apscheduler.schedulers.background

import

BackgroundScheduler

continue_run =

True

def

aps_schduler():

para = [1,2,3,4]

count_x = 1


def

job1():


nonlocal

count_x

count_x +=1

print(

‘job1’

)

print(threading.current_thread())

print(time.strftime(

‘%Y-%m-%d %H:%M:%S’

, time.localtime()))

time.sleep(15)

print(count_x)


def

stop_runing():


global

continue_run

continue_run =

False


sch = BackgroundScheduler()


#


添加定时任务


sch.add_job(job1,

‘interval’

, seconds=3, max_instances=1)


# 1


分钟后执行stop_running


sch.add_job(stop_runing,

‘date’

, run_date=time.strftime(

‘%Y-%m-%d %H:%M:%S’

, time.localtime(time.time()+60)))

print(

‘main thread:’

, threading.current_thread())

print(

‘before scheduler:’

, time.strftime(

‘%Y-%m-%d %H:%M:%S’

, time.localtime()))

sch.start()


while

continue_run:

print(

‘main’

)

time.sleep(5)

print(

‘program ending.’

)

aps_schduler()

2.4.    首次执行时间问题

与twisted的定时任务类似,它也不会在一开始就执行;

除非指定:

sch.add_job(job1,

‘interval’

, seconds=5, max_instances=1, next_run_time=datetime.datetime.now())

当然,在开始定时任务前手动执行一次也是可行的。

2.5.    其它设置

任务多实例

add_job有max_instances参数可以控制多任务实例

3.      api

3.1.    date apscheduler.triggers.date


from


datetime


import

date


from


apscheduler.schedulers.blocking


import

BlockingScheduler

sched = BlockingScheduler()


def

my_job(text):

print(text)


# The job will be executed on November 6th, 2009

sched.add_job(my_job, ‘date’, run_date=date(2009, 11, 6), args=[‘text’])

sched.start()

You can specify the exact time when the job should be run:


# The job will be executed on November 6th, 2009 at 16:30:05

sched.add_job(my_job, ‘date’, run_date=datetime(2009, 11, 6, 16, 30, 5), args=[‘text’])

The run date can be given as text too:

sched.add_job(my_job, ‘date’, run_date=’2009-11-06 16:30:05′, args=[‘text’])

To add a job to be run immediately:


# The ‘date’ trigger and datetime.now() as run_date are implicit

sched.add_job(my_job, args=[‘text’])

4.      参考文档

参考文档:


https://apscheduler.readthedocs.io/en/latest/userguide.html#starting-the-scheduler

5.      testing code

5.1.    example1


import

threading


import

apscheduler


import

time, datetime


from

apscheduler.schedulers.blocking

import

BlockingScheduler


from

apscheduler.schedulers.background

import

BackgroundScheduler

continue_run =

True

def

aps_schduler():

para = [1,2,3,4]

count_x = 1


def

job1():


nonlocal

count_x

count_x +=1

print(

‘job1’

)

print(threading.current_thread())

print(time.strftime(

‘%Y-%m-%d %H:%M:%S’

, time.localtime()))


#time.sleep(15)


print(count_x)


def

stop_runing():


global

continue_run

continue_run =

False


sch = BackgroundScheduler()


#


添加定时任务


sch.add_job(job1,

‘interval’

, seconds=5, max_instances=1, next_run_time=datetime.datetime.now())


# 1


分钟后执行stop_running


sch.add_job(stop_runing,

‘date’

, run_date=time.strftime(

‘%Y-%m-%d %H:%M:%S’

, time.localtime(time.time()+60)))

print(

‘main thread:’

, threading.current_thread())

print(

‘before scheduler:’

, time.strftime(

‘%Y-%m-%d %H:%M:%S’

, time.localtime()))

sch.start()


while

continue_run:


#print(‘main’)


time.sleep(5)

print(

‘program ending.’

)

aps_schduler()

转载于:https://www.cnblogs.com/wodeboke-y/p/11049788.html