APScheduler
是一个功能强大的Python任务调度库,它支持多种定时任务的调度方式,包括固定时间间隔、特定时间执行等。
安装
pip install apscheduler
基本构成
- 调度器(Scheduler): 调度器是
APScheduler
的核心组件,负责管理和执行任务。APScheduler
提供了不同类型的调度器,包括阻塞调度器(BlockingScheduler
)和非阻塞调度器(BackgroundScheduler
)。 - 触发器(Trigger): 触发器定义了何时执行任务。
APScheduler
支持各种触发器类型,如固定时间间隔触发器(IntervalTrigger
)、日期触发器(DateTrigger
)等。 - 任务(Job): 任务是要执行的操作或函数。在
APScheduler
中,你可以创建任务并将它们与触发器关联起来,以指定何时执行任务。
基本使用
from apscheduler.schedulers.blocking import BlockingScheduler
def job():
print("Hello, world!")
scheduler = BlockingScheduler(timezone='Asia/Shanghai')
scheduler.add_job(job, 'interval', seconds=10)
scheduler.start()
在实际应用中,可以根据需要配置不同类型的触发器和任务,以执行各种定时任务。APScheduler还支持将任务持久化到数据库,以便在应用重启后保留任务信息。
调度器(Scheduler)
阻塞与非阻塞:阻塞调度器会阻塞主程序的执行,而非阻塞调度器不会。
并发执行:非阻塞调度器可以同时执行多个任务,而阻塞调度器一次只能执行一个任务。
适用场景:阻塞调度器适用于简单的脚本和小型应用程序,而非阻塞调度器适用于大型应用程序和需要执行耗时任务的情况。
阻塞调度器
from apscheduler.schedulers.blocking import BlockingScheduler
def job():
print("Hello, world!")
scheduler = BlockingScheduler(timezone='Asia/Shanghai')
scheduler.add_job(job, 'interval', seconds=10)
scheduler.start()
非阻塞调度器
from apscheduler.schedulers.background import BackgroundScheduler
def job():
print("Hello, world!")
scheduler = BackgroundScheduler(timezone='Asia/Shanghai')
scheduler.add_job(job, 'interval', seconds=10)
scheduler.start()
触发器(Trigger)
常用主要包括以下3种
1、IntervalTrigger(间隔触发器)
from apscheduler.triggers.interval import IntervalTrigger
# 创建一个触发器,每隔5秒触发一次
trigger = IntervalTrigger(seconds = 5, timezone = default_timezone)
周期对应的参数
interval 周期触发任务
固定时间间隔触发。interval 间隔调度,参数如下:
参数 说明
weeks(int) 间隔几周
days(int) 间隔几天
hours(int) 间隔几小时
minutes(int) 间隔几分钟
seconds(int) 间隔多少秒
start_date(datetime or str) 开始日期
end_date(datetime or str) 结束日期
timezone(datetime.tzinfo or str) 时区
2、CronTrigger(Cron表达式触发器)
from apscheduler.triggers.cron import CronTrigger
# 每分钟的0秒和30秒触发
trigger = CronTrigger(second='0,30', timezone = default_timezone)
对应的参数
参数 说明
year(int or str) 年,4位数字
month(int or str) 月(范围1-12)
day(int or str) 日(范围1-31)
week(int or str) 周(范围1-53)
day_of_week(int or str) 周内第几天或者星期几(范围0-6或者mon,tue,wed,thu,fri,stat,sun)
hour(int or str) 时(0-23)
minute(int or str) 分(0-59)
second(int or str) 秒(0-59)
start_date(datetime or str) 最早开始日期(含)
end_date(datetime or str) 最晚结束日期(含)
timezone(datetime.tzinfo or str) 指定时区
可使用表达式
表达式 参数类型 描述
* 所有 通配符。例:minutes=*即每分钟触发
*/a 所有 可被a整除的通配符。
a-b 所有 范围a-b触发
a-b/c 所有 范围a-b,且可被c整除时触发
xth y 日 第几个星期几触发。x为第几个,y为星期几
last x 日 一个月中,最后个星期几触发
last 日 一个月最后一天触发
x,y,z 所有 组合表达式,可以组合确定值或上方的表达式
注意:month和day_of_week参数分别接受的是英语缩写jan– dec 和 mon – sun
3、DateTrigger(指定时间触发器)
是最基本的一种调度,作业任务只会在指定时间执行一次。
from apscheduler.triggers.date import DateTrigger
# 在2024年12月16日17:27执行任务
trigger = DateTrigger(run_date=datetime.datetime(2024, 12, 16, 17, 27), timezone = default_timezone, args=['test task'])
完整代码
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
import logging
import datetime
import time
default_timezone = 'Asia/Shanghai'
#当前时间
now_date = datetime.datetime.now().strftime('%Y-%m-%d')
now_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
#print(now_date)
log_file_path = './logs/'
log_file_name = f'{log_file_path}{now_date}.log'
#写日志
logging.basicConfig(
#控制台打印的日志级别
level = logging.INFO,
filename = log_file_name,
##模式,有w和a,w就是写模式,每次都会重新写日志,覆盖之前的日志
#a是追加模式,默认如果不写的话,就是追加模式
filemode = 'a',
#日志格式 '%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'
format = '%(asctime)s - %(levelname)s: %(message)s'
)
# 定义一个任务函数
def my_task():
current_time = datetime.datetime.now()
#print(f"任务执行时间:{current_time}")
logging.info(f'task >>>{current_time}')
# 创建一个阻塞调度器
scheduler = BlockingScheduler(timezone = default_timezone)
# 创建一个非阻塞调度器
#scheduler = BackgroundScheduler(timezone = default_timezone)
# 添加任务
#scheduler.add_job(my_task, trigger = 'interval', seconds = 2)
# 启动任务
#scheduler.start()
# 测试非阻塞调度器
#while True:
# print('ooo')
# time.sleep(2)
# 暂停任务
#scheduler.pause()
# 恢复任务
#scheduler.resume()
# 关闭任务
#scheduler.shutdown()
# 移除任务
#job = scheduler.add_job(my_task, trigger = 'interval', seconds=10)
#job.remove()
# 获取任务列表
#jobs = scheduler.get_jobs()
#print(jobs)
# 创建一个触发器,每隔5秒触发一次
trigger = IntervalTrigger(seconds = 5, timezone = default_timezone)
# 每分钟的0秒和30秒触发
#trigger = CronTrigger(second='0,30', timezone = default_timezone)
# 在2024年12月16日17:27执行任务
#trigger = DateTrigger(datetime.datetime(2024, 12, 16, 17, 27), timezone = default_timezone)
# 添加任务和触发器到调度器
scheduler.add_job(my_task, trigger)
try:
# 启动调度器
scheduler.start()
except (KeyboardInterrupt, SystemExit):
# 用户按Ctrl+C或者程序退出时停止调度器
scheduler.shutdown()