Writing a hot-reloadable task scheduler in Python

Isn't CRON enough for scheduled tasks?


Lately I have been writing a lot of Python scripts, and many of them need to run on a schedule. The CRON that ships with Linux is not very intuitive; otherwise, why would so many people build CRON expression generators? CRON also cannot be bundled with the scripts themselves, and it is not available on Windows, so I decided to implement a scheduler in Python.

Of course, if you only need a scheduled task for a single program, Python makes that simple. Without CRON, you can run an infinite loop with an if statement that checks the time: to run something every hour, keep checking until the time arrives and then run it. But if many scripts do this, things get messy, and it is hard to tell which process to stop when you want to stop a particular script. A single scheduler that manages all the scheduled scripts in one place seems like the better option.
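
The do-it-yourself loop described above might look roughly like the sketch below. The names next_full_hour and run_hourly are made up for this illustration, and the max_runs, now_func and sleep_func parameters exist only so the loop can be tried out without actually waiting an hour.

```python
import time
from datetime import datetime, timedelta


def next_full_hour(now):
    """Return the first full hour strictly after `now`."""
    return now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)


def run_hourly(job, max_runs=None, now_func=datetime.now, sleep_func=time.sleep):
    """Naive scheduler: poll the clock once a second and call `job` every hour.

    `max_runs`, `now_func` and `sleep_func` are illustration-only hooks.
    """
    runs = 0
    target = next_full_hour(now_func())
    while max_runs is None or runs < max_runs:
        if now_func() >= target:
            job()
            runs += 1
            target = next_full_hour(now_func())
        else:
            sleep_func(1)


# Typical use (runs forever, so it is left commented out here):
# run_hourly(lambda: print("The job is running."))
```

Every script that schedules itself this way is its own long-running process, which is exactly what becomes hard to keep track of once there are many of them.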

Exploration process

My first idea was to put a CRON expression inside each script and have the manager read it like a configuration file. Parsing CRON expressions myself was a hassle, though, and I could not find a good approach right away. While searching, I came across an interesting library called schedule, which can be installed with pip. It lets you write timing rules in a natural-language-like syntax, which looked like fun, so I decided to build my scheduler on it.

The scheduler was not complicated to write, and schedule is easy to understand. I find it much easier to use than APScheduler, which is said to be the common choice in Python but does not look very user-friendly to me. The basic functionality was done quickly, but one problem remained: since I wanted a scheduler that can hot-load (auto-reload) its tasks, what is the best way to watch files for changes?

My first thought was to list the directory, read each file's metadata with stat, poll, and reload when something changed. That is tedious to write, not very precise, and not very efficient: polling is required, and the whole program gets slower as the number of files grows.

Then I remembered that Django has this feature. How does it do it? When you start a Django project, the first line it prints is “Watching for file changes with StatReloader”, so StatReloader appears to be the module it uses to watch files. Judging by the name, it works much like what I had in mind, so I looked for a different library instead. I eventually found one that looks good, called watchdog. It is not complicated to use, and it is said to rely on the operating system's native file-change notifications (such as inotify on Linux) rather than polling, so it should perform much better. I went with this library, and it works well and reliably: as soon as a file is modified, the program detects it and reloads.
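
For comparison, the list-and-stat polling idea that was set aside could be sketched roughly as follows. This is not Django's StatReloader code, only a minimal illustration of the technique; the function names and the demo files are hypothetical.

```python
import os
import tempfile


def snapshot(directory):
    """Map each .py file in `directory` to its (mtime_ns, size)."""
    state = {}
    for entry in os.scandir(directory):
        if entry.is_file() and entry.name.endswith(".py"):
            info = entry.stat()
            state[entry.name] = (info.st_mtime_ns, info.st_size)
    return state


def changed_files(old, new):
    """Return the names that were added, removed, or modified."""
    names = old.keys() | new.keys()
    return sorted(name for name in names if old.get(name) != new.get(name))


# Demo in a temporary directory.
demo_dir = tempfile.mkdtemp()
with open(os.path.join(demo_dir, "a.py"), "w") as f:
    f.write("x = 1\n")
before = snapshot(demo_dir)

with open(os.path.join(demo_dir, "a.py"), "w") as f:
    f.write("x = 12345\n")  # size differs, so the edit shows up even on a coarse clock
with open(os.path.join(demo_dir, "b.py"), "w") as f:
    f.write("y = 2\n")
after = snapshot(demo_dir)

print(changed_files(before, after))
```

A poller has to take a snapshot like this on every tick and compare it with the previous one, so the cost grows with the number of files even when nothing has changed.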



    import threading
    import time
    import schedule
    import os
    import importlib
    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler

    # One-element list used as a mutable flag shared with the watchdog thread.
    reload_status = [0]


    def run_threaded(job_func):
        # Run each job in its own thread so a slow job does not block the scheduler.
        job_thread = threading.Thread(target=job_func)
        job_thread.start()


    class FileEventHandler(FileSystemEventHandler):
        def __init__(self, reload_status):
            self.reload_status = reload_status

        def on_any_event(self, event):
            # Any event on a file (not a directory) requests a reload.
            if not event.is_directory:
                self.reload_status[0] = 1


    # Watch the "tasks" folder (top level only) for changes.
    observer = Observer()
    event_handler = FileEventHandler(reload_status)
    observer.schedule(event_handler, "tasks", recursive=False)
    observer.start()

    while True:
        # (Re)load every task script and let it register its jobs.
        reload_status[0] = 0
        taskList = os.listdir("tasks")
        for task in taskList:
            # Skip __pycache__, __init__.py and anything that is not a .py file.
            if "__" in task or task.rsplit(".", 1)[-1] != "py":
                continue
            try:
                module = importlib.import_module("tasks." + task.split(".")[0])
                importlib.reload(module).run(run_threaded, schedule)
            except Exception:
                print(f"Task {task.split('.')[0]} import failure")
        # Run pending jobs until a file change is reported.
        while True:
            if reload_status[0]:
                print("Task change, reloading...")
                time.sleep(1)
                schedule.clear()
                break
            else:
                schedule.run_pending()
                time.sleep(1)
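
The hot-loading itself comes from importlib.reload, which re-executes a module that has already been imported. The manager calls reload on the result of import_module every time, so a task's top-level code runs twice the first time it is loaded. The sketch below shows the reload mechanism in isolation, with a hypothetical load_task helper that reloads only when the module was imported before; the demo_task module and the temporary directory are invented for the demonstration.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Skip writing .pyc files so two quick edits can never hit a stale bytecode cache.
sys.dont_write_bytecode = True


def load_task(module_name):
    """Import a module, or re-execute it if it was imported before."""
    if module_name in sys.modules:
        return importlib.reload(sys.modules[module_name])
    return importlib.import_module(module_name)


# Hypothetical task module written to a temporary directory for the demo.
task_dir = Path(tempfile.mkdtemp())
task_file = task_dir / "demo_task.py"
sys.path.insert(0, str(task_dir))

task_file.write_text("def job():\n    return 'version 1'\n")
importlib.invalidate_caches()  # make sure the import system sees the new file
first = load_task("demo_task").job()

# Simulate editing the script while the manager is running.
task_file.write_text("def job():\n    return 'version 2 (edited)'\n")
second = load_task("demo_task").job()

print(first)
print(second)
```

Note that reload only replaces the module's contents; jobs that were registered from the old version stay scheduled until they are cleared, which is why the manager calls schedule.clear() before loading the tasks again.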

Managed script example

    def run(run_threaded, schedule):
        # Called by the manager on every (re)load; register this script's jobs here.
        schedule.every().second.do(run_threaded, job)


    def job():
        print("The job is running.")

Note: managed scripts go in the “tasks” folder next to the manager script, and each one must define a run(run_threaded, schedule) function like the one above. For the available timing syntax, see the examples in the official schedule documentation.


I like programs to be as simple as possible, both in code and in features. APScheduler is said to be a Python take on Java’s Quartz, and I find it hard to read. A manager written in about 50 lines, like mine, looks pretty good, doesn't it?
