What Python’s asyncio primitives get wrong about shared state

The Async Coordination Problem That’s Breaking Python Apps (And How We Fixed It)

One problem keeps tripping up even seasoned engineers in concurrent Python: coordinating async tasks around shared state. Python’s asyncio provides tools like asyncio.Event and asyncio.Condition, but each has gaps that only surface under real-world concurrency.

This is exactly what we encountered while building Inngest’s Python SDK, where multiple async handlers needed to coordinate around WebSocket connection state. What started as a simple coordination problem turned into a deep dive into asyncio’s limitations and, ultimately, a solution that handles every edge case we threw at it.

The Scenario: A Connection State Machine

Imagine an async Python application managing a WebSocket connection that transitions through these states:

disconnected → connecting → connected → closing → closed

One of your concurrent handlers needs to drain pending requests when the connection starts shutting down. It must wait for the closing state:

python
state = "disconnected"

async def drain_requests():
    # Wait for the connection to start closing
    ...

    # Drain pending requests
    print("draining pending requests")

Simple enough, right? Let’s explore how each standard library tool handles this coordination challenge.

Attempt 1: Polling – The CPU-Wasting Approach

The most obvious solution: check the value in a loop with sleep intervals.

python
import asyncio

async def drain_requests():
    # Re-check the shared state every 100 ms until it matches
    while state != "closing":
        await asyncio.sleep(0.1)
    print("draining pending requests")

This works, but the tradeoffs are brutal:

  • Latency vs. Efficiency: Short sleep intervals waste CPU cycles; long ones add unacceptable latency. There’s no sweet spot.
  • Duplication: Every consumer reimplements the same polling loop with identical tradeoffs.
  • No Event-Driven Wake: The consumer runs whether or not anything changed, wasting resources.
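To make the cost concrete, here is a small sketch that counts how often a poller wakes up while waiting for a single state change. The `poll_until` helper, the wakeup counter, and the 10 ms interval are illustrative assumptions, not code from the SDK.

```python
import asyncio

state = "disconnected"


async def poll_until(target: str, interval: float) -> int:
    """Poll the module-level state; return how many times we woke up."""
    wakeups = 0
    while state != target:
        await asyncio.sleep(interval)
        wakeups += 1
    return wakeups


async def main() -> int:
    global state
    state = "disconnected"
    poller = asyncio.create_task(poll_until("closing", interval=0.01))
    await asyncio.sleep(0.1)  # nothing changes for roughly 100 ms
    state = "closing"         # the one change the poller cares about
    return await poller


wakeups = asyncio.run(main())
print(f"poller woke {wakeups} times for one state change")
```

Every wakeup but the last one found nothing to do. Raising the interval cuts the wasted wakeups but delays the reaction to the change by up to one full interval.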

We need something smarter—something that sleeps until the state actually changes, not for arbitrary durations.

Attempt 2: asyncio.Event – The Boolean Trap

asyncio.Event is Python’s answer to “wake me up when something happens”:

python
import asyncio

closing_event = asyncio.Event()

async def drain_requests():
    # Suspend until some other task calls closing_event.set()
    await closing_event.wait()
    print("draining pending requests")

No polling, no wasted cycles. The handler blocks until the event fires. But here’s the problem: Event is boolean—it’s either set or unset. Our connection has five states, but drain_requests only cares about one.

What happens when another handler needs to wait for connected? You need a second event. A third handler waiting for “not disconnected”? A third event with inverted logic.

The setter becomes a coordination nightmare:

python
import asyncio

closing_event = asyncio.Event()
connected_event = asyncio.Event()

async def set_state(new_state):
    global state
    state = new_state
    # One branch per event that some consumer might be waiting on
    if new_state == "closing":
        closing_event.set()
    if new_state == "connected":
        connected_event.set()

Every new condition requires another Event object. The coordination between events is where bugs live. Forget a set() or clear() call and a consumer blocks forever.
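The opposite mistake is just as easy to make. The sketch below reuses the shape of the setter above, which sets connected_event but never clears it, and shows a waiter being released even though the connection has already dropped. The scenario and the local names are assumptions made for illustration.

```python
import asyncio


async def main() -> tuple[str, bool]:
    state = "disconnected"
    connected_event = asyncio.Event()

    def set_state(new_state: str) -> None:
        nonlocal state
        state = new_state
        if new_state == "connected":
            connected_event.set()
        # Bug: nothing calls connected_event.clear() when we leave "connected".

    set_state("connected")
    set_state("disconnected")

    # Returns immediately, although the connection is down again.
    await asyncio.wait_for(connected_event.wait(), timeout=1.0)
    return state, connected_event.is_set()


result = asyncio.run(main())
print(result)  # ('disconnected', True)
```

The event and the state it is meant to mirror have drifted apart, and nothing in asyncio.Event can detect that.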

Attempt 3: asyncio.Condition – The Lost Update Problem

asyncio.Condition lets consumers wait on arbitrary predicates:

python
import asyncio

state = "disconnected"
condition = asyncio.Condition()

async def drain_requests():
    async with condition:
        # Re-evaluates the predicate each time the condition is notified
        await condition.wait_for(lambda: state == "closing")
    print("draining pending requests")

One coordination point, arbitrary predicates, no proliferation of Event objects. This is much better.
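As a rough illustration of "one coordination point, arbitrary predicates", the sketch below has two consumers with different predicates share a single Condition. The `wait_until` helper and the `log` list are hypothetical names used only for this example.

```python
import asyncio


async def main() -> list[str]:
    state = "disconnected"
    condition = asyncio.Condition()
    log: list[str] = []

    async def wait_until(name: str, predicate) -> None:
        async with condition:
            await condition.wait_for(predicate)
        log.append(name)

    async def set_state(new_state: str) -> None:
        nonlocal state
        async with condition:
            state = new_state
            condition.notify_all()

    waiters = [
        asyncio.create_task(
            wait_until("connected", lambda: state == "connected")
        ),
        asyncio.create_task(
            wait_until("not disconnected", lambda: state != "disconnected")
        ),
    ]
    await asyncio.sleep(0)  # let both waiters block on the condition

    await set_state("connecting")
    await asyncio.sleep(0.01)  # give the woken waiters time to run
    await set_state("connected")

    await asyncio.wait_for(asyncio.gather(*waiters), timeout=1.0)
    return log


print(asyncio.run(main()))  # ['not disconnected', 'connected']
```

Note the pause between the two transitions: each waiter gets a chance to run while its predicate is still true.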

But it breaks under a common pattern.

The Lost Update: When State Changes Too Fast

Condition is designed to check the current value when a consumer wakes up. That’s fine when the target state persists long enough for the consumer to observe it, but it falls apart when the state moves on before the consumer gets to run.

When the setter changes state, it calls notify_all(), which schedules wakeups for every waiting consumer. But in a single-threaded event loop, no consumer actually runs until the current coroutine yields. If the value changes again before that happens, consumers wake up and re-evaluate their predicate against the current value, not the value that triggered the notification.

The predicate fails and the consumer goes back to sleep, potentially forever.

Here’s a runnable reproduction:

python
import asyncio

state = "disconnected"
condition = asyncio.Condition()

async def set_state(new_state):
    global state
    async with condition:
        state = new_state
        condition.notify_all()

async def drain_requests():
    async with condition:
        await condition.wait_for(lambda: state == "closing")
    print("draining pending requests")

async def main():
    task = asyncio.create_task(drain_requests())
    await asyncio.sleep(0)  # let drain_requests start waiting

    # Neither call yields to the event loop, so drain_requests cannot run in between
    await set_state("closing")
    await set_state("closed")

    # Times out: drain_requests wakes up, sees "closed", and waits again
    await asyncio.wait_for(task, timeout=1.0)

asyncio.run(main())

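The value was "closing", but by the time drain_requests wakes and checks, it’s already "closed". The intermediate state is gone, drain_requests goes back to waiting, and asyncio.wait_for gives up with a TimeoutError after one second.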

This isn’t a contrived edge case. In our SDK’s connection manager, a close signal can arrive and the connection can shut down in the same event loop tick. drain_requests never runs, and any in-flight work just disappears.

The Fix: Per-Consumer Queues

Instead of waking consumers and asking “is the current state what you want?”, buffer every transition into a per-consumer queue. Each consumer drains its own queue and checks each transition individually. The consumer never misses a state.

Each consumer registers its own asyncio.Queue. When the value changes, the setter pushes (old, new) into every registered queue. Here’s a simplified version that illustrates the core idea:

python
import asyncio

class ValueWatcher:
    def __init__(self, initial_value):
        self._value = initial_value
        # One queue per waiting consumer
        self._watch_queues: list[asyncio.Queue] = []

    @property
    def value(self):
        return self._value

    @value.setter
    def value(self, new_value):
        if new_value == self._value:
            return

        old_value = self._value
        self._value = new_value

        # Buffer the transition for every registered consumer
        for queue in self._watch_queues:
            queue.put_nowait((old_value, new_value))

    async def wait_for(self, target):
        queue = asyncio.Queue()
        self._watch_queues.append(queue)

        try:
            if self._value == target:
                return

            while True:
                old, new = await queue.get()
                if new == target:
                    return
        finally:
            # Deregister even if the caller is cancelled
            self._watch_queues.remove(queue)

wait_for registers a queue, checks the current value, then drains transitions until it finds a match. The try/finally ensures the queue gets deregistered even if the caller cancels.

The queue buffers and delivers every intermediate transition in order, even if the value changes multiple times before a consumer runs.
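To check that claim against the earlier failure, the sketch below replays the same closing → closed sequence through a condensed copy of the simplified ValueWatcher. The `main` harness and the `log` list are assumptions added for the demonstration.

```python
import asyncio


class ValueWatcher:
    """Condensed copy of the simplified watcher shown above."""

    def __init__(self, initial_value):
        self._value = initial_value
        self._watch_queues: list[asyncio.Queue] = []

    @property
    def value(self):
        return self._value

    @value.setter
    def value(self, new_value):
        if new_value == self._value:
            return
        old_value, self._value = self._value, new_value
        for queue in self._watch_queues:
            queue.put_nowait((old_value, new_value))

    async def wait_for(self, target):
        queue = asyncio.Queue()
        self._watch_queues.append(queue)
        try:
            if self._value == target:
                return
            while True:
                _old, new = await queue.get()
                if new == target:
                    return
        finally:
            self._watch_queues.remove(queue)


async def main() -> list[str]:
    log: list[str] = []
    state = ValueWatcher("disconnected")

    async def drain_requests() -> None:
        await state.wait_for("closing")
        log.append(f"draining (current value: {state.value})")

    task = asyncio.create_task(drain_requests())
    await asyncio.sleep(0)  # let drain_requests register its queue

    # Two transitions back to back, with no await in between.
    state.value = "closing"
    state.value = "closed"

    await asyncio.wait_for(task, timeout=1.0)
    return log


print(asyncio.run(main()))  # ['draining (current value: closed)']
```

The consumer still runs after the value has moved on to "closed", but it is woken by the buffered "closing" transition rather than by the current value, so the drain step is not skipped.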

Making It Production-Ready

The simplified version leaves out several things a real SDK needs. Our final implementation adds:

  • Thread Safety: A threading.Lock protects the value and queue list. Each queue is paired with its event loop, and the setter uses loop.call_soon_threadsafe instead of put_nowait directly.
  • Atomic Registration: wait_for checks the current value and registers the queue inside the same lock acquisition, closing the race where a transition could slip between registration and the initial check.
  • Full Generic Typing: Generic[T] end-to-end, so predicates, queues, and return values are all type-checked.
  • Predicate-Based Matching: wait_for, wait_for_not, and wait_for_not_none all route through a shared _wait_for_condition(predicate) core.
  • Timeouts: Every wait method accepts an optional timeout, backed by asyncio.wait_for.
  • Conditional Set: set_if atomically sets the value only when the current value satisfies a predicate, useful for state machine transitions that should only happen from a specific state.
  • Change Watching: wait_for_change waits for any transition regardless of value, handy for logging or reacting to state churn.
  • Callback API: on_change and on_value for synchronous consumers alongside the async wait API.
  • Resilient Notifications: The setter catches RuntimeError (closed loop) and suppresses callback exceptions so one failure doesn’t block other consumers.
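A few of these items, namely the shared predicate core, timeouts, and set_if, can be sketched roughly as follows. The method names come from the list above, but the signatures, return values, and bodies are assumptions made for illustration, and this version is single-threaded (no lock, no callbacks).

```python
import asyncio
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class ValueWatcher(Generic[T]):
    """Single-threaded sketch; the signatures here are assumptions."""

    def __init__(self, initial_value: T) -> None:
        self._value = initial_value
        self._watch_queues: list[asyncio.Queue] = []

    @property
    def value(self) -> T:
        return self._value

    @value.setter
    def value(self, new_value: T) -> None:
        if new_value == self._value:
            return
        old_value, self._value = self._value, new_value
        for queue in self._watch_queues:
            queue.put_nowait((old_value, new_value))

    def set_if(self, predicate: Callable[[T], bool], new_value: T) -> bool:
        """Set new_value only if the current value satisfies predicate."""
        # No await between check and set, so no other task can interleave.
        if not predicate(self._value):
            return False
        self.value = new_value
        return True

    async def _wait_for_condition(self, predicate: Callable[[T], bool]) -> T:
        """Shared core: return the first value that satisfies predicate."""
        queue: asyncio.Queue = asyncio.Queue()
        self._watch_queues.append(queue)
        try:
            if predicate(self._value):
                return self._value
            while True:
                _old, new = await queue.get()
                if predicate(new):
                    return new
        finally:
            self._watch_queues.remove(queue)

    async def wait_for(self, target: T, timeout: Optional[float] = None) -> T:
        # asyncio.wait_for cancels the inner wait on timeout, so the
        # finally block above still deregisters the queue.
        return await asyncio.wait_for(
            self._wait_for_condition(lambda v: v == target), timeout
        )

    async def wait_for_not(self, target: T, timeout: Optional[float] = None) -> T:
        return await asyncio.wait_for(
            self._wait_for_condition(lambda v: v != target), timeout
        )


async def main() -> tuple:
    state: ValueWatcher[str] = ValueWatcher("disconnected")
    waiter = asyncio.create_task(state.wait_for_not("disconnected", timeout=1.0))
    await asyncio.sleep(0)

    moved = state.set_if(lambda v: v == "disconnected", "connecting")
    rejected = state.set_if(lambda v: v == "connected", "closing")

    try:
        await state.wait_for("closed", timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True

    return await waiter, moved, rejected, timed_out


print(asyncio.run(main()))  # ('connecting', True, False, True)
```

Routing every wait method through one predicate core keeps the registration, draining, and cleanup logic in a single place.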

The full implementation is about 300 lines, most of which is docstrings and convenience methods built on the same core.
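As one illustration of what that extra code has to handle, here is a minimal sketch of thread-safe delivery, assuming a threading.Lock around the value and the watcher list, and queues paired with their event loops. The class name `ThreadSafeWatcher`, the `set` method, and the internals are hypothetical and are not the SDK's actual code.

```python
import asyncio
import threading


class ThreadSafeWatcher:
    """Hypothetical sketch of cross-thread notification."""

    def __init__(self, initial_value):
        self._lock = threading.Lock()
        self._value = initial_value
        # Each queue is stored together with the loop that reads from it.
        self._watchers: list[tuple[asyncio.AbstractEventLoop, asyncio.Queue]] = []

    def set(self, new_value) -> None:
        """May be called from any thread."""
        with self._lock:
            if new_value == self._value:
                return
            old_value, self._value = self._value, new_value
            # Scheduling under the lock keeps transitions in order.
            for loop, queue in self._watchers:
                try:
                    loop.call_soon_threadsafe(
                        queue.put_nowait, (old_value, new_value)
                    )
                except RuntimeError:
                    pass  # that consumer's loop is already closed

    async def wait_for(self, target) -> None:
        queue: asyncio.Queue = asyncio.Queue()
        entry = (asyncio.get_running_loop(), queue)
        with self._lock:
            # Check and register under one lock acquisition, so no
            # transition can slip in between the two steps.
            if self._value == target:
                return
            self._watchers.append(entry)
        try:
            while True:
                _old, new = await queue.get()
                if new == target:
                    return
        finally:
            with self._lock:
                self._watchers.remove(entry)


async def main() -> tuple:
    state = ThreadSafeWatcher("disconnected")
    waiter = asyncio.create_task(state.wait_for("closing"))
    await asyncio.sleep(0)  # let the waiter register

    def worker() -> None:
        # Runs in a worker thread, outside the event loop.
        state.set("closing")
        state.set("closed")

    await asyncio.to_thread(worker)
    await asyncio.wait_for(waiter, timeout=1.0)
    return state._value, len(state._watchers)


print(asyncio.run(main()))  # ('closed', 0)
```

asyncio.Queue is not thread-safe, which is why the worker thread hands each put_nowait call to the consumer's loop instead of calling it directly.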

One Caveat

The setter deduplicates by equality: if the new value == the current value, no notification fires. This works well for enums, strings, and ints, but mutating a mutable object in place and reassigning the same reference won’t trigger consumers (because obj == obj is trivially True). Stick to immutable values and this isn’t a concern.
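The pitfall is easy to reproduce. The sketch below keeps only the deduplicating setter from the simplified ValueWatcher and records notifications in a plain list; the `Watched` class and its `notifications` attribute are stand-ins invented for this example.

```python
class Watched:
    """Only the deduplicating setter from the simplified ValueWatcher."""

    def __init__(self, initial_value):
        self._value = initial_value
        self.notifications = []  # stands in for the per-consumer queues

    @property
    def value(self):
        return self._value

    @value.setter
    def value(self, new_value):
        if new_value == self._value:
            return  # deduplicated: no notification
        old_value, self._value = self._value, new_value
        self.notifications.append((old_value, new_value))


# Mutate a list in place, then reassign the same object.
pending = Watched(["req-1"])
items = pending.value
items.append("req-2")
pending.value = items
print(len(pending.notifications))  # 0

# Build a new immutable value instead: the change is noticed.
frozen = Watched(("req-1",))
frozen.value = frozen.value + ("req-2",)
print(frozen.notifications)  # [(('req-1',), ('req-1', 'req-2'))]
```

In the first case the stored value and the new value are the same list, so the equality check cannot see the mutation.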

Wrapping Up

The core insight is simple: asyncio.Condition asks consumers “is the current state what you want?” when it should ask “did the state ever become what you want?” Per-consumer queues make that possible by buffering every transition instead of just notifying about the latest one.

We use ValueWatcher throughout Inngest’s Python SDK to coordinate WebSocket connection state, worker lifecycle, and graceful shutdown. If you’re managing shared mutable state in asyncio, give it a try.

