In the last two articles we reviewed Python 3 multithreading and multiprocessing examples.
This time we’ll talk about Python’s coroutines.
What is a coroutine? In short, coroutines are a form of sequential processing: only one coroutine executes at any given moment, and each one voluntarily suspends itself so that another can run. This means that coroutines always run concurrently and never in parallel! The next question is: why would anyone prefer coroutines over threads, which (at least while waiting on I/O) can run in parallel?
The answer is that coroutines can provide a very high level of concurrency with very little overhead.
Threads can give you parallelism, but the time and resources spent creating and scheduling them are sometimes even greater than the time they save! So in practice, many threads are not always better than a single thread running coroutines: coroutines still provide concurrency, and they manage the context switch themselves, at each `await`, without involving the operating system’s thread scheduler.
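To make the "concurrent but not parallel" point concrete, here is a minimal sketch (the `worker` coroutine and its step counts are hypothetical, not part of the download example). Two coroutines record each step together with the ID of the thread they run on; `await asyncio.sleep(0)` is used only to hand control back to the event loop, the way a real `await` on I/O would.

```python
import asyncio
import threading


async def worker(name, steps, log):
    for i in range(steps):
        # Record which coroutine ran, which step it was on, and on which OS thread
        log.append((name, i, threading.get_ident()))
        # Yield control to the event loop so the other coroutine can take a turn
        await asyncio.sleep(0)


async def main():
    log = []
    # Run both workers concurrently on the same event loop
    await asyncio.gather(worker("a", 3, log), worker("b", 3, log))
    return log


log = asyncio.run(main())
print([f"{name}{i}" for name, i, _ in log])  # ['a0', 'b0', 'a1', 'b1', 'a2', 'b2']
print("threads used:", len({tid for _, _, tid in log}))  # threads used: 1
```

The steps interleave even though only one thread is ever involved: each coroutine switches context itself at its `await`.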
OK, enough words, let’s see some code! The code below downloads images from the CodeFlex.co website. The first version doesn’t use coroutines and downloads the images sequentially. The second version uses the `asyncio` library (together with the third-party `aiohttp` HTTP client) to create coroutines, and as a result it spends significantly less time on this task.
```python
from pathlib import Path
import logging
from urllib.request import urlopen
import os
from time import time

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

CODEFLEX_IMAGES_URLS = ['https://codeflex.co/wp-content/uploads/2021/01/pandas-dataframe-python-1024x512.png',
                        'https://codeflex.co/wp-content/uploads/2021/02/github-actions-deployment-to-eks-with-kustomize-1024x536.jpg',
                        'https://codeflex.co/wp-content/uploads/2021/02/boto3-s3-multipart-upload-1024x536.jpg',
                        'https://codeflex.co/wp-content/uploads/2018/02/kafka-cluster-architecture.jpg',
                        'https://codeflex.co/wp-content/uploads/2016/09/redis-cluster-topology.png']


def download_image(directory, img_url):
    # Save the image under its original file name inside `directory`
    download_path = directory / os.path.basename(img_url)
    # Blocking I/O: the program waits here until the whole image has been read
    with urlopen(img_url) as image, download_path.open('wb') as f:
        f.write(image.read())
    logger.info('Downloaded: ' + img_url)


def main():
    images_dir = Path("codeflex_images")
    images_dir.mkdir(parents=False, exist_ok=True)
    # Download the images strictly one after another
    for img_url in CODEFLEX_IMAGES_URLS:
        download_image(images_dir, img_url)


if __name__ == '__main__':
    start = time()
    main()
    logger.info('Download time: %s seconds', time() - start)
```
The program output:
2021-03-30 19:21:02,370 - __main__ - INFO - Downloaded: https://codeflex.co/wp-content/uploads/2021/01/pandas-dataframe-python-1024x512.png
2021-03-30 19:21:03,145 - __main__ - INFO - Downloaded: https://codeflex.co/wp-content/uploads/2021/02/github-actions-deployment-to-eks-with-kustomize-1024x536.jpg
2021-03-30 19:21:03,890 - __main__ - INFO - Downloaded: https://codeflex.co/wp-content/uploads/2021/02/boto3-s3-multipart-upload-1024x536.jpg
2021-03-30 19:21:04,686 - __main__ - INFO - Downloaded: https://codeflex.co/wp-content/uploads/2018/02/kafka-cluster-architecture.jpg
2021-03-30 19:21:05,433 - __main__ - INFO - Downloaded: https://codeflex.co/wp-content/uploads/2016/09/redis-cluster-topology.png
2021-03-30 19:21:05,433 - __main__ - INFO - Download time: 4.3232290744781494 seconds
As you can see, everything here runs sequentially: we download the images one by one, and the whole operation took ~4.3 seconds. If you check the `codeflex_images` directory, you will find five images there.
Now let’s modify our code. We will use the `asyncio` library to create a coroutine for each download, and `aiohttp` (installed with `pip install aiohttp`) as an asynchronous HTTP client.
```python
import asyncio
from pathlib import Path
import logging
import os
from time import time

import aiohttp  # third-party HTTP client: pip install aiohttp

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

CODEFLEX_IMAGES_URLS = ['https://codeflex.co/wp-content/uploads/2021/01/pandas-dataframe-python-1024x512.png',
                        'https://codeflex.co/wp-content/uploads/2021/02/github-actions-deployment-to-eks-with-kustomize-1024x536.jpg',
                        'https://codeflex.co/wp-content/uploads/2021/02/boto3-s3-multipart-upload-1024x536.jpg',
                        'https://codeflex.co/wp-content/uploads/2018/02/kafka-cluster-architecture.jpg',
                        'https://codeflex.co/wp-content/uploads/2016/09/redis-cluster-topology.png']


async def download_image_async(session, directory, img_url):
    download_path = directory / os.path.basename(img_url)
    async with session.get(img_url) as response:
        response.raise_for_status()  # treat HTTP errors (4xx/5xx) as failures
        with download_path.open('wb') as f:
            while True:
                # await suspends this coroutine until the next chunk arrives,
                # letting the event loop work on the other downloads meanwhile
                chunk = await response.content.read(512)
                if not chunk:  # empty bytes: the response body has been fully read
                    break
                f.write(chunk)
    logger.info('Downloaded: ' + img_url)


async def main():
    images_dir = Path("codeflex_images")
    images_dir.mkdir(parents=False, exist_ok=True)
    async with aiohttp.ClientSession() as session:
        # One coroutine per image; gather() runs them all concurrently
        tasks = [download_image_async(session, images_dir, img_url) for img_url in CODEFLEX_IMAGES_URLS]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    # With return_exceptions=True, failures come back as results instead of being raised
    for img_url, result in zip(CODEFLEX_IMAGES_URLS, results):
        if isinstance(result, Exception):
            logger.error('Failed: %s (%r)', img_url, result)


if __name__ == '__main__':
    start = time()
    # asyncio.run() creates an event loop, runs main() to completion, then closes the loop
    asyncio.run(main())
    logger.info('Download time: %s seconds', time() - start)
```
The program output:
2021-03-30 19:33:14,574 - __main__ - INFO - Downloaded: https://codeflex.co/wp-content/uploads/2021/02/boto3-s3-multipart-upload-1024x536.jpg
2021-03-30 19:33:14,577 - __main__ - INFO - Downloaded: https://codeflex.co/wp-content/uploads/2021/01/pandas-dataframe-python-1024x512.png
2021-03-30 19:33:14,579 - __main__ - INFO - Downloaded: https://codeflex.co/wp-content/uploads/2016/09/redis-cluster-topology.png
2021-03-30 19:33:14,629 - __main__ - INFO - Downloaded: https://codeflex.co/wp-content/uploads/2018/02/kafka-cluster-architecture.jpg
2021-03-30 19:33:14,637 - __main__ - INFO - Downloaded: https://codeflex.co/wp-content/uploads/2021/02/github-actions-deployment-to-eks-with-kustomize-1024x536.jpg
2021-03-30 19:33:14,641 - __main__ - INFO - Download time: 0.8204469680786133 seconds
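Wow! Just 0.8 seconds, more than 5 times faster than the previous run! Notice also that the images finished in a different order than they appear in the list: the downloads overlapped, and each one completed as soon as its data had arrived.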
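The speed-up follows from simple arithmetic: run one after another, the downloads take roughly the sum of their individual times, while overlapped they take roughly as long as the slowest one. The sketch below simulates this without any network access; `fake_download` and the 0.1-second delays are hypothetical stand-ins for real requests.

```python
import asyncio
from time import perf_counter

DELAYS = [0.1] * 5  # hypothetical latency of each "download", in seconds


async def fake_download(delay):
    # Stand-in for a network request: awaiting hands control to the event loop
    await asyncio.sleep(delay)


async def sequential():
    for delay in DELAYS:
        await fake_download(delay)  # each wait must finish before the next one starts


async def concurrent():
    await asyncio.gather(*(fake_download(delay) for delay in DELAYS))  # the waits overlap


def timed(coro_fn):
    start = perf_counter()
    asyncio.run(coro_fn())
    return perf_counter() - start


seq_time = timed(sequential)
conc_time = timed(concurrent)
print(f"sequential: {seq_time:.2f}s, concurrent: {conc_time:.2f}s")
```

On a typical machine this prints roughly 0.5 s (the sum of the delays) for the sequential version and roughly 0.1 s (the longest single delay) for the concurrent one.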
Let me explain the code. As I mentioned before, we are using the `asyncio` library here, which orchestrates the behavior of all the coroutines.
First of all, an event loop is created to handle the asynchronous code. The event loop runs the `main()` function until it completes and is then closed. Notice the `async` keyword before the function definition.
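Running `main()` on an event loop can be done either by managing the loop by hand (create it, run `main()` until complete, close it) or with `asyncio.run()` (Python 3.7+), which performs those three steps for you. A minimal sketch of both, using a trivial hypothetical `main()`:

```python
import asyncio


async def main():
    # Hypothetical placeholder for the real work
    return "done"


# Option 1: explicit loop management. new_event_loop() is used here because calling
# asyncio.get_event_loop() with no running loop is deprecated in recent Python versions.
loop = asyncio.new_event_loop()
try:
    explicit = loop.run_until_complete(main())
finally:
    loop.close()

# Option 2: asyncio.run() creates a fresh loop, runs main() to completion and closes the loop
shortcut = asyncio.run(main())

print(explicit, shortcut)
```

For a script with a single entry point, `asyncio.run()` is usually the simpler choice, since it also guarantees the loop is closed.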
The `async def` syntax marks a function as a coroutine function: calling it returns a coroutine object instead of running its body right away. To get the result of a coroutine, we must use the `await` keyword, which suspends the current coroutine until the awaitable completes. Meanwhile, while the coroutine is waiting for results, the event loop is free to do other useful work, such as running other coroutines.
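A small sketch of what `async def` and `await` do, using a hypothetical `add()` coroutine: calling the coroutine function only creates a coroutine object, and the body runs when that object is awaited.

```python
import asyncio
import inspect


async def add(a, b):
    # A suspension point: other coroutines could run on the loop while we wait here
    await asyncio.sleep(0)
    return a + b


coro = add(2, 3)
print(inspect.iscoroutine(coro))  # True: we got a coroutine object, not 5
coro.close()  # discard it without running it, which avoids a "never awaited" warning


async def main():
    # await runs the coroutine to completion and hands back its return value
    return await add(2, 3)


print(asyncio.run(main()))  # 5
```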
`download_image_async()` is a coroutine that downloads an image in chunks inside a `while` loop and suspends execution (`await response.content.read(512)`) while waiting for the I/O to complete. This allows the event loop to start or continue downloading the other images as soon as each of them has new data available.
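The chunked loop can be tried without a network connection. In this sketch, an `asyncio.StreamReader` filled with in-memory bytes stands in for aiohttp’s `response.content` (an assumption for illustration: both offer an awaitable `read(n)` that returns at most `n` bytes, and `b""` once the stream is exhausted), and an `io.BytesIO` stands in for the image file. `copy_in_chunks` is a hypothetical helper name.

```python
import asyncio
import io


async def copy_in_chunks(reader, sink, chunk_size=512):
    """Copy everything from an async reader into a file-like sink, chunk by chunk."""
    sizes = []
    while True:
        chunk = await reader.read(chunk_size)  # suspends until data (or EOF) is available
        if not chunk:  # b"" signals the end of the stream
            break
        sink.write(chunk)
        sizes.append(len(chunk))
    return sizes


async def main():
    # Hypothetical 1300-byte payload standing in for an image body
    payload = b"x" * 1300
    reader = asyncio.StreamReader()
    reader.feed_data(payload)
    reader.feed_eof()
    sink = io.BytesIO()
    sizes = await copy_in_chunks(reader, sink)
    return sizes, sink.getvalue() == payload


sizes, intact = asyncio.run(main())
print(sizes, intact)  # [512, 512, 276] True
```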
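Finally, `await asyncio.gather(*tasks, return_exceptions=True)` runs our coroutines concurrently and, when all of them have finished, returns their results in the same order in which the coroutines were passed in. With `return_exceptions=True`, an exception raised in one coroutine is placed in the results list instead of being raised by the `await`, so `gather()` still waits for all the other downloads to finish.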
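To see the ordering and error-handling behavior of `gather()` in isolation, here is a sketch with hypothetical `job()` coroutines that sleep for different times, one of which fails:

```python
import asyncio


async def job(name, delay, fail=False):
    # Hypothetical stand-in for a download: wait, then succeed or fail
    await asyncio.sleep(delay)
    if fail:
        raise ValueError(f"{name} failed")
    return name


async def main():
    # "slow" finishes last, yet its result is still first in the returned list
    return await asyncio.gather(
        job("slow", 0.05),
        job("fast", 0.01),
        job("broken", 0.02, fail=True),
        return_exceptions=True,
    )


results = asyncio.run(main())
for result in results:
    if isinstance(result, Exception):
        print("error:", result)
    else:
        print("ok:", result)
```

The results come back in argument order (`slow`, `fast`, then the `ValueError`), not in completion order, and the failure is returned as a value rather than raised.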