python-定时任务-apschelduer
1. apscheduler
1.1. install
pip install apscheduler
1.2. basic concepts
APScheduler has four kinds of components:
- triggers
- job stores
- executors
- schedulers
Your choice of scheduler depends mostly on your programming environment and what you’ll be using APScheduler for. Here’s a quick guide for choosing a scheduler:
-
BlockingScheduler
: use when the scheduler is the only thing running in your process -
BackgroundScheduler
: use when you’re not using any of the frameworks below, and want the scheduler to run in the background inside your application -
AsyncIOScheduler
: use if your application uses the asyncio module -
GeventScheduler
: use if your application uses gevent -
TornadoScheduler
: use if you’re building a Tornado application -
TwistedScheduler
: use if you’re building a Twisted application -
QtScheduler
: use if you’re building a Qt application
需要注意的是前两种方式,一为阻塞型,一为非阻塞型。具体差别见后文。
job stores主要影响作业持久化,一般情况下使用默认方式default (
MemoryJobStore
)足够了,如果需要持久化,可能就得需要数据库支持了,例如SQLAlchemyJobStore。
exeutors主要有
ThreadPoolExecutor
,
ProcessPoolExecutor
,默认是前者,一般够用,除非是cpu密集型作业。
APScheduler comes with three built-in trigger types:
-
date
: use when you want to run the job just once at a certain point of time -
interval
: use when you want to run the job at fixed intervals of time -
cron
: use when you want to run the job periodically at certain time(s) of day
1.2.1. 示例
schedule.
add_job
(func
=
job1,trigger
=’interval’
, seconds
=
1)
schedule.
add_job
(func
=
job2, args
=
(
‘lierl’
,), trigger
=’date’
, next_run_time
=
datetime.datetime.
now
()
+
datetime.
timedelta
(seconds
=
5))
schedule.
add_job
(func
=
job1, trigger
=’cron’
, month
=’1,3,5,7-9′
, day
=’*’
, hour
=’14’
, minute
=’*’
)
1.3. scheduler manage
scheduler.start()
scheduler.shutdown()
scheduler.pause()
This will cause the scheduler to not wake up until processing is resumed:
scheduler.resume()
It is also possible to start the scheduler in paused state, that is, without the first wakeup call:
scheduler.start(paused=True)
1.4. job administer add /remove/pause/resume/list/modify
There are two ways to add jobs to a scheduler:
-
by calling
add_job()
-
by decorating a function with
scheduled_job()
When you remove a job from the scheduler, it is removed from its associated job store and will not be executed anymore. There are two ways to make this happen:
-
by calling
remove_job()
with the job’s ID and job store alias -
by calling
remove()
on the Job instance you got from
add_job()
You can easily pause and resume jobs through either the
Job
instance or the scheduler itself. When a job is paused, its next run time is cleared and no further run times will be calculated for it until the job is resumed. To pause a job, use either method:
To resume:
获取当前任务列表
- get_jobs():It will return a list of Job instances. If you’re only interested in the jobs contained in a particular job store, then give a job store alias as the second argument.
- print_jobs():will print out a formatted list of jobs, their triggers and next run times.
2. 基本使用
2.1. 阻塞/非阻塞
下面是一个非阻塞型的定时任务调度。
#sys
import
threading
import
apscheduler
import
time, datetime
from
apscheduler.schedulers.blocking
import
BlockingScheduler
from
apscheduler.schedulers.background
import
BackgroundScheduler
para = [1,2,3,4]
continue_run =
True
def
job1():
print(
‘job1’
)
print(threading.current_thread())
print(time.strftime(
‘%Y-%m-%d %H:%M:%S’
, time.localtime()))
def
job2(*args):
global
para
print(
‘job2’
, para)
print(threading.current_thread())
print(time.strftime(
‘%Y-%m-%d %H:%M:%S’
, time.localtime()))
def
stop_runing():
global
continue_run
continue_run =
False
#sch = BlockingScheduler()
sch = BackgroundScheduler()
sch.add_job(job1,
‘interval’
, seconds=5)
sch.add_job(job2,
‘interval’
, seconds=8)
sch.add_job(stop_runing,
‘date’
, run_date=
‘2019-6-16 12:25:00’
,)
jl = sch.get_jobs()
#[<Job (id=66307271d51f451491fd7bf8e8ebfc47 name=job1)>, <Job (id=af6ddd2c0ed94df58d889637cb7b816d name=job2)>]
sch.print_jobs()
print(
‘main thread:’
, threading.current_thread())
print(
‘before scheduler:’
, time.strftime(
‘%Y-%m-%d %H:%M:%S’
, time.localtime()))
sch.start()
while
continue_run:
print(
‘main’
)
time.sleep(10)
print(
‘program ending.’
)
2.2. 执行时间问题
import
threading
import
apscheduler
import
time, datetime
from
apscheduler.schedulers.blocking
import
BlockingScheduler
from
apscheduler.schedulers.background
import
BackgroundScheduler
continue_run =
True
def
aps_schduler():
para = [1,2,3,4]
def
job1():
print(
‘job1’
)
print(threading.current_thread())
print(time.strftime(
‘%Y-%m-%d %H:%M:%S’
, time.localtime()))
def
job2(*args, ):
print(
‘job2’
, args)
print(threading.current_thread())
print(time.strftime(
‘%Y-%m-%d %H:%M:%S’
, time.localtime()))
def
stop_runing():
global
continue_run
continue_run =
False
#sch = BlockingScheduler()
sch = BackgroundScheduler()
#
添加定时任务
sch.add_job(job1,
‘interval’
, seconds=5)
sch.add_job(job2,
‘interval’
, seconds=8, args=[
‘text’
])
# 1
分钟后执行stop_running
sch.add_job(stop_runing,
‘date’
, run_date=time.strftime(
‘%Y-%m-%d %H:%M:%S’
, time.localtime(time.time()+60)))
jl = sch.get_jobs()
#[<Job (id=66307271d51f451491fd7bf8e8ebfc47 name=job1)>, <Job (id=af6ddd2c0ed94df58d889637cb7b816d name=job2)>]
sch.print_jobs()
print(
‘main thread:’
, threading.current_thread())
print(
‘before scheduler:’
, time.strftime(
‘%Y-%m-%d %H:%M:%S’
, time.localtime()))
sch.start()
while
continue_run:
print(
‘main’
)
time.sleep(10)
print(
‘program ending.’
)
aps_schduler()
2.3. 超时问题
如果使用非阻塞模式,不存在超时问题,每次都会启一个新线程,直到达到max_instances给出的限制。
这里把实例限制设为1:
Execution of job “aps_schduler.<locals>.job1 (trigger: interval[0:00:03], next run at: 2019-06-16 16:19:35 CST)” skipped: maximum number of running instances reached (1)
每次调度启动任务时冲突,会抛出一个异常信息,但不会终止执行;否则正常进行。
示例代码:
import
threading
import
apscheduler
import
time, datetime
from
apscheduler.schedulers.blocking
import
BlockingScheduler
from
apscheduler.schedulers.background
import
BackgroundScheduler
continue_run =
True
def
aps_schduler():
para = [1,2,3,4]
count_x = 1
def
job1():
nonlocal
count_x
count_x +=1
print(
‘job1’
)
print(threading.current_thread())
print(time.strftime(
‘%Y-%m-%d %H:%M:%S’
, time.localtime()))
time.sleep(15)
print(count_x)
def
stop_runing():
global
continue_run
continue_run =
False
sch = BackgroundScheduler()
#
添加定时任务
sch.add_job(job1,
‘interval’
, seconds=3, max_instances=1)
# 1
分钟后执行stop_running
sch.add_job(stop_runing,
‘date’
, run_date=time.strftime(
‘%Y-%m-%d %H:%M:%S’
, time.localtime(time.time()+60)))
print(
‘main thread:’
, threading.current_thread())
print(
‘before scheduler:’
, time.strftime(
‘%Y-%m-%d %H:%M:%S’
, time.localtime()))
sch.start()
while
continue_run:
print(
‘main’
)
time.sleep(5)
print(
‘program ending.’
)
aps_schduler()
2.4. 首次执行时间问题
与twisted的定时任务类似,它也不会在一开始就执行;
除非指定:
sch.add_job(job1,
‘interval’
, seconds=5, max_instances=1, next_run_time=datetime.datetime.now())
当然,在开始定时任务前手动执行一次也是可行的。
2.5. 其它设置
任务多实例
add_job有max_instances参数可以控制多任务实例
3. api
3.1. date apscheduler.triggers.date
from
datetime
import
date
from
apscheduler.schedulers.blocking
import
BlockingScheduler
sched = BlockingScheduler()
def
my_job(text):
print(text)
# The job will be executed on November 6th, 2009
sched.add_job(my_job, ‘date’, run_date=date(2009, 11, 6), args=[‘text’])
sched.start()
You can specify the exact time when the job should be run:
# The job will be executed on November 6th, 2009 at 16:30:05
sched.add_job(my_job, ‘date’, run_date=datetime(2009, 11, 6, 16, 30, 5), args=[‘text’])
The run date can be given as text too:
sched.add_job(my_job, ‘date’, run_date=’2009-11-06 16:30:05′, args=[‘text’])
To add a job to be run immediately:
# The ‘date’ trigger and datetime.now() as run_date are implicit
sched.add_job(my_job, args=[‘text’])
4. 参考文档
参考文档:
https://apscheduler.readthedocs.io/en/latest/userguide.html#starting-the-scheduler
5. testing code
5.1. example1
import
threading
import
apscheduler
import
time, datetime
from
apscheduler.schedulers.blocking
import
BlockingScheduler
from
apscheduler.schedulers.background
import
BackgroundScheduler
continue_run =
True
def
aps_schduler():
para = [1,2,3,4]
count_x = 1
def
job1():
nonlocal
count_x
count_x +=1
print(
‘job1’
)
print(threading.current_thread())
print(time.strftime(
‘%Y-%m-%d %H:%M:%S’
, time.localtime()))
#time.sleep(15)
print(count_x)
def
stop_runing():
global
continue_run
continue_run =
False
sch = BackgroundScheduler()
#
添加定时任务
sch.add_job(job1,
‘interval’
, seconds=5, max_instances=1, next_run_time=datetime.datetime.now())
# 1
分钟后执行stop_running
sch.add_job(stop_runing,
‘date’
, run_date=time.strftime(
‘%Y-%m-%d %H:%M:%S’
, time.localtime(time.time()+60)))
print(
‘main thread:’
, threading.current_thread())
print(
‘before scheduler:’
, time.strftime(
‘%Y-%m-%d %H:%M:%S’
, time.localtime()))
sch.start()
while
continue_run:
#print(‘main’)
time.sleep(5)
print(
‘program ending.’
)
aps_schduler()
转载于:https://www.cnblogs.com/wodeboke-y/p/11049788.html