正在进入ing...

python 方便的定时器库之 schedule

发布时间:2021-11-18 浏览量: 691 文章分类: python

日常工作总竟然会遇到下面这种场景,比如需要针对不同的业务做报警,A业务的预期是每30分钟一次报警、而B业务是希望每60分钟一次报警。之前针对这种问题都是自己写定时器或使用python自带的sched库,又累又麻烦。但是这次我接到了一个更变态的需求。由于需要用到的数据源在几个不同的国家(中国、日本、东南亚、北美),又涉及到了时区问题、又涉及到差异时间问题,所以在写脚本之前,就专门网上查了一下,就发现了这个宝藏库,也是今天的主角schedule。(APScheduler由于是一个框架,暂时也没用到过。)

安装

安装就很简单了,直接pip install schedule就可以了。

简单使用

**schedule方法是串行的,也就是说,如果各个任务之间时间不冲突,那是没问题的;如果时间有冲突的话,会串行的执行命令,这个时候要用多线程比较好**

例子:

# !/usr/bin/env python
# -*- coding:utf-8 -*-
import time,os
import schedule

def job(msg):
    now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    print(f"[{now_time}] {msg}")

schedule.every(10).minutes.do(job,'这是每10分钟一次') # 每隔 10 分钟运行一次 job 函数
schedule.every(1).minutes.do(job,'这是每1分钟一次') # 每隔 10 分钟运行一次 job 函数
schedule.every(2).minutes.do(job,'这是每2分钟一次') # 每隔 10 分钟运行一次 job 函数
schedule.every(3).minutes.do(job,'这是每3分钟一次') # 每隔 10 分钟运行一次 job 函数
schedule.every(4).minutes.do(job,'这是每4分钟一次') # 每隔 10 分钟运行一次 job 函数
schedule.every(5).minutes.do(job,'这是每5分钟一次') # 每隔 10 分钟运行一次 job 函数
schedule.every().hour.do(job,'这是每小时一次') # 每隔 1 小时运行一次 job 函数

while True:
    schedule.run_pending() # 运行所有可以运行的任务
    time.sleep(1)

执行结果:

[2021-11-18 15:02:41] 这是每1分钟一次
[2021-11-18 15:03:42] 这是每1分钟一次
[2021-11-18 15:04:42] 这是每4分钟一次
[2021-11-18 15:04:42] 这是每3分钟一次
[2021-11-18 15:04:42] 这是每2分钟一次
[2021-11-18 15:04:42] 这是每1分钟一次
[2021-11-18 15:05:42] 这是每1分钟一次
[2021-11-18 15:06:42] 这是每2分钟一次
[2021-11-18 15:06:42] 这是每1分钟一次
[2021-11-18 15:07:40] 这是每5分钟一次
[2021-11-18 15:07:42] 这是每3分钟一次
[2021-11-18 15:07:42] 这是每1分钟一次
[2021-11-18 15:08:43] 这是每4分钟一次
[2021-11-18 15:08:43] 这是每2分钟一次
[2021-11-18 15:08:43] 这是每1分钟一次
[2021-11-18 15:09:43] 这是每1分钟一次
[2021-11-18 15:10:43] 这是每3分钟一次
[2021-11-18 15:10:43] 这是每2分钟一次
[2021-11-18 15:10:43] 这是每1分钟一次
[2021-11-18 15:11:43] 这是每1分钟一次
[2021-11-18 15:12:40] 这是每10分钟一次
[2021-11-18 15:12:41] 这是每5分钟一次
[2021-11-18 15:12:43] 这是每4分钟一次
[2021-11-18 15:12:43] 这是每2分钟一次
[2021-11-18 15:12:43] 这是每1分钟一次
[2021-11-18 15:13:44] 这是每3分钟一次
[2021-11-18 15:13:44] 这是每1分钟一次
[2021-11-18 15:14:44] 这是每2分钟一次
[2021-11-18 15:14:44] 这是每1分钟一次
[2021-11-18 15:15:44] 这是每1分钟一次

同时还有很多非常个性化的设置。

schedule.every(10).minutes.do(job, name) # 每隔十分钟执行一次任务
schedule.every(10).seconds.do(job, name) # 每隔十秒钟执行一次任务
schedule.every().hour.do(job, name) # 每隔一小时执行一次任务
schedule.every().day.at("10:30").do(job, name) # 每天的10:30执行一次任务
schedule.every(5).to(10).days.do(job, name) # 每隔5到10天执行一次任务 
schedule.every().monday.do(job, name) # 每周一的这个时候执行一次任务
schedule.every().wednesday.at("13:15").do(job, name) # 每周三13:15执行一次任务

如果出现了上面说到的时间存在冲突的问题,也可以使用多线程解决。

# !/usr/bin/env python
# -*- coding:utf-8 -*-
import queue
import time
import threading
import schedule

def job(msg):
    now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    print(f"[{now_time}] {msg}")

def worker_main():
    while 1:
        job_func = jobqueue.get()
        job_func()

jobqueue = queue.Queue()

schedule.every(10).minutes.do(job,'这是每10分钟一次') # 每隔 10 分钟运行一次 job 函数
schedule.every(1).minutes.do(job,'这是每1分钟一次') # 每隔 10 分钟运行一次 job 函数
schedule.every(2).minutes.do(job,'这是每2分钟一次') # 每隔 10 分钟运行一次 job 函数
schedule.every(3).minutes.do(job,'这是每3分钟一次') # 每隔 10 分钟运行一次 job 函数
schedule.every(4).minutes.do(job,'这是每4分钟一次') # 每隔 10 分钟运行一次 job 函数
schedule.every(5).minutes.do(job,'这是每5分钟一次') # 每隔 10 分钟运行一次 job 函数
schedule.every().hour.do(job,'这是每小时一次') # 每隔 1 小时运行一次 job 函数

worker_thread = threading.Thread(target=worker_main)
worker_thread.start()

while 1:
    schedule.run_pending()
    time.sleep(1)

这样在来看 时间在执行的时候就不会出现因为可能被函数内业务影响占用而导致执行延后的问题。

[2021-11-18 15:31:28] 这是每1分钟一次
[2021-11-18 15:32:28] 这是每2分钟一次
[2021-11-18 15:32:28] 这是每1分钟一次
[2021-11-18 15:33:28] 这是每3分钟一次
[2021-11-18 15:33:28] 这是每1分钟一次
[2021-11-18 15:34:28] 这是每4分钟一次
[2021-11-18 15:34:28] 这是每2分钟一次
[2021-11-18 15:34:28] 这是每1分钟一次
[2021-11-18 15:35:28] 这是每5分钟一次
[2021-11-18 15:35:28] 这是每1分钟一次