Asynchronous code is great if you want to get the most out of your backend system that also sends requests to external systems or does other I/O operations. Refactoring synchronous backend code to get those benefits is not that great, or at least not as easy as I had anticipated.
The post turned out quite lengthy (again) so here’s what it’s about:
- Crashing Gunicorn worker - the client receives an empty response when a blocking I/O operation is executed
- Initialization of `ContextVar` in global context - one client's data accessible to other clients
Crashing Gunicorn worker
In the case of my project we were not that much on a hunt for performance gains but wanted to benefit from the automatic documentation that FastAPI provides. However, we defined our middlewares and endpoints as async but left most of our code synchronous. And that peculiar mix resulted in crashes of Gunicorn workers.
Minimum reproducible example
Here’s a simplified version of our code:

```python
import time

from fastapi import FastAPI

app = FastAPI()


@app.get("/")
async def read_root():
    start = time.time()
    # this can be a request sent to another service
    # or any other I/O operation
    time.sleep(30)
    return {f"This took {time.time() - start} seconds to complete"}
```
What’s important is that there's a blocking operation when handling the request - `time.sleep` instead of `await asyncio.sleep`.
When the above application is run via Gunicorn:

```shell
gunicorn -k uvicorn.workers.UvicornWorker 'main:app' -b localhost:8000 -t 30
```
sending a request results in a quite cryptic error:

```shell
curl localhost:8000
curl: (52) Empty reply from server
```
which becomes apparent after checking the server logs:

```shell
[2024-02-27 21:57:53 +0100] [18085] [CRITICAL] WORKER TIMEOUT (pid:18110)
[2024-02-27 21:57:54 +0100] [18085] [ERROR] Worker (pid:18110) was sent code 134!
```
As you probably guessed, that’s caused by the blocking sleep in the request handler, which purely by accident takes exactly as long as Gunicorn’s worker timeout. That is clear in the case of such a simple code snippet but may be harder to debug in production code. In our case, handling a request involved exchanging quite a number of requests with an external service (which is notorious for causing integration problems).
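One way to hunt for such hidden blocking calls before they bite in production is asyncio's debug mode, which logs a warning for every task step that blocks the event loop for longer than `loop.slow_callback_duration` (0.1 s by default). A minimal sketch:

```python
import asyncio
import time


async def handler() -> None:
    # stands in for an endpoint that secretly blocks
    time.sleep(0.3)


async def main() -> None:
    await handler()


if __name__ == "__main__":
    # debug=True makes asyncio log a warning along the lines of:
    #   Executing <Task ...> took 0.300 seconds
    asyncio.run(main(), debug=True)
```

The same mode can be enabled without touching the code via the `PYTHONASYNCIODEBUG=1` environment variable.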
What is exactly going on?
The main process of Gunicorn essentially does two things:

- Forwards requests to its workers
- Manages workers - starts them up, kills them if they are not responsive for a specific length of time (yes, the timeout that is passed to the `gunicorn` command), etc.
OK, but the worker is actually alive, so why such harsh treatment?
It’s alive but not able to respond to the “poking” done by Gunicorn’s main process. Because of the blocking operation, execution control is never handed back to the Uvicorn event loop.
Here’s an illustration of what would happen if `await asyncio.sleep` was used. In that case the execution would return to the event loop, which would then call a function that handles the “poke”.
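The difference can be sketched with a toy heartbeat coroutine (a stand-in for Gunicorn's liveness check, not its actual mechanism), assuming it shares the event loop with the request handler:

```python
import asyncio
import time


async def heartbeat(beats: list) -> None:
    # toy stand-in for Gunicorn "poking" the worker
    for _ in range(3):
        beats.append(time.monotonic())
        await asyncio.sleep(0.1)


async def handler(blocking: bool) -> None:
    if blocking:
        time.sleep(0.5)  # never yields - the heartbeat is starved
    else:
        await asyncio.sleep(0.5)  # yields - the heartbeat keeps ticking


async def run(blocking: bool) -> list:
    beats: list = []
    await asyncio.gather(heartbeat(beats), handler(blocking))
    return beats
```

With `blocking=True` the gap between the first two beats grows to roughly the whole 0.5 s block; with `blocking=False` it stays close to the intended 0.1 s.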
Let’s use the non-blocking operation then:

```python
import asyncio
import time

from fastapi import FastAPI

app = FastAPI()


@app.get("/")
async def read_root():
    start = time.time()
    # this can be a request sent to another service
    # or any other I/O operation
    await asyncio.sleep(30)
    return {f"This took {time.time() - start} seconds to complete"}
```
The response is then:

```shell
curl localhost:8000
["This took 30.002015352249146 seconds to complete"]
```
and no sign of problems in the server logs:

```shell
gunicorn -k uvicorn.workers.UvicornWorker 'main:app' -b localhost:8000 -t 30
[2024-02-27 22:40:32 +0100] [21686] [INFO] Starting gunicorn 21.2.0
[2024-02-27 22:40:32 +0100] [21686] [INFO] Listening at: http://127.0.0.1:8000 (21686)
[2024-02-27 22:40:32 +0100] [21686] [INFO] Using worker: uvicorn.workers.UvicornWorker
[2024-02-27 22:40:32 +0100] [21710] [INFO] Booting worker with pid: 21710
[2024-02-27 22:40:33 +0100] [21710] [INFO] Started server process [21710]
[2024-02-27 22:40:33 +0100] [21710] [INFO] Waiting for application startup.
[2024-02-27 22:40:33 +0100] [21710] [INFO] Application startup complete.
[2024-02-27 22:41:53 +0100] [21686] [INFO] Handling signal: winch
```
Solutions, bigger and smaller workarounds
What can be done to avoid such a problem:
- Employ non-blocking operations. This task is challenging with legacy code. Every function leading to the troublesome blockage requires modification, including decorators. This issue intensifies if the altered code must remain compatible with synchronous operations.
- Run the synchronous code in a threadpool. The code below does not block:
```python
import time

import anyio.to_thread
from fastapi import FastAPI

app = FastAPI()


@app.get("/")
async def read_root():
    start = time.time()
    # this can be a request sent to another service
    # or any other I/O operation
    await anyio.to_thread.run_sync(time.sleep, 30)
    return {f"This took {time.time() - start} seconds to complete"}
```
This is a great option, and Starlette/FastAPI actually uses that “trick” to call synchronous middlewares and endpoints. Since web frameworks do it automatically, it’s also a valid option to give up on asynchronous endpoints altogether if your use case allows it.
- Get rid of Gunicorn. That may sound controversial, but it’s actually a valid option if your app is orchestrated by Kubernetes. In that case Kubernetes manages the “workers” (pods) and Gunicorn doesn’t provide much value.
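A footnote to the threadpool option: since Python 3.9 the standard library offers the same trick directly as `asyncio.to_thread`, if you’d rather not call anyio yourself. A sketch outside of FastAPI for brevity (with a shorter sleep):

```python
import asyncio
import time


async def read_root() -> str:
    start = time.time()
    # the blocking call runs in the default ThreadPoolExecutor,
    # so the event loop stays free to serve other tasks
    await asyncio.to_thread(time.sleep, 0.2)
    return f"This took {time.time() - start:.2f} seconds to complete"


if __name__ == "__main__":
    print(asyncio.run(read_root()))
```

Under the hood `asyncio.to_thread` is a thin wrapper around `loop.run_in_executor` that also propagates the current context to the worker thread.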
Initialization of `ContextVar` in global context
That one was really hard to debug and was caused in large part by how the dependency injection library we use is implemented, but it can also occur in code you write yourself. I’ll simplify the case to make it easier to understand.
Intro: ContextVar in async Python
When processing a request, you quite often need to store some state in memory so that it is accessible later in the processing chain. This could be, for example, user information (id, email, roles, etc.) determined in a middleware based on an authentication token and referenced in application or domain code.
Asynchronous code runs, by default, in a single process and thread, so if you were to store this data in a global variable, it’d be accessible to, or even overwritten by, the processing of other requests.
`ContextVar` is a solution for such a case. It serves the same goal as `threading.local` and can replace it. The value of a `ContextVar` will be different for each “chain” of asynchronous processing code:
```python
import asyncio
import contextvars

email: contextvars.ContextVar[str] = contextvars.ContextVar("email")


async def get_email() -> str:
    return email.get()


async def set_email(value: str) -> None:
    print(f"Setting email to {value}")
    email.set(value)


async def email_processing(value: str, seconds_of_sleep: int) -> None:
    await set_email(value)
    await asyncio.sleep(seconds_of_sleep)
    print(f"{await get_email()}, expected {value}")


async def main() -> None:
    await asyncio.gather(
        email_processing("value1@example.com", 1),
        email_processing("value2@example.com", 0),
    )


if __name__ == "__main__":
    asyncio.run(main())
```
This prints:

```shell
python contextvar_example.py
Setting email to value1@example.com
Setting email to value2@example.com
value2@example.com, expected value2@example.com
value1@example.com, expected value1@example.com
```
Using `ContextVar` is quite easy but, as you’ll see below, it’s still possible to introduce bugs 🙂
Side note: web frameworks often provide such per-request state themselves, e.g. the `state` variable in Starlette/FastAPI.

Global ContextVar
The bug that we introduced was that one of our `ContextVar` variables sometimes became global - shared between requests - which caused data to be overwritten and one user’s data to be accessible to others (thankfully we noticed it before deploying the changes to production).
The following simplified example illustrates the problem:
```python
import asyncio
import contextvars

request_state: contextvars.ContextVar[dict] = contextvars.ContextVar(
    "request_state"
)


async def app_startup() -> None:
    request_state.set({})


async def process_request(email: str, processing_time: int) -> None:
    state = request_state.get()
    state["email"] = email
    await asyncio.sleep(processing_time)
    print(f"Processing request with {state=}, expected email {email}")


async def main() -> None:
    await app_startup()
    await asyncio.gather(
        process_request("value1@example.com", 2),
        process_request("value2@example.com", 1),
    )


if __name__ == "__main__":
    asyncio.run(main())
```
which results in:

```shell
python global_context_var.py
Processing request with state={'email': 'value2@example.com'},
expected email value2@example.com
Processing request with state={'email': 'value2@example.com'},
expected email value1@example.com
```
The `ContextVar`’s value is set during application startup, which makes it shared with subsequent coroutines and/or tasks. When the state is updated during request processing, it’s also updated for the other coroutine, so instead of `value1@example.com` the first request is processed with `value2@example.com`.
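A sketch of the fix for this simplified example: defer the `set()` call until the request's own task is running. `asyncio.gather` wraps each coroutine in a task with its own copy of the context, so a value set inside the task stays private to it:

```python
import asyncio
import contextvars

request_state: contextvars.ContextVar[dict] = contextvars.ContextVar(
    "request_state"
)


async def process_request(email: str, processing_time: float) -> dict:
    request_state.set({})  # a fresh dict per request, set inside the task
    state = request_state.get()
    state["email"] = email
    await asyncio.sleep(processing_time)
    return state


async def main() -> list:
    return await asyncio.gather(
        process_request("value1@example.com", 0.2),
        process_request("value2@example.com", 0.1),
    )


if __name__ == "__main__":
    # each request now sees only its own email
    print(asyncio.run(main()))
```

In a real FastAPI app the natural place for that per-request `set()` is a middleware rather than the startup hook.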
Again, it’s relatively easy to spot the problem in the exemplary code above, but most of the time there’s a lot of indirection. In our case there were two culprits:

- Our dependency injection library uses descriptors which were evaluated during application startup and caused a `ContextVar` to be initialized (instances of dependencies are stored in the `ContextVar`).
- Due to infrastructure problems, an exception was raised during app startup, which caused getting an instance of the Sentry reporter via the dependency injection library… you know the rest.
Solution, workarounds
I can’t think of any pattern that would protect the codebase from introducing such bugs, so sorry, no general solution here. If you are aware of one, please let me know in the comments.
We did, however, introduce a safety mechanism - a middleware that checks whether the `ContextVar` already has a value set; if it does, it reports an error to Sentry, resets the variable, and proceeds with processing.
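A minimal sketch of that check as a plain function (the function name is my invention, and the Sentry call site is omitted; in our app the equivalent logic runs inside an HTTP middleware):

```python
import contextvars

request_state: contextvars.ContextVar[dict] = contextvars.ContextVar(
    "request_state"
)


def reset_leaked_state(var: contextvars.ContextVar) -> bool:
    """Return True if `var` already had a value - i.e. it leaked in from
    outside the current request - and reset it to a clean state either way."""
    leaked = var.get(None) is not None  # get() with a default avoids LookupError
    var.set({})  # start the request with fresh state
    return leaked
```

In the middleware, a `True` result would trigger the Sentry report before the request proceeds.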
Request for patterns
As you can see, even though asynchronous programming concepts in Python are quite simple, it’s possible to run into complex problems. If you know of other problematic cases, patterns for solving them, or clever ways to refactor code to support `async`, please write in the comments. The same goes if you’re interested in other topics related to `async` in Python.