admin管理员组

文章数量:1794759

Python轻量级定时任务库Schedule

Python轻量级定时任务库Schedule

文章目录
  • 定时任务库对比
  • 简介
  • 安装
  • 初试
  • 不适用场景
  • 运行间隔
  • 装饰器
  • 传递参数
  • 取消任务
  • 运行一次任务
  • 检索所有任务
  • 取消所有任务
  • 根据标签检索任务
  • 根据标签取消任务
  • 随机间隔运行
  • 运行任务到某时间
  • 同一任务运行间隔
  • 马上运行所有任务
  • 后台运行
  • 并行运行
  • 遇到的坑
  • 参考文献

定时任务库对比

推荐阅读 Python timing task - Schedule vs. Celery vs. APScheduler

库大小优点缺点适用场景
Schedule轻量级易用无配置不能动态添加任务或持久化任务简单任务
Celery重量级①任务队列②分布式①不能动态添加定时任务到系统中,如Flask(Django可以)②设置起来较累赘任务队列
APScheduler相对重量级①灵活,可动态增删定时任务并持久化②支持多种存储后端③集成框架多,用户广重量级,学习成本大通用
Rocketry轻量级易用功能强尚未成熟,文档不清晰通用

简介

Schedule 是一款轻量级定时任务库,易用无配置。

本文所有代码

安装 pip install schedule

初试 import time import schedule def job(): print('working...') schedule.every(10).minutes.do(job) # 每10分钟 schedule.every().hour.do(job) # 每小时 schedule.every().day.at('10:30').do(job) # 每天10:30 schedule.every().monday.do(job) # 每周一 schedule.every().wednesday.at('13:15').do(job) # 每周三13:15 schedule.every().minute.at(':17').do(job) # 每分钟的17秒 while True: schedule.run_pending() time.sleep(1)

不适用场景
  • 持久化任务
  • 精确时间
  • 并发运行
  • 时区工作日假期的本地化
  • 不考虑任务运行所需的时间

为保证稳定运行计划,需要将长时间运行的任务从主线程分离开,参考示例实现。

运行间隔 import schedule import time def job(): print('working...') # 每隔3秒/分/小时/天/周运行 schedule.every(3).seconds.do(job) schedule.every(3).minutes.do(job) schedule.every(3).hours.do(job) schedule.every(3).days.do(job) schedule.every(3).weeks.do(job) # 每分钟的第23秒运行 schedule.every().minute.at(':23').do(job) # 每小时的第42分钟运行 schedule.every().hour.at(':42').do(job) # 每4小时的20分30秒运行,如果现在是02:00,那么初次运行时间是06:20:30 schedule.every(5).hours.at('20:30').do(job) # 每天固定时间运行 schedule.every().day.at('10:30').do(job) schedule.every().day.at('10:30:42').do(job) # 每周固定时间运行 schedule.every().monday.do(job) schedule.every().wednesday.at('13:15').do(job) while True: schedule.run_pending() time.sleep(1)

装饰器

通过 @repeat() 装饰静态方法

import time from schedule import every, repeat, run_pending @repeat(every().second) def job(): print('working...') while True: run_pending() time.sleep(1)

传递参数 import schedule def greet(name): print('Hello', name) schedule.every(2).seconds.do(greet, name='Alice') schedule.every(4).seconds.do(greet, name='Bob') while True: schedule.run_pending()

装饰器(待测试)

from schedule import every, repeat @repeat(every().second, 'World') @repeat(every().day, 'Mars') def hello(planet): print('Hello', planet) while True: schedule.run_pending()

取消任务

调用 schedule.cancel_job(job)

import schedule i = 0 def some_task(): global i i += 1 print(i) if i == 10: schedule.cancel_job(job) print('cancel job') exit(0) job = schedule.every().second.do(some_task) while True: schedule.run_pending()

效果

运行一次任务

任务返回 schedule.CancelJob

import time import schedule def job_that_executes_once(): print('Hello') return schedule.CancelJob schedule.every().minute.at(':17').do(job_that_executes_once) while True: schedule.run_pending() time.sleep(1)

检索所有任务 import schedule def hello(): print('Hello world') schedule.every().second.do(hello) all_jobs = schedule.get_jobs() print(all_jobs) # [Every 1 second do hello() (last run: [never], next run: 2021-04-15 22:45:44)]

