Python gets a lot of flak for its performance story. However, the introduction of asyncio into the standard library goes some way towards resolving some of those performance problems. There is now a wide choice of libraries that make use of the new async/await syntax, including a number of server implementations.
The aiohttp library comes with both a client and a server. However, today I want to focus on the server and one of my favourite features – background tasks. Typically, when building a Python-based microservice with Flask, you might run background work in something like Celery. While aiohttp's background tasks are more limited than Celery tasks, they let you run work in the background while the server continues to accept requests.
A Simple Example
I have written some code that provides a simple example of how you can use such a background task. We are going to write a server with a single endpoint. This endpoint allows a user to post a JSON dictionary containing a URL. The URL is then sent to a thread pool, where it is scraped without blocking the user's request. Once we have the data we need from the URL, it is placed in a queue, which is then processed by our background task, which simply posts the data to another endpoint.
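To make that flow concrete, here is roughly what a client request to our endpoint could look like. The host and port are just the ones used in the final example at the end of this post, and the URL in the payload is a placeholder.

import requests

# Hypothetical client call against the server defined later in this post,
# assumed to be running on 127.0.0.1:5002.
response = requests.post('http://127.0.0.1:5002/', json={'url': 'https://example.com'})
print(response.json())  # the server replies immediately with {'Status': 'Dispatched'}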
Get & Post Requests
For this we are going to need to implement a POST and a GET request function. Our get request is going to be run in a thread pool, so we can use the ever-popular requests library to grab our page. However, our post request is going to be made inside an async background task, so it must itself be asynchronous. Otherwise we would end up blocking the event loop.
import aiohttp
import requests


async def post_request(url, json, proxy=None):
    # Async POST used by the background task to forward scraped data.
    async with aiohttp.ClientSession() as client:
        try:
            async with client.post(url, json=json, proxy=proxy, timeout=60) as response:
                html = await response.text()
                return {'html': html, 'status': response.status}
        except aiohttp.ClientError as err:
            return {'error': err}


def get_request(url):
    # Blocking GET, intended to be run inside the thread pool.
    try:
        res = requests.get(url)
        return {'html': res.text, 'status': res.status_code, 'url': res.url, 'original_url': url}
    except requests.RequestException:
        return None
Both of these small functions are pretty basic and don't do much in the way of error handling or logging, but they are enough to demonstrate the workings of a background task.
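If you want to poke at these helpers on their own before wiring up the server, a quick throwaway check might look something like the following; the target URLs are just placeholders.

import asyncio

# get_request is plain synchronous code, so it can be called directly.
print(get_request('https://example.com'))

# post_request is a coroutine, so it has to be driven by an event loop. With
# nothing listening on the target port it simply returns an {'error': ...}
# dictionary rather than raising.
result = asyncio.get_event_loop().run_until_complete(
    post_request('http://127.0.0.1:8000', {'hello': 'world'})
)
print(result)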
Initialising the Server and Setting Up Our Route
class ScraperServer:

    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.pool = ThreadPoolExecutor(max_workers=20)
        self.loop = asyncio.get_event_loop()
        self.data_to_save = deque([])
        self.example_endpoint = 'http://127.0.0.1:8000'
We begin by initialising our server class with a host and port. We also define a thread pool, which we will use to run our synchronous get requests. If you had a long-running CPU-bound task, you could instead use a process pool in much the same way (see the sketch below). We then create a queue using deque from the collections module, allowing us to easily append and pop data; it is this queue that our background task will process. Finally, we have the example endpoint to which we will post our data.
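As a rough illustration of that process pool point, the sketch below shows the same run_in_executor call backed by a ProcessPoolExecutor. It is not part of the scraper itself, and parse_page is just a made-up stand-in for CPU-heavy work.

import asyncio
from concurrent.futures import ProcessPoolExecutor


def parse_page(html):
    # Stand-in for CPU-bound work such as heavy parsing or text processing.
    return len(html.split())


async def main():
    loop = asyncio.get_event_loop()
    pool = ProcessPoolExecutor(max_workers=4)
    # run_in_executor is used exactly as with the thread pool above.
    word_count = await loop.run_in_executor(pool, parse_page, '<html>some example text</html>')
    print(word_count)


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())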
async def get_urls(self, request):
    data = await request.json()
    url = data.get('url')
    if url:
        t = self.loop.run_in_executor(self.pool, get_request, url)
        t.add_done_callback(self.scrape_callback)
    return web.json_response({'Status': 'Dispatched'})


def scrape_callback(self, return_value):
    return_value = return_value.result()
    if return_value:
        self.data_to_save.append(return_value)
We then move on to defining our async view. This particular view is very simple: we await the JSON from the incoming request and attempt to grab the URL from it. If the JSON contains a URL, we send the URL to our get_request function, which is then executed within the thread pool. This allows us to return a response to the caller without blocking. We also add a callback, which is executed once the request has completed. The callback simply puts the data in our queue, ready to be processed by our background task.
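Since run_in_executor returns an awaitable future, you could also await the result directly instead of attaching a callback. The sketch below is not the approach used in this post: it keeps the handler linear, but the trade-off is that the response is held open until the scrape has finished.

async def get_urls_and_wait(self, request):
    # Hypothetical alternative handler: await the executor future directly.
    # Simpler to follow, but the client now waits for the whole scrape.
    data = await request.json()
    url = data.get('url')
    if url:
        result = await self.loop.run_in_executor(self.pool, get_request, url)
        if result:
            self.data_to_save.append(result)
        return web.json_response({'Status': 'Scraped'})
    return web.json_response({'Status': 'No URL provided'})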
Creating & Registering The Background Task
async def process_queue(self):
    while True:
        if self.data_to_save:
            data_to_post = self.data_to_save.popleft()
            print('Sending URL data to database')
            res = await post_request(self.example_endpoint, data_to_post)
        else:
            await asyncio.sleep(0.1)


async def start_background_tasks(self, app):
    app['dispatch'] = app.loop.create_task(self.process_queue())


async def cleanup_background_tasks(self, app):
    app['dispatch'].cancel()
    # Awaiting a cancelled task raises CancelledError, so swallow it here.
    try:
        await app['dispatch']
    except asyncio.CancelledError:
        pass
Our background task is very simple: it is an async function containing a while True loop. Inside this loop we check whether there are any items in the queue waiting to be posted to our dummy server. If there are, we pop an item and make an async post request. If there are none, we await asyncio.sleep. This is very important: without the await here, our background task could end up never handing control of the event loop back to incoming server requests.
We then define two async functions which take in our yet-to-be-created app and add the task to the event loop. This allows the background task to run in the same event loop as the server and be cancelled when the server itself is shut down.
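As an aside, the deque-plus-sleep polling could also be replaced with asyncio.Queue, whose get() simply suspends until an item arrives. A minimal sketch, assuming the rest of ScraperServer stays the same apart from setting self.data_to_save = asyncio.Queue() in __init__:

def scrape_callback(self, return_value):
    return_value = return_value.result()
    if return_value:
        # Callbacks on executor futures run on the event loop thread,
        # so put_nowait is safe to call here.
        self.data_to_save.put_nowait(return_value)


async def process_queue(self):
    while True:
        # get() suspends until an item is available, so no explicit
        # asyncio.sleep is needed to hand control back to the loop.
        data_to_post = await self.data_to_save.get()
        await post_request(self.example_endpoint, data_to_post)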
The Most Complicated Bit: Creating Our App
async def create_app(self):
    app = web.Application()
    app.router.add_post('/', self.get_urls)
    return app


def run_app(self):
    loop = self.loop
    app = loop.run_until_complete(self.create_app())
    app.on_startup.append(self.start_background_tasks)
    app.on_cleanup.append(self.cleanup_background_tasks)
    web.run_app(app, host=self.host, port=self.port)
This part of the code is the most confusing. Our create_app function simply returns a web application with our route added at the server's root. In the run_app method we first build the application in our event loop, then append the tasks to be run on start-up and shutdown of the server, and finally pass the app to web.run_app to be served on our specified host and port.
Complete Code
import asyncio
from collections import deque
from concurrent.futures import ThreadPoolExecutor

from aiohttp import web, ClientSession, ClientError
import requests


async def post_request(url, json, proxy=None):
    # Async POST used by the background task to forward scraped data.
    async with ClientSession() as client:
        try:
            async with client.post(url, json=json, proxy=proxy, timeout=60) as response:
                html = await response.text()
                return {'html': html, 'status': response.status}
        except ClientError as err:
            return {'error': err}


def get_request(url):
    # Blocking GET, run inside the thread pool so it doesn't block the loop.
    try:
        res = requests.get(url)
        return {'html': res.text, 'status': res.status_code, 'url': res.url, 'original_url': url}
    except requests.RequestException:
        return None


class ScraperServer:

    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.pool = ThreadPoolExecutor(max_workers=20)  # runs the blocking get requests
        self.loop = asyncio.get_event_loop()
        self.data_to_save = deque([])  # queue processed by the background task
        self.example_endpoint = 'http://127.0.0.1:8000'

    async def get_urls(self, request):
        data = await request.json()
        url = data.get('url')
        if url:
            t = self.loop.run_in_executor(self.pool, get_request, url)
            t.add_done_callback(self.scrape_callback)
        return web.json_response({'Status': 'Dispatched'})

    def scrape_callback(self, return_value):
        return_value = return_value.result()
        if return_value:
            self.data_to_save.append(return_value)

    async def process_queue(self):
        while True:
            if self.data_to_save:
                data_to_post = self.data_to_save.popleft()
                print('Sending URL data to database')
                res = await post_request(self.example_endpoint, data_to_post)
            else:
                await asyncio.sleep(0.1)

    async def start_background_tasks(self, app):
        app['dispatch'] = app.loop.create_task(self.process_queue())

    async def cleanup_background_tasks(self, app):
        app['dispatch'].cancel()
        # Awaiting a cancelled task raises CancelledError, so swallow it here.
        try:
            await app['dispatch']
        except asyncio.CancelledError:
            pass

    async def create_app(self):
        app = web.Application()
        app.router.add_post('/', self.get_urls)
        return app

    def run_app(self):
        loop = self.loop
        app = loop.run_until_complete(self.create_app())
        app.on_startup.append(self.start_background_tasks)
        app.on_cleanup.append(self.cleanup_background_tasks)
        web.run_app(app, host=self.host, port=self.port)


if __name__ == '__main__':
    s = ScraperServer(host='127.0.0.1', port=5002)
    s.run_app()
We now have a simple server that accepts requests, dispatches the scraping work to a thread pool and then processes the results in the background. This pattern can be very powerful: used in conjunction with thread and process pools, these background tasks let you build servers that handle long-running work without blocking incoming requests.
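If you want to try the whole flow end to end, you will also need something listening on the example endpoint (http://127.0.0.1:8000) for the background task to post to. The throwaway receiver below is not part of the code above, just a convenience you could run in a second terminal before starting the scraper server.

from aiohttp import web


async def receive_data(request):
    # Accept the scraped data posted by the background task and acknowledge it.
    data = await request.json()
    print('Received scraped data for:', data.get('original_url'))
    return web.json_response({'Status': 'Saved'})


app = web.Application()
app.router.add_post('/', receive_data)

if __name__ == '__main__':
    web.run_app(app, host='127.0.0.1', port=8000)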