admin管理员组

文章数量:1794759

python定时任务schedule模块详细资料

python定时任务schedule模块详细资料

import functools import logging import threading from datetime import timedelta, datetime import schedule import time """ # Scheduler作用就是在job可以执行的时候执行它. 对应方法功能: # # run_pending:运行所有可以运行的任务 # run_all:运行所有任务,不管是否应该运行 # clear:删除所有调度的任务 # cancel_job:删除一个任务 # every: 创建一个调度任务, 返回的是一个job # _run_job:运行一个job # next_run:获取下一个要运行任务的时间, 这里使用的是min去得到最近将执行的job, 之所以这样使用,是Job重载了__lt_方法,这样写起来确实很简洁. # idle_seconds:还有多少秒即将开始运行任务. # Job是整个定时任务的核心. 主要功能就是根据创建Job时的参数,得到下一次运行的时间 # interval:间隔多久,每interval秒或分等. # job_func:job执行函数 # unit : 间隔单元,比如minutes, hours # at_time :job具体执行时间点,比如10:30等 # last_run:job上一次执行时间 # next_run :job下一次即将运行时间 # period: 距离下次运行间隔时间 # start_day: 周的特殊天,也就是monday等的含义 """ """ # 周期任务 def job(): print("I'm working...") # 通过调用 scedule.every(时间数).时间类型.do(job) 发布周期任务。 schedule.every(interval=10).minutes.do(job_func=job) # 每个小时执行任务 schedule.every().hour.do(job) # 每天的10:30执行任务 schedule.every().day.at("10:30").do(job) # 每个月执行任务 schedule.every().monday.do(job) # 每个星期三的13:15分执行任务 schedule.every().wednesday.at("13:15").do(job) # 每分钟的第17秒执行任务 schedule.every().minute.at(":17").do(job) while True: # 发布后的周期任务需要用 run_pending 函数来检测是否执行,因此需要一个 While 循环不断地轮询这个函数 schedule.run_pending() time.sleep(1) """ # ------------------------------------------------------------------------------------------ """ # 单次任务 def job_that_executes_once(): # 此处编写的任务只会执行一次。。。 return schedule.CancelJob # 可以看到CancelJob就是一个空类, 这个类的作用就是当你的job执行函数返回一个CancelJob类型的对象, # 那么执行完后就会被Scheduler移除. 简单说就是只会执行一次 schedule.every().day.at("22:30").do(job_that_executes_once) """ # ------------------------------------------------------------------------------------------ """ # 需要传参的任务 def greet(name): print("Hello", name) # do() 将额外的参数传递给job函数 schedule.every(2).seconds.do(greet, name='Alice') schedule.every(4).seconds.do(greet, name='Bob') """ # ------------------------------------------------------------------------------------------ """ # 此处有错误 def hello(): print('Hello world') schedule.every().second.do(hello) # all_jobs = schedule.get_jobs() # 报错 # print(all_jobs) # schedule.clear() # 清除程序的所有任务 """ """ def greet(name): print('Hello {}'.format(name)) # .tag 打标签 schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend') schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend') schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer') schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest') # get_jobs(标签):可以获取所有该标签的任务 # friends = schedule.get_jobs('friend') # 报错 # 取消所有 daily-tasks 标签的任务 # schedule.clear('daily-tasks') """ """ # 让某个作业到某个时间截止 def job(): print('Boo') # 每个小时运行作业,18:30后停止 schedule.every(1).hours.until("18:30").do(job) # 每个小时运行作业,2030-01-01 18:33 today schedule.every(1).hours.until("2030-01-01 18:33").do(job) # 每个小时运行作业,8个小时后停止 schedule.every(1).hours.until(timedelta(hours=8)).do(job) # 每个小时运行作业,11:32:42后停止 schedule.every(1).hours.until(time(11, 33, 42)).do(job) # 每个小时运行作业,2020-5-17 11:36:20后停止 schedule.every(1).hours.until(datetime(2020, 5, 17, 11, 36, 20)).do(job) """ """ # 如果某个机制触发了,你需要立即运行所有作业,可以调用 schedule.run_all() def job_1(): print('Foo') def job_2(): print('Bar') schedule.every().monday.at("12:40").do(job_1) schedule.every().tuesday.at("16:40").do(job_2) schedule.run_all() # 立即运行所有作业,每次作业间隔10秒 schedule.run_all(delay_seconds=10) """ """ # 多线程的形式来运行每个作业实现并行执行 def job1(): print("I'm running on thread %s" % threading.current_thread()) def job2(): print("I'm running on thread %s" % threading.current_thread()) def job3(): print("I'm running on thread %s" % threading.current_thread()) def run_threaded(job_func): job_thread = threading.Thread(target=job_func) job_thread.start() schedule.every(10).seconds.do(run_threaded, job1) schedule.every(10).seconds.do(run_threaded, job2) schedule.every(10).seconds.do(run_threaded, job3) """ """ # Schedule 模块同时也支持 logging 日志记录 logging.basicConfig() schedule_logger = logging.getLogger('schedule') # 日志级别为DEBUG schedule_logger.setLevel(level=logging.DEBUG) def job(): print("Hello, Logs") schedule.every().second.do(job) schedule.run_all() schedule.clear() """ # ----------------------------异常捕获---------------------------- # Schedule 不会自动捕捉异常,它遇到异常会直接抛出,这会导致一个严重的问题: # 后续所有的作业都会被中断执行,因此我们需要捕捉到这些异常。 # # 你可以手动捕捉,但是某些你预料不到的情况需要程序进行自动捕获,加一个装饰器就能做到了 def catch_exceptions(cancel_on_failure=False): def catch_exceptions_decorator(job_func): @functools.wraps(job_func) def wrapper(*args, **kwargs): try: return job_func(*args, **kwargs) except: import traceback print(traceback.format_exc()) if cancel_on_failure: return schedule.CancelJob return wrapper return catch_exceptions_decorator # 这样,bad_task 在执行时遇到的任何错误,都会被 catch_exceptions 捕获, # 这点在保证调度任务正常运转的时候非常关键 @catch_exceptions(cancel_on_failure=True) def bad_task(): return 1 / 0 schedule.every(5).minutes.do(bad_task) while True: schedule.run_pending() time.sleep(1)

本文标签: 详细资料模块Pythonschedule