取消所有任务

调用 schedule.clear()

import schedule i = 0 def hello(): print('Hello world') def some_task(): global i i += 1 print(i) if i == 5: schedule.clear() print('clear all jobs') exit(0) schedule.every().second.do(hello) schedule.every().second.do(some_task) while True: schedule.run_pending()

效果

根据标签检索任务

调用 schedule.get_jobs(tag)

import schedule def greet(name): print('Hello {}'.format(name)) schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend') schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend') schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer') schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest') friends = schedule.get_jobs('friend') print(friends) # [Every 1 day do greet('Andrea') (last run: [never], next run: 2021-04-16 22:46:51), Every 1 hour do greet('John') (last run: [never], next run: 2021-04-15 23:46:51)]

根据标签取消任务

调用 schedule.clear(tag)

import schedule def greet(name): print('Hello {}'.format(name)) if name == 'Cancel': schedule.clear('second-tasks') print('cancel second-tasks') schedule.every().second.do(greet, 'Andrea').tag('second-tasks', 'friend') schedule.every().second.do(greet, 'John').tag('second-tasks', 'friend') schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer') schedule.every(5).seconds.do(greet, 'Cancel').tag('daily-tasks', 'guest') while True: schedule.run_pending()

效果

随机间隔运行 import schedule from datetime import datetime last = datetime.now() def job(): global last now = datetime.now() print(now - last) last = now schedule.every(1).to(5).seconds.do(job) # 每隔1-5秒运行 while True: schedule.run_pending()

效果

运行任务到某时间 import schedule from datetime import datetime, timedelta, time def job(): print('working...') schedule.every().second.until('23:59').do(job) # 今天23:59停止 schedule.every().second.until('2030-01-01 18:30').do(job) # 2030-01-01 18:30停止 schedule.every().second.until(timedelta(hours=8)).do(job) # 8小时后停止 schedule.every().second.until(time(23, 59, 59)).do(job) # 今天23:59:59停止 schedule.every().second.until(datetime(2030, 1, 1, 18, 30, 0)).do(job) # 2030-01-01 18:30停止 while True: schedule.run_pending()

同一任务运行间隔

调用 schedule.idle_seconds()

import time import schedule def job(): print('working...') schedule.every(5).seconds.do(job) while True: n = schedule.idle_seconds() print(n) if n is None: # 没其他任务 break elif n > 0: time.sleep(n) # 精确时间 schedule.run_pending()

马上运行所有任务

不管调度,调用 schedule.run_all() 运行所有任务

import schedule def job(): print('working...') def job1(): print('Hello...') schedule.every().monday.at('12:40').do(job) schedule.every().tuesday.at('16:40').do(job1) schedule.run_all() schedule.run_all(delay_seconds=3) # 任务间延迟3秒

效果

后台运行 import time import schedule import threading def run_continuously(interval=1): """隔一段时间运行""" cease_continuous_run = threading.Event() class ScheduleThread(threading.Thread): @classmethod def run(cls): while not cease_continuous_run.is_set(): schedule.run_pending() time.sleep(interval) continuous_thread = ScheduleThread() continuous_thread.start() return cease_continuous_run def background_job(): print('Hello from the background thread') # 启动后台线程 schedule.every().second.do(background_job) stop_run_continuously = run_continuously() # 做其他事 for i in range(10): print(i) time.sleep(1) # 停止后台线程 stop_run_continuously.set()

效果

并行运行

使用 Python 内置队列实现

import time import queue import schedule import threading def job(): print('working...') def worker_main(): while 1: job_func = jobqueue.get() job_func() jobqueue.task_done() jobqueue = queue.Queue() schedule.every(3).seconds.do(jobqueue.put, job) schedule.every(3).seconds.do(jobqueue.put, job) schedule.every(3).seconds.do(jobqueue.put, job) schedule.every(3).seconds.do(jobqueue.put, job) schedule.every(3).seconds.do(jobqueue.put, job) worker_thread = threading.Thread(target=worker_main) worker_thread.start() while 1: schedule.run_pending() time.sleep(1)

效果

遇到的坑
  • Schedule ImportError: cannot import name ‘repeat’
  • Win10下安装最新版本为 schedule==1.0.0,有些功能无法使用

    参考文献
  • Schedule GitHub
  • Schedule Documentation
  • 本文标签: Pythonschedule