admin管理员组文章数量:1794759
Python轻量级定时任务库Schedule
文章目录
- 定时任务库对比
- 简介
- 安装
- 初试
- 不适用场景
- 运行间隔
- 装饰器
- 传递参数
- 取消任务
- 运行一次任务
- 检索所有任务
- 取消所有任务
- 根据标签检索任务
- 根据标签取消任务
- 随机间隔运行
- 运行任务到某时间
- 同一任务运行间隔
- 马上运行所有任务
- 后台运行
- 并行运行
- 遇到的坑
- 参考文献
推荐阅读 Python timing task - Schedule vs. Celery vs. APScheduler
Schedule | 轻量级 | 易用无配置 | 不能动态添加任务或持久化任务 | 简单任务 |
Celery | 重量级 | ①任务队列②分布式 | ①不能动态添加定时任务到系统中,如Flask(Django可以)②设置起来较累赘 | 任务队列 |
APScheduler | 相对重量级 | ①灵活,可动态增删定时任务并持久化②支持多种存储后端③集成框架多,用户广 | 重量级,学习成本大 | 通用 |
Rocketry | 轻量级 | 易用功能强 | 尚未成熟,文档不清晰 | 通用 |
Schedule 是一款轻量级定时任务库,易用无配置。
本文所有代码
安装 pip install schedule 初试 import time import schedule def job(): print('working...') schedule.every(10).minutes.do(job) # 每10分钟 schedule.every().hour.do(job) # 每小时 schedule.every().day.at('10:30').do(job) # 每天10:30 schedule.every().monday.do(job) # 每周一 schedule.every().wednesday.at('13:15').do(job) # 每周三13:15 schedule.every().minute.at(':17').do(job) # 每分钟的17秒 while True: schedule.run_pending() time.sleep(1) 不适用场景- 持久化任务
- 精确时间
- 并发运行
- 时区工作日假期的本地化
- 不考虑任务运行所需的时间
为保证稳定运行计划,需要将长时间运行的任务从主线程分离开,参考示例实现。
运行间隔 import schedule import time def job(): print('working...') # 每隔3秒/分/小时/天/周运行 schedule.every(3).seconds.do(job) schedule.every(3).minutes.do(job) schedule.every(3).hours.do(job) schedule.every(3).days.do(job) schedule.every(3).weeks.do(job) # 每分钟的第23秒运行 schedule.every().minute.at(':23').do(job) # 每小时的第42分钟运行 schedule.every().hour.at(':42').do(job) # 每4小时的20分30秒运行,如果现在是02:00,那么初次运行时间是06:20:30 schedule.every(5).hours.at('20:30').do(job) # 每天固定时间运行 schedule.every().day.at('10:30').do(job) schedule.every().day.at('10:30:42').do(job) # 每周固定时间运行 schedule.every().monday.do(job) schedule.every().wednesday.at('13:15').do(job) while True: schedule.run_pending() time.sleep(1) 装饰器通过 @repeat() 装饰静态方法
import time from schedule import every, repeat, run_pending @repeat(every().second) def job(): print('working...') while True: run_pending() time.sleep(1) 传递参数 import schedule def greet(name): print('Hello', name) schedule.every(2).seconds.do(greet, name='Alice') schedule.every(4).seconds.do(greet, name='Bob') while True: schedule.run_pending()装饰器(待测试)
from schedule import every, repeat @repeat(every().second, 'World') @repeat(every().day, 'Mars') def hello(planet): print('Hello', planet) while True: schedule.run_pending() 取消任务调用 schedule.cancel_job(job)
import schedule i = 0 def some_task(): global i i += 1 print(i) if i == 10: schedule.cancel_job(job) print('cancel job') exit(0) job = schedule.every().second.do(some_task) while True: schedule.run_pending()效果
运行一次任务任务返回 schedule.CancelJob
import time import schedule def job_that_executes_once(): print('Hello') return schedule.CancelJob schedule.every().minute.at(':17').do(job_that_executes_once) while True: schedule.run_pending() time.sleep(1) 检索所有任务 import schedule def hello(): print('Hello world') schedule.every().second.do(hello) all_jobs = schedule.get_jobs() print(all_jobs) # [Every 1 second do hello() (last run: [never], next run: 2021-04-15 22:45:44)] 取消所有任务调用 schedule.clear()
import schedule i = 0 def hello(): print('Hello world') def some_task(): global i i += 1 print(i) if i == 5: schedule.clear() print('clear all jobs') exit(0) schedule.every().second.do(hello) schedule.every().second.do(some_task) while True: schedule.run_pending()效果
根据标签检索任务调用 schedule.get_jobs(tag)
import schedule def greet(name): print('Hello {}'.format(name)) schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend') schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend') schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer') schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest') friends = schedule.get_jobs('friend') print(friends) # [Every 1 day do greet('Andrea') (last run: [never], next run: 2021-04-16 22:46:51), Every 1 hour do greet('John') (last run: [never], next run: 2021-04-15 23:46:51)] 根据标签取消任务调用 schedule.clear(tag)
import schedule def greet(name): print('Hello {}'.format(name)) if name == 'Cancel': schedule.clear('second-tasks') print('cancel second-tasks') schedule.every().second.do(greet, 'Andrea').tag('second-tasks', 'friend') schedule.every().second.do(greet, 'John').tag('second-tasks', 'friend') schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer') schedule.every(5).seconds.do(greet, 'Cancel').tag('daily-tasks', 'guest') while True: schedule.run_pending()效果
随机间隔运行 import schedule from datetime import datetime last = datetime.now() def job(): global last now = datetime.now() print(now - last) last = now schedule.every(1).to(5).seconds.do(job) # 每隔1-5秒运行 while True: schedule.run_pending()效果
运行任务到某时间 import schedule from datetime import datetime, timedelta, time def job(): print('working...') schedule.every().second.until('23:59').do(job) # 今天23:59停止 schedule.every().second.until('2030-01-01 18:30').do(job) # 2030-01-01 18:30停止 schedule.every().second.until(timedelta(hours=8)).do(job) # 8小时后停止 schedule.every().second.until(time(23, 59, 59)).do(job) # 今天23:59:59停止 schedule.every().second.until(datetime(2030, 1, 1, 18, 30, 0)).do(job) # 2030-01-01 18:30停止 while True: schedule.run_pending() 同一任务运行间隔调用 schedule.idle_seconds()
import time import schedule def job(): print('working...') schedule.every(5).seconds.do(job) while True: n = schedule.idle_seconds() print(n) if n is None: # 没其他任务 break elif n > 0: time.sleep(n) # 精确时间 schedule.run_pending() 马上运行所有任务不管调度,调用 schedule.run_all() 运行所有任务
import schedule def job(): print('working...') def job1(): print('Hello...') schedule.every().monday.at('12:40').do(job) schedule.every().tuesday.at('16:40').do(job1) schedule.run_all() schedule.run_all(delay_seconds=3) # 任务间延迟3秒效果
后台运行 import time import schedule import threading def run_continuously(interval=1): """隔一段时间运行""" cease_continuous_run = threading.Event() class ScheduleThread(threading.Thread): @classmethod def run(cls): while not cease_continuous_run.is_set(): schedule.run_pending() time.sleep(interval) continuous_thread = ScheduleThread() continuous_thread.start() return cease_continuous_run def background_job(): print('Hello from the background thread') # 启动后台线程 schedule.every().second.do(background_job) stop_run_continuously = run_continuously() # 做其他事 for i in range(10): print(i) time.sleep(1) # 停止后台线程 stop_run_continuously.set()效果
并行运行使用 Python 内置队列实现
import time import queue import schedule import threading def job(): print('working...') def worker_main(): while 1: job_func = jobqueue.get() job_func() jobqueue.task_done() jobqueue = queue.Queue() schedule.every(3).seconds.do(jobqueue.put, job) schedule.every(3).seconds.do(jobqueue.put, job) schedule.every(3).seconds.do(jobqueue.put, job) schedule.every(3).seconds.do(jobqueue.put, job) schedule.every(3).seconds.do(jobqueue.put, job) worker_thread = threading.Thread(target=worker_main) worker_thread.start() while 1: schedule.run_pending() time.sleep(1)效果
遇到的坑Win10下安装最新版本为 schedule==1.0.0,有些功能无法使用
参考文献版权声明:本文标题:Python轻量级定时任务库Schedule 内容由林淑君副主任自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.xiehuijuan.com/baike/1686481598a72395.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论