admin管理员组文章数量:1794759
python任务调度之schedule
简介:
从最简单的栗子看起:
import schedule import time def job(): print("I'm working...") schedule.every(10).minutes.do(job) schedule.every().hour.do(job) schedule.every().day.at("10:30").do(job) schedule.every(5).to(10).days.do(job) schedule.every().monday.do(job) schedule.every().wednesday.at("13:15").do(job) while True: schedule.run_pending() time.sleep(1)这是在pypi上面给出的示例。这个栗子简单到我不需要怎么解释。而且,通过这个栗子,我们也可以知道,schedule其实就只是个定时器。在while True死循环中,schedule.run_pending()是保持schedule一直运行,去查询上面那一堆的任务,在任务中,就可以设置不同的时间去运行。跟crontab是类似的。
参考官网:schedule.readthedocs.io/en/stable/
但是,如果是多个任务运行的话,实际上它们是按照顺序从上往下挨个执行的。如果上面的任务比较复杂,会影响到下面任务的运行时间。比如我们这样:
import datetime import schedule import time def job1(): print("I'm working for job1") time.sleep(2) print("job1:", datetime.datetime.now()) def job2(): print("I'm working for job2") time.sleep(2) print("job2:", datetime.datetime.now()) def run(): schedule.every(10).seconds.do(job1) schedule.every(10).seconds.do(job2) while True: schedule.run_pending() time.sleep(1)接下来你就会发现,两个定时任务并不是10秒运行一次,而是12秒。是的。由于job1和job2本身的执行时间,导致任务延迟了。
其实解决方法也很简单:用多线程/多进程。不要幼稚地问我“python中的多线程不是没有用吗?”这是两码事。开了一条线程,就把job独立出去运行了,不会占主进程的cpu时间,schedule并没有花掉执行一个任务的时间,它的开销只是开启一条线程的时间,所以,下一次执行就变成了10秒后而不是12秒后。
import datetime import schedule import threading import time def job1(): print("I'm working for job1") time.sleep(2) print("job1:", datetime.datetime.now()) def job2(): print("I'm working for job2") time.sleep(2) print("job2:", datetime.datetime.now()) def job1_task(): threading.Thread(target=job1).start() def job2_task(): threading.Thread(target=job2).start() def run(): schedule.every(10).seconds.do(job1_task) schedule.every(10).seconds.do(job2_task) while True: schedule.run_pending() time.sleep(1)就是这么简单。
唯一要注意的是,这里面job不应当是死循环类型的,也就是说,这个线程应该有一个执行完毕的出口。一是因为线程万一僵死,会是非常棘手的问题;二是下一次定时任务还会开启一个新的线程,执行次数多了就会演变成灾难。如果schedule的时间间隔设置得比job执行的时间短,一样会线程堆积形成灾难,所以,还是需要注意一下的。
schedule源码学习源码分析就围绕这三个类:CancelJob,Scheduler,job
CancelJob
class CancelJob(object): pass可以看到就是一个空类, 这个类的作用就是当你的job执行函数返回一个CancelJob类型的对象,那么执行完后就会被Scheduler移除. 简单说就是只会执行一次.
Scheduler class Scheduler(object): def __init__(self): self.jobs = [] def run_pending(self): runnable_jobs = (job for job in self.jobs if job.should_run) for job in sorted(runnable_jobs): self._run_job(job) def run_all(self, delay_seconds=0): for job in self.jobs: self._run_job(job) time.sleep(delay_seconds) def clear(self): del self.jobs[:] def cancel_job(self, job): try: self.jobs.remove(job) except ValueError: pass def every(self, interval=1): job = Job(interval) self.jobs.append(job) return job def _run_job(self, job): ret = job.run() if isinstance(ret, CancelJob) or ret is CancelJob: self.cancel_job(job) @property def next_run(self): if not self.jobs: return None return min(self.jobs).next_run @property def idle_seconds(self): return (self.next_run - datetime.datetime.now()).total_seconds()Scheduler作用就是在job可以执行的时候执行它. 这里的函数也都比较简单:
- run_pending:运行所有可以运行的任务
- run_all:运行所有任务,不管是否应该运行
- clear:删除所有调度的任务
- cancel_job:删除一个任务
- every: 创建一个调度任务, 返回的是一个job
- _run_job:运行一个job
- next_run:获取下一个要运行任务的时间, 这里使用的是min去得到最近将执行的job, 之所以这样使用,是Job重载了__lt_方法,这样写起来确实很简洁.
- idle_seconds:还有多少秒即将开始运行任务.
Job
Job是整个定时任务的核心. 主要功能就是根据创建Job时的参数,得到下一次运行的时间. 代码如下,稍微有点长(会省略部分代码,可以看源码):
class Job(object): def __init__(self, interval): self.interval = interval # pause interval * unit between runs self.job_func = None # the job job_func to run self.unit = None # time units, e.g. 'minutes', 'hours', ... self.at_time = None # optional time at which this job runs self.last_run = None # datetime of the last run self.next_run = None # datetime of the next run self.period = None # timedelta between runs, only valid for self.start_day = None # Specific day of the week to start on def __lt__(self, other): return self.next_run < other.next_run def minute(self): assert self.interval == 1, 'Use minutes instead of minute' return self.minutes @property def minutes(self): self.unit = 'minutes' return self @property def hour(self): assert self.interval == 1, 'Use hours instead of hour' return self.hours @property def hours(self): self.unit = 'hours' return self @property def day(self): assert self.interval == 1, 'Use days instead of day' return self.days @property def days(self): self.unit = 'days' return self @property def week(self): assert self.interval == 1, 'Use weeks instead of week' return self.weeks @property def weeks(self): self.unit = 'weeks' return self @property def monday(self): assert self.interval == 1, 'Use mondays instead of monday' self.start_day = 'monday' return self.weeks def at(self, time_str): assert self.unit in ('days', 'hours') or self.start_day hour, minute = time_str.split(':') minute = int(minute) if self.unit == 'days' or self.start_day: hour = int(hour) assert 0 <= hour <= 23 elif self.unit == 'hours': hour = 0 assert 0 <= minute <= 59 self.at_time = datetime.time(hour, minute) return self def do(self, job_func, *args, **kwargs): self.job_func = functools.partial(job_func, *args, **kwargs) try: functools.update_wrapper(self.job_func, job_func) except AttributeError: # job_funcs already wrapped by functools.partial won't have # __name__, __module__ or __doc__ and the update_wrapper() # call will fail. pass self._schedule_next_run() return self @property def should_run(self): return datetime.datetime.now() >= self.next_run def run(self): logger.info('Running job %s', self) ret = self.job_func() self.last_run = datetime.datetime.now() self._schedule_next_run() return ret def _schedule_next_run(self): assert self.unit in ('seconds', 'minutes', 'hours', 'days', 'weeks') self.period = datetime.timedelta(**{self.unit: self.interval}) self.next_run = datetime.datetime.now() + self.period首先看一下几个参数的含义:
- interval:间隔多久,每interval秒或分等.
- job_func:job执行函数
- unit : 间隔单元,比如minutes, hours
- at_time :job具体执行时间点,比如10:30等
- last_run:job上一次执行时间
- next_run :job下一次即将运行时间
- period: 距离下次运行间隔时间
- start_day: 周的特殊天,也就是monday等的含义
再来看一下各个方法:
-
__lt__: 比较哪个job最先即将执行, Scheduler中next_run方法里使用min会用到, 有时合适的使用python这些特殊方法可以简化代码,看起来更pythonic.
-
second、seconds的区别就是second时默认interval ==1,即schedule.every().second和schedule.every(1).seconds是等价的,作用就是设置unit为seconds. minute和minutes、hour和hours、day和days、week和weeks也类似.
-
monday: 设置start_day 为monday, unit 为weeks,interval为1. 含义就是每周一执行job. 类似 tuesday、wednesday、thursday、friday、saturday、sunday一样.
-
at: 表示某天的某个时间点,所以不适合minutes、weeks且start_day 为空(即单纯的周)这些unit. 对于unit为hours时,time_str中小时部分为0.
-
do: 设置job对应的函数以及参数, 这里使用functools.update_wrapper去更新函数名等信.主要是functools.partial返回的函数和原函数名称不一样.具体可以看看官网文档. 然后调用_schedule_next_run去计算job下一次执行时间.
-
should_run: 判断job是否可以运行了.依据是当前时间点大于等于job的next_run
-
_schedule_next_run: 这是整个job的定时的逻辑部分是计算job下次运行的时间点的.描述一下流程:
-
计算下一次执行时间:
这里根据unit和interval计算出下一次运行时间. 举个例子,比如schedule.every().hour.do(job, message='things')下一次运行时间就是当前时间加上一小时的间隔. -
但是当start_day不为空时,即表示某个星期. 这时period就不能直接加在当前时间了. 看代码:
其中days_ahead表示job表示的星期几与当表示的星期几差几天. 比如今天是星期三,job表示的是星期五,那么days_ahead就为2,最终self.next_run效果就是在now基础上加了2天. -
当at_time不为空时, 需要更新执行的时间点,具体就是计算时、分、秒然后调用replace进行更新. 这里对unit为days或hours进行特殊处理:
当已经过了执行时间的话的话,unit为days的话减去一天, unit为hours的话减去一小时. 这样可以保证任务今天运行. -
后面还有一句代码:
这句的含义时对于像monday这些定时任务特殊情况的处理. 举个例子, 今天是星期四12:00,创建的job是星期四13:00, days_ahead <=7 这个条件满足,最终next_run实际加了7,这样的话这个任务就不会运行了. 所以这一步实际就是把7减掉. 看上去有点绕, 实际只要把days_ahead <= 0改为days_ahead < 0这句代码就不用了.
-
参考:
www.360doc/content/17/0911/21/17725421_686331723.shtml
wwwblogs/anpengapple/p/8051923.html
版权声明:本文标题:python任务调度之schedule 内容由林淑君副主任自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.xiehuijuan.com/baike/1686481335a72366.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论