Original link: https://www.dongwm.com/post/python-concurrent-scheme/
Foreword
This article compares the applicable scenarios, advantages, and disadvantages of Python's concurrency schemes in depth, with a focus on the asyncio scheme.
Note: The code in this article requires Python 3.10 and above to run properly.
Python Concurrency and Parallelism
There are 3 concurrency and parallelism schemes in the Python world, as follows:
- threading
- multiprocessing
- Asynchronous IO (asyncio)
Note: The difference between concurrency and parallelism is not spelled out up front; it is easier to explain with the examples at the end. In addition, concurrent.futures will be mentioned later, but since it is not an independent solution it is not listed here.
These solutions are designed to address performance bottlenecks with different characteristics. There are two main types of performance problems:
- CPU-bound. This is also referred to as a computationally intensive task, which is characterized by a large amount of computation. For example, the execution of various methods of Python’s built-in objects, scientific computing, video transcoding, and more.
- I/O-bound. Any task involving the network, memory access, disk I/O, and so on is an I/O-intensive task. Such tasks consume little CPU, and most of their time is spent waiting for I/O operations to complete. Examples include database connections, web services, file reading and writing, and more.
If you are not sure what type a task is, my rule of thumb is to ask: would it run faster if you gave it a better, faster CPU? If so, it is a CPU-bound task; otherwise it is an I/O-bound task.
Among these three schemes, there is only one way to optimize CPU-intensive tasks: use multiple processes so that a multi-core CPU can work on the job together and speed it up. For I/O-intensive tasks, all three options work.
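To make the distinction concrete, here is a small sketch of my own (not from the original article): the first function gets faster if you give it a faster CPU, while the second spends almost all of its time waiting on the network:

```python
import time

import requests


def cpu_bound(n=10_000_000):
    # pure computation: a faster CPU finishes this sooner
    return sum(i * i for i in range(n))


def io_bound(url='https://example.com'):
    # nearly all of the elapsed time is spent waiting for the network
    return len(requests.get(url).text)


if __name__ == '__main__':
    for func in (cpu_bound, io_bound):
        start = time.perf_counter()
        func()
        print(f'{func.__name__}: {time.perf_counter() - start:.2f}s')
```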
Next, let's take a small example of crawling web pages and writing them to local files (a typical I/O-intensive task) and use it to compare these solutions one by one:
```python
import requests

url = 'https://movie.douban.com/top250?start='
headers = {
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36'  # noqa
}


def fetch(session, page):
    with (session.get(f'{url}{page * 25}', headers=headers) as r,
          open(f'top250-{page}.html', 'w') as f):
        f.write(r.text)


def main():
    with requests.Session() as session:
        for p in range(25):
            fetch(session, p)


if __name__ == '__main__':
    main()
```
In this example, the 25 pages of the Douban Movie Top 250 (each page shows 10 movies) are fetched with the requests library, one page after another, and the whole run took 3.9 seconds:
```
➜ time python io_non_concurrent.py
python io_non_concurrent.py  0.23s user 0.05s system 7% cpu 3.911 total
```
This speed already looks quite good, partly because Douban is well optimized and partly because the bandwidth and network speed at my home are decent. Now let's optimize it with the three schemes above and see the effect.
Multi-process version
The Python interpreter runs in a single process. If the server or your computer has multiple cores, using only one of them is quite wasteful, so we can speed things up with multiple processes:
```python
from multiprocessing import Pool


def main():
    with (Pool() as pool,
          requests.Session() as session):
        pool.starmap(fetch, [(session, p) for p in range(25)])
```
Note: The code that has already appeared above is omitted here, and only the changed part is shown.
A multi-process pool is used, but the number of processes is not specified, so one process per core is started to share the work: 10 on this MacBook. The timing is as follows:
```
➜ time python use_multiprocessing.py
python use_multiprocessing.py  2.15s user 0.30s system 232% cpu 1.023 total
```
In theory, multiprocessing could bring a tenfold speedup, since 10 processes work on the tasks together. Of course, the 25 tasks are not an integer multiple of 10, and each fetch is already so fast that the multi-process scheme cannot show its full advantage, so the run takes about 1 second, roughly a 4x improvement.
The multi-process scheme has no obvious drawback: as long as the machine is powerful enough, it can always go faster.
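For reference, a minimal sketch of pinning the pool size explicitly instead of relying on the default (Pool() already uses the core count, so this only matters when you want fewer or more processes; fetch and the other definitions from the first example are assumed to be in scope):

```python
from multiprocessing import Pool, cpu_count


def main():
    # cpu_count() mirrors the default; replace it with a smaller number to limit the pool
    with (Pool(processes=cpu_count()) as pool,
          requests.Session() as session):
        pool.starmap(fetch, [(session, p) for p in range(25)])
```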
Multithreaded version
The Python interpreter is not thread-safe, and the GIL exists precisely for this reason: a thread must acquire the GIL before it can access Python objects. As a result, only one thread can execute Python code at any given time, which prevents race conditions. The GIL causes plenty of problems, but it also has advantages, such as simplifying memory management; those topics are not the focus of this article, so I will not expand on them here. Those who are interested can read up on them.
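To make the GIL's effect concrete, here is a small sketch of my own (in the spirit of the "Understanding the Python GIL" talk): a CPU-bound function run in two threads takes roughly as long as running it twice in one thread, because only one thread can execute Python bytecode at a time:

```python
import time
from threading import Thread


def count(n=10_000_000):
    # pure Python computation, so the GIL is the bottleneck
    while n > 0:
        n -= 1


start = time.perf_counter()
count()
count()
print(f'sequential : {time.perf_counter() - start:.2f}s')

start = time.perf_counter()
t1, t2 = Thread(target=count), Thread(target=count)
t1.start(); t2.start()
t1.join(); t2.join()
# roughly the same elapsed time: the GIL prevents any speedup for CPU-bound work
print(f'two threads: {time.perf_counter() - start:.2f}s')
```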
Some readers will then ask: if only one thread is ever running at a time, how can multiple threads improve concurrency at all?
To answer that, we have to come back to the GIL. Further-reading link 1, "Understanding the Python GIL", explains it well (note that what we discuss here is the new GIL introduced in Python 3.2, not the old Python 2 GIL; many descriptions of the old GIL on the Internet are outdated, and the article in further-reading link 2 helps explain the difference). I have taken a few slides from that talk to illustrate:
In the slide above there is only one thread at first, so there is no need to acquire or release the GIL. Then a second thread appears; at the beginning, thread 2 is suspended because it does not hold the GIL.
Thread 1 voluntarily gives up the GIL within a cv_wait cycle, either because of I/O blocking or because of a timeout (a thread cannot hold the GIL forever; even without any I/O blocking within a cycle, it is forced to release execution after a default of 5 milliseconds, which can be changed via sys.setswitchinterval, though you should know what you are doing before touching it). Either condition triggers the release of the GIL.
The conventional case (release due to I/O, not a forced timeout) is shown here. In the cv_wait stage, thread 1 hits I/O blocking and signals thread 2; thread 1 gives up the GIL and suspends, thread 2 acquires the GIL, and so on, with thread 2 later releasing the GIL back to thread 1. This slide deck is very well known in the industry and I recommend studying it; the later slides also cover the timeout handling, which is a bit far from this article, so I will not expand on it here. By the way, the first time I saw these slides I found the timeout interval alarming: it implies at least 200 switches per second, which seems wasteful, so you can try increasing the switch interval in your code.
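If you do want to experiment with that switch interval, the relevant calls are sys.getswitchinterval and sys.setswitchinterval (values are in seconds, and the change affects all threads, so measure before and after):

```python
import sys

print(sys.getswitchinterval())  # 0.005 by default, i.e. 5 milliseconds
sys.setswitchinterval(0.05)     # e.g. raise it to 50 ms to force fewer GIL switches
```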
To summarize: under the GIL's control, each thread gets a fair share of execution time, and the program is never stuck on one blocked thread, because a thread that hits blocking voluntarily gives up the GIL and suspends itself, giving other threads a chance to run; this improves the overall efficiency of executing the tasks. The ideal scenario in multi-threaded mode is that at any point in time some thread is doing useful work, rather than threads sitting around blocked while waiting to be scheduled.
Let’s take a look at the multi-threaded solution:
```python
from multiprocessing.pool import ThreadPool


def main():
    with (ThreadPool(processes=5) as pool,
          requests.Session() as session):
        pool.starmap(fetch, [(session, p) for p in range(25)])
```
Here are 2 points:
- In both the multi-process and multi-thread examples I use a pool, which is a good habit: too many threads or processes bring extra overhead, including creation and destruction costs and scheduling costs, and also drag down the overall performance of the machine. A thread (or process) pool maintains a fixed set of workers and hands them tasks to execute concurrently. On the one hand this avoids the cost of creating and destroying workers for each task; on the other hand it avoids the over-scheduling caused by an ever-growing number of workers, while still keeping the cores fully utilized. In addition, the standard-library process pool and thread pool need very little extra code and have almost identical structure, which makes them especially suitable for side-by-side examples.
- If processes is not specified, it likewise defaults to the number of CPU cores. But more threads is not automatically better: with too many threads, work that would otherwise run normally keeps being forced to release the GIL, and the resulting extra context switches become a burden.
In this example the thread count is 5, which is partly experience and partly the result of trying several values. This exposes a real problem with multi-threaded programming: a little carelessness can make the optimization worse, and there is no principled way to find the optimal value, because the GIL's control of threads is a black box the developer cannot steer directly. That is unfriendly even to relatively experienced Python developers.
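As an illustration of that trial-and-error process, here is a hypothetical helper of my own (the candidate values are arbitrary; fetch and the other definitions from the first example are assumed to be in scope) that simply times the same batch of downloads under several pool sizes:

```python
import time
from multiprocessing.pool import ThreadPool


def find_best_pool_size(candidates=(2, 5, 10, 20)):
    # time the same 25 fetches under each candidate pool size and keep the fastest
    timings = {}
    for n in candidates:
        start = time.perf_counter()
        with (ThreadPool(processes=n) as pool,
              requests.Session() as session):
            pool.starmap(fetch, [(session, p) for p in range(25)])
        timings[n] = time.perf_counter() - start
    return min(timings, key=timings.get), timings
```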
Let’s take a look at the time:
```
➜ time python use_threading.py
python use_threading.py  0.62s user 0.24s system 74% cpu 1.157 total
```
The multi-threaded scheme is more than twice as fast as the original, but a little slower than the multi-process scheme (and in a real-world example I think the gap would be much larger). In the multi-process scheme each core works independently, whereas the multi-threaded scheme cannot use that many threads for efficiency reasons, and because of the GIL a thread is still forced to release it even when it would not need to; the constant switching eats into the efficiency and greatly reduces the benefit.
concurrent.futures version
Let me also mention the concurrent.futures scheme in passing. It is not really a brand-new solution; it is a framework that has long existed in other languages (such as Java), through which you can control the startup, execution, and shutdown of threads (or processes). I think of it as an abstraction over the multi-process pool and multi-thread pool, so that developers do not need to care about the specific details and usage of the multithreading and multiprocessing modules. It is not hard to understand if you break the name apart: ThreadPoolExecutor = Thread + Pool + Executor, that is, thread + pool + executor. A thread pool is created in advance and reused, and the Executor decouples task submission from task execution: it takes care of thread allocation (how and when) and of running the tasks.
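Besides map, the Executor interface offers submit, which schedules a single call and returns a concurrent.futures.Future; a minimal sketch (fetch and the other definitions from the first example are assumed to be in scope):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def main():
    with (ThreadPoolExecutor(max_workers=5) as pool,
          requests.Session() as session):
        # submit returns immediately with a Future for each scheduled call
        futures = [pool.submit(fetch, session, p) for p in range(25)]
        for fut in as_completed(futures):
            fut.result()  # re-raises any exception from the worker thread
```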
If you want to know the details, I recommend reading the comments at the top of its source file, which describe the data flow in great detail, arguably more in-depth and accurate than any technical article.
Here is just a demonstration of the usage of ThreadPoolExecutor:
```python
from functools import partial
from concurrent.futures import ThreadPoolExecutor


def main():
    with (ThreadPoolExecutor(max_workers=5) as pool,
          requests.Session() as session):
        list(pool.map(partial(fetch, session), range(25)))
```
Does the recipe look familiar? The interface is very similar to the process pool and thread pool used above. Note that if max_workers is not specified, it defaults to the number of CPUs + 4, capped at 32. It has the same tuning problem as plain multithreading: max_workers needs to be tuned (the same value is used here for comparison).
```
➜ time python use_executor.py
python use_executor.py  0.63s user 0.32s system 82% cpu 1.153 total
```
Although concurrent.futures is the more mainstream solution nowadays, in my experience its efficiency is slightly lower than using the process pool or thread pool directly, because its high level of abstraction complicates things: internally it uses queues (the queue module) and semaphores (Semaphore), which limit the performance gain. So my suggestion is that Python beginners can use it, while advanced developers should control the concurrency implementation themselves.
asyncio version
In the thread-based solutions above, developers have to find an optimal number of threads through experience or experiments, and this value varies greatly between scenarios, which is very unfriendly to beginners. It is easy to end up "using multithreading, but using it incorrectly or not using it well enough".
Later, Python introduced a new concurrency model: asyncio. This section explains why asyncio is a better choice. First, look at one more slide from "Understanding the Python GIL":
Recall that with only a single thread, the GIL never really comes into play: that lone thread can keep executing forever. This is exactly the angle asyncio takes: because it is single-process and single-threaded, it is in principle not restricted by the GIL. Under an event-driven mechanism, the single thread can be used far more effectively, and through the await keyword developers decide the scheduling themselves, instead of leaving it to the GIL as in multithreading.
Imagine the best case: every await marks a spot where I/O might block. During execution, whenever I/O blocking is hit, we switch coroutines and run other tasks that can make progress, so the thread keeps working and never sits blocked; you could say its utilization reaches 100%! That is never possible under a multi-threaded solution.
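To make this concrete, here is a minimal sketch of what an asyncio version of the earlier crawler might look like. I use aiohttp and asyncio.gather purely for illustration; this is my own sketch, not necessarily the implementation the original article goes on to build:

```python
import asyncio

import aiohttp

url = 'https://movie.douban.com/top250?start='
headers = {'User-Agent': 'Mozilla/5.0'}  # stand-in for the headers in the first example


async def fetch(session, page):
    async with session.get(f'{url}{page * 25}', headers=headers) as resp:
        text = await resp.text()
    with open(f'top250-{page}.html', 'w') as f:
        f.write(text)


async def main():
    async with aiohttp.ClientSession() as session:
        # all 25 requests are in flight concurrently within a single thread
        await asyncio.gather(*(fetch(session, p) for p in range(25)))


if __name__ == '__main__':
    asyncio.run(main())
```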
Having said that, let's go back and work through it again, starting from the basic theory.
Coroutine
A coroutine is a special function: the async keyword is added in front of the usual def. Under the hood it is essentially a generator function, able to yield values or receive values sent from outside (via the send method). Its most important feature, though, is that it can save its context (or state) when needed, suspend itself, and hand control back to the caller; and because the context is saved at the moment of suspension, it can resume execution later.
In fact, calling a coroutine does not execute it immediately:
```python
In : async def a():
...:     print('Called')
...:

In : a()  # does not execute, just returns a coroutine object
Out: <coroutine object a at 0x10444aa40>

In : await a()  # use await to actually execute it
Called
```
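Note that IPython supports top-level await; in an ordinary script you would drive the coroutine through the event loop, typically with asyncio.run:

```python
import asyncio


async def a():
    print('Called')


asyncio.run(a())  # creates an event loop, runs the coroutine to completion, then closes the loop
```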
Asynchrony and concurrency
Asynchronous, non-blocking, and concurrent are easily confused words. In the context of asyncio, my understanding is:
- Coroutines run asynchronously: in asyncio, a coroutine can "pause" itself while waiting for a result, so that other coroutines can run in the meantime.
- Asynchronous execution does not wait for blocking logic to finish before letting other code run, so it does not "block" other code; this is "non-blocking" code.
- When a program written with asynchronous code runs, its tasks appear to execute and complete at the same time (because it switches at each wait), so it appears "concurrent".
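A small sketch of my own to show that "seemingly simultaneous" behavior: three coroutines that each sleep for one second finish in about one second in total, because each one yields control at its await:

```python
import asyncio
import time


async def job(name):
    await asyncio.sleep(1)  # yields control to the event loop while "blocked"
    print(f'{name} done')


async def main():
    start = time.perf_counter()
    await asyncio.gather(job('a'), job('b'), job('c'))
    print(f'total: {time.perf_counter() - start:.2f}s')  # roughly 1 second, not 3


asyncio.run(main())
```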
Event Loop (EventLoop)
I have known the concept of an event loop for many years, going back to the Twisted era, and I always thought it was mysterious and complicated; in hindsight I was overthinking it. For beginners it helps to shift perspective: the point is event + loop. The loop is just a loop, each task is placed on the loop as an event, the loop keeps spinning, and an event's execution is triggered when its conditions are met. Its characteristics are as follows:
- An event loop runs in a thread
- Awaitable objects (coroutines, Tasks, and Futures, covered below) can be registered on the event loop
- If a coroutine calls another coroutine (via await), the caller is suspended, a context switch happens, and the other coroutine executes, and so on
- If a coroutine hits I/O blocking during execution, it suspends along with its context and returns control to the event loop
- Since it is a loop, after all registered events have been executed, the loop starts over
Future/Task
I think asyncio.Future is like a Promise in JavaScript: a placeholder object representing something that has not finished yet but will be fulfilled or completed in the future (or, of course, may fail with an exception raised inside it). It is very similar to the concurrent.futures.Future from the concurrent.futures scheme mentioned above, but with a lot of customization for asyncio's event loop. asyncio.Future is just a container for data.

asyncio.Task is a subclass of asyncio.Future, used to run a coroutine in the event loop.
The official documentation gives a very intuitive example; I have rewritten it here to run in IPython, with explanations:
```python
In : import asyncio

In : async def set_after(fut):  # a coroutine that sleeps for 3 seconds, then sets the result on the future
...:     await asyncio.sleep(3)
...:     fut.set_result('Done')
...:

In : loop = asyncio.get_event_loop()  # get the current event loop

In : fut = loop.create_future()  # create a Future in the event loop

In : fut  # still in the default pending state, because nothing has resolved it yet
Out: <Future pending>

In : task = loop.create_task(set_after(fut))  # create (register) a task in the event loop

In : task  # checked immediately: the task has just been created and is still running
Out: <Task pending name='Task-3044' coro=<set_after() running at <ipython-input-51-1fd5c9e97768>:2> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1054d32b0>()]>>

In : fut  # checked immediately: the task has not finished yet, so the future is unchanged
Out: <Future pending>

In : task  # three seconds later, the task has finished
Out: <Task finished name='Task-3044' coro=<set_after() done, defined at <ipython-input-51-1fd5c9e97768>:1> result=None>

In : fut  # the Future's result has been set, so its state is finished
Out: <Future finished result='Done'>
```
From this we can see:
- A Future object is not a task, but a container for storing state
- create_task lets the event loop schedule the execution of the coroutine
- To create tasks, both ensure_future and create_task can be used; ensure_future is a higher-level wrapper, but on Python 3.7 and above you should use create_task
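A minimal sketch of the two ways to schedule a coroutine as a Task:

```python
import asyncio


async def work(n):
    await asyncio.sleep(0.1)
    return n * 2


async def main():
    task = asyncio.create_task(work(1))   # preferred on Python 3.7+
    fut = asyncio.ensure_future(work(2))  # also accepts Futures and other awaitables
    print(await task, await fut)


asyncio.run(main())
```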
Next, let's understand the role of await. When a coroutine awaits a Future object, the Task suspends the coroutine's execution and waits for the Future to complete; when the Future completes, execution of the wrapped coroutine resumes:
```python
In : async def a():
...:     print('IN a')
...:     await b()
...:     await c()
...:     print('OUT a')
...:

In : async def b():
...:     print('IN b')
...:     await d()
...:     print('OUT b')
...:

In : async def c():
...:     print('IN c')
...:     await asyncio.sleep(1)
...:     print('OUT c')
...:

In : async def d():
...:     print('IN d')
...
```