Python Async
Stop Waiting, Start Doing - All at Once
Master asynchronous programming in Python. Learn how asyncio and aiohttp let you handle thousands of concurrent tasks without threads, and why every AI engineer needs this skill.
What is Asynchronous Programming?
The Restaurant Analogy
Synchronous vs Asynchronous - Think of a Restaurant:
Imagine a restaurant with one waiter. In synchronous mode, the waiter takes an order, goes to the kitchen, waits until the food is ready, brings it back, and only THEN takes the next order. If cooking takes 20 minutes, serving 10 customers takes 200 minutes, and the last customer waits for almost all of it.
In asynchronous mode, the waiter takes order 1, sends it to the kitchen, and immediately takes order 2, then 3. When food is ready, the waiter picks it up and delivers. Same one waiter, but now 10 customers are served in ~25 minutes!
In Programming Terms:
Synchronous code blocks execution while waiting for I/O (network calls, file reads, database queries). Your program sits idle doing nothing.
Asynchronous code says "I will come back when this is done" and moves on to other work. The CPU stays busy with useful tasks instead of waiting.
This is especially critical for AI applications - when you are calling the OpenAI API, waiting 2-5 seconds per request, you could be processing dozens of other requests simultaneously.
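To make the contrast concrete, here is a minimal sketch that simulates three slow I/O calls with sleeps. The function names and the 0.1-second delay are illustrative assumptions, not measurements of a real API.

```python
import asyncio
import time

DELAY = 0.1  # assumed stand-in for one slow I/O call, in seconds


def blocking_call(n):
    time.sleep(DELAY)  # the whole program sits idle here
    return n


async def non_blocking_call(n):
    await asyncio.sleep(DELAY)  # hands control back while "waiting"
    return n


def run_sync():
    start = time.perf_counter()
    results = [blocking_call(n) for n in range(3)]
    return results, time.perf_counter() - start


async def run_async():
    start = time.perf_counter()
    results = await asyncio.gather(*(non_blocking_call(n) for n in range(3)))
    return results, time.perf_counter() - start


sync_results, sync_elapsed = run_sync()
async_results, async_elapsed = asyncio.run(run_async())
print(f"sync:  {sync_elapsed:.2f}s")   # roughly 3 x DELAY
print(f"async: {async_elapsed:.2f}s")  # roughly 1 x DELAY
```

The waits overlap in the async version, so the total is close to one delay instead of three.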
Real-World Impact:
- Swiggy's order tracking service handles 100K+ concurrent WebSocket connections using async Python
- A synchronous, thread-per-connection approach would need 100K threads, and the memory for their stacks alone makes that impractical
- Async Python can handle all 100K connections in a single thread with ~500MB RAM
Note: Async is NOT about making things faster in CPU terms. It is about not wasting time waiting. If your code is CPU-bound (heavy math), async will not help. It shines for I/O-bound tasks (API calls, database queries, file operations).
The Event Loop - The Brain of Async Python
Understanding the Event Loop
What is the Event Loop?
Think of the event loop as a traffic controller at a busy intersection. It continuously checks: "Is any task ready? Has any I/O completed? Does anyone need attention?" and routes execution accordingly.
The event loop is the core of asyncio. It runs in a single thread and manages all your coroutines (async functions). It decides which coroutine gets to run next based on which ones are waiting and which ones are ready.
How It Works - Step by Step:
- 1. Start: You call asyncio.run(main()). This creates an event loop and starts your main coroutine.
- 2. Execute: The coroutine runs until it hits an "await" statement.
- 3. Pause and Switch: At "await", if the awaited operation is not finished yet, the coroutine says "I am waiting for something." The event loop pauses it and runs any other coroutine that is ready.
- 4. Resume: When the awaited operation completes (e.g., API response arrives), the event loop resumes that coroutine from where it paused.
- 5. Repeat: This cycle continues until all coroutines are done.
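The steps above can be watched in action with a small sketch. The worker names and delays are made up for illustration; the log simply records the order in which the event loop runs things.

```python
import asyncio


async def worker(name, delay, log):
    log.append(f"{name} started")
    await asyncio.sleep(delay)  # pause here; the loop switches to another coroutine
    log.append(f"{name} resumed")


async def main():
    log = []
    # gather schedules both workers; each runs until its first await
    await asyncio.gather(
        worker("A", 0.05, log),
        worker("B", 0.01, log),
    )
    return log


for entry in asyncio.run(main()):
    print(entry)
# A started
# B started
# B resumed
# A resumed
```

B finishes first even though A started first, because the loop resumes whichever coroutine's wait completes first.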
Key Vocabulary:
- Coroutine: A function defined with "async def". It can be paused and resumed. It is NOT a thread.
- await: The keyword that says "pause me here and let others run until this operation completes."
- Task: A wrapped coroutine that the event loop can schedule. Created with asyncio.create_task().
- Future: A placeholder for a result that does not exist yet but will in the future.
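Here is a short sketch that touches each term from the vocabulary list. The compute() function and the 0.01-second delays are assumptions chosen only to keep the demo quick.

```python
import asyncio


async def compute():  # a coroutine function, defined with "async def"
    await asyncio.sleep(0.01)
    return 42


async def main():
    coro = compute()                  # coroutine object: nothing has run yet
    task = asyncio.create_task(coro)  # Task: now scheduled on the event loop

    loop = asyncio.get_running_loop()
    future = loop.create_future()     # Future: an empty placeholder for a result
    loop.call_later(0.01, future.set_result, "filled in later")

    task_result = await task          # pause until the task finishes
    future_result = await future      # pause until someone sets the result
    # A Task is itself a kind of Future
    return task_result, future_result, isinstance(task, asyncio.Future)


print(asyncio.run(main()))  # (42, 'filled in later', True)
```

In everyday code you mostly work with coroutines and Tasks; bare Futures tend to show up inside libraries.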
The Golden Rule:
Never block the event loop! If you call time.sleep(5) instead of await asyncio.sleep(5), you freeze the ENTIRE event loop for 5 seconds. No other coroutine can run. It is like the traffic controller falling asleep - everything stops.
Note: A common mistake is using the regular requests library inside async code. requests is synchronous and will block the event loop. Use aiohttp instead!
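A rough way to see the golden rule is to run a "heartbeat" coroutine next to a slow call and count how often it gets to tick. This is only a sketch: the heartbeat helper and the timings are assumptions for the demo.

```python
import asyncio
import time


async def heartbeat(ticks):
    while True:
        ticks.append(time.perf_counter())  # record every time we get to run
        await asyncio.sleep(0.01)


async def count_ticks(use_blocking_sleep):
    ticks = []
    task = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # give the heartbeat one chance to start

    if use_blocking_sleep:
        time.sleep(0.1)           # blocks the event loop: heartbeat is frozen
    else:
        await asyncio.sleep(0.1)  # yields to the loop: heartbeat keeps ticking

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return len(ticks)


blocked = asyncio.run(count_ticks(use_blocking_sleep=True))
cooperative = asyncio.run(count_ticks(use_blocking_sleep=False))
print(f"ticks with time.sleep:    {blocked}")
print(f"ticks with asyncio.sleep: {cooperative}")
```

With time.sleep the heartbeat only records its very first tick; with asyncio.sleep it keeps ticking the whole time.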
asyncio Fundamentals - async/await in Practice
Writing Your First Async Code
The Basic Pattern:
Every async program follows this pattern: define coroutines with "async def", call them with "await", and run the whole thing with asyncio.run().
```python
import asyncio

async def fetch_user_data(user_id):
    print(f"Fetching user {user_id}...")
    await asyncio.sleep(2)  # Simulates an API call
    return {"id": user_id, "name": f"User_{user_id}"}

async def main():
    # Sequential - takes 6 seconds
    user1 = await fetch_user_data(1)
    user2 = await fetch_user_data(2)
    user3 = await fetch_user_data(3)

asyncio.run(main())
```
But wait - this is still sequential! Each await finishes before the next call even starts. To run them concurrently, we need asyncio.gather().
asyncio.gather() - True Concurrency:
```python
async def main():
    # Concurrent - takes only 2 seconds!
    results = await asyncio.gather(
        fetch_user_data(1),
        fetch_user_data(2),
        fetch_user_data(3),
    )
    # results = [user1_data, user2_data, user3_data]

asyncio.run(main())
```
asyncio.gather() starts all three coroutines and runs them concurrently. Since each takes 2 seconds but they overlap, total time is ~2 seconds instead of 6!
Error Handling in Async Code:
When using gather(), if one task fails, the first exception is raised to the caller by default and you never get to see the results of the tasks that succeeded (they keep running, but their results are thrown away). Use return_exceptions=True to handle errors gracefully:
```python
async def main():
    results = await asyncio.gather(
        fetch_user_data(1),
        fetch_user_data(2),  # This might fail
        fetch_user_data(3),
        return_exceptions=True,  # Return exceptions as results instead of raising
    )
    for r in results:
        if isinstance(r, Exception):
            print(f"Failed: {r}")
        else:
            print(f"Success: {r}")
```
Note: Rule of thumb: Use gather() when you need all results together. Use create_task() when you want to start something in the background and maybe check on it later.
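For the create_task() half of that rule of thumb, here is a minimal sketch. The write_audit_log job and the event strings are hypothetical, used only to show the ordering.

```python
import asyncio


async def write_audit_log(events):  # hypothetical background job
    await asyncio.sleep(0.02)
    events.append("audit log written")


async def handle_request(events):
    # Starts running in the background as soon as the loop gets control
    background = asyncio.create_task(write_audit_log(events))
    events.append("response sent")  # carry on without waiting for it
    await background                # check on it later
    return events


print(asyncio.run(handle_request([])))
# ['response sent', 'audit log written']
```

Keep a reference to the task (here the background variable) until it is done, so it is not garbage-collected halfway through.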
aiohttp - Making Async HTTP Requests
aiohttp: The Async Alternative to requests
Why aiohttp?
The popular "requests" library is synchronous. It blocks the entire thread while waiting for HTTP responses. aiohttp is built from the ground up for async Python. It lets you make hundreds of HTTP requests concurrently without blocking.
Flipkart analogy: Imagine you need to check prices from 100 different sellers. With requests, you call seller 1, wait, call seller 2, wait... 100 sequential calls. With aiohttp, you call all 100 at once and collect responses as they arrive.
Basic aiohttp Usage:
```python
import aiohttp
import asyncio

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.json()

async def main():
    async with aiohttp.ClientSession() as session:
        # Fetch 10 URLs concurrently
        urls = [f"https://api.example.com/item/{i}" for i in range(10)]
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        print(f"Fetched {len(results)} items")

asyncio.run(main())
```
Key Concept: ClientSession
Always reuse a single ClientSession for multiple requests. Creating a new session per request is wasteful - it is like opening a new phone line for every word you speak. A session maintains connection pooling, cookies, and headers.
- DO: Create one session, make many requests through it
- DO NOT: Create a new aiohttp.ClientSession() inside every request function
- Use "async with" to ensure the session is properly closed when done
Real-World Performance Comparison:
Fetching 100 API endpoints (each takes ~200ms):
- requests (sync): 100 x 200ms = ~20 seconds
- aiohttp (async): All at once = ~200ms in the ideal case (up to 100x faster, if the server and network keep up)
- requests + ThreadPool(10): 10 batches = ~2 seconds
For AI applications calling LLM APIs, this difference is massive. Processing 50 prompts sequentially at 3 seconds each = 150 seconds. Concurrently = ~3 seconds.
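The three-way comparison above can be reproduced in miniature without touching the network. This is a simulation under stated assumptions: sleeps stand in for HTTP calls, and the latency and request count are scaled down (20 ms and 20 requests) so it runs in under a second.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

LATENCY = 0.02  # scaled-down stand-in for the ~200ms per call in the text
N = 20          # scaled-down stand-in for the 100 endpoints


def blocking_fetch(i):
    time.sleep(LATENCY)  # simulated synchronous HTTP call
    return i


async def async_fetch(i):
    await asyncio.sleep(LATENCY)  # simulated async HTTP call
    return i


def run_sequential():
    return [blocking_fetch(i) for i in range(N)]


def run_thread_pool():
    with ThreadPoolExecutor(max_workers=10) as pool:
        return list(pool.map(blocking_fetch, range(N)))


def run_async():
    async def main():
        return await asyncio.gather(*(async_fetch(i) for i in range(N)))
    return asyncio.run(main())


def timed(fn):
    start = time.perf_counter()
    fn()
    return time.perf_counter() - start


for name, fn in [("sequential", run_sequential),
                 ("thread pool (10)", run_thread_pool),
                 ("asyncio", run_async)]:
    print(f"{name:>16}: {timed(fn):.3f}s")
```

Expect the same ordering as in the list above: sequential is slowest, the pool of 10 finishes in about two batches, and asyncio finishes in about one latency. Real APIs add rate limits and network variance on top.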
Note: aiohttp is essential for AI automation. When you are batch-processing hundreds of prompts through OpenAI or Claude APIs, async requests can reduce execution time from minutes to seconds.
Semaphores & Rate Limiting - Controlling Concurrency
Do Not Overwhelm the Server!
The Problem with Unlimited Concurrency:
If you fire 10,000 requests simultaneously, you will: (a) get rate-limited by the API, (b) exhaust your system file descriptors, (c) possibly get your IP banned. It is like 10,000 people trying to enter a movie hall at once - chaos!
Zomato analogy: During a flash sale, Zomato does not let all users hit the payment gateway at once. They use a queue system. Similarly, you need to limit how many async requests run concurrently.
asyncio.Semaphore - Your Concurrency Limiter:
A semaphore is like a bouncer at a club. It only lets N people in at a time. When someone leaves, the next person enters.
```python
import asyncio
import aiohttp

async def fetch_with_limit(session, semaphore, url):
    async with semaphore:  # Waits if 10 requests are already running
        async with session.get(url) as response:
            return await response.json()

async def main():
    # Create ONE semaphore inside the running event loop and share it with every task
    semaphore = asyncio.Semaphore(10)  # Max 10 concurrent requests
    async with aiohttp.ClientSession() as session:
        urls = [f"https://api.example.com/item/{i}" for i in range(1000)]
        tasks = [fetch_with_limit(session, semaphore, url) for url in urls]
        results = await asyncio.gather(*tasks)
        # All 1000 URLs fetched, but only 10 at a time
        print(f"Fetched {len(results)} items")

asyncio.run(main())
```
Common Pitfalls:
- Creating the semaphore inside the request function: Each call gets a NEW semaphore - no limiting happens! Create it once and share it across all requests.
- Too low a limit: Setting semaphore to 1 makes everything sequential again. Find the sweet spot.
- Not handling timeouts: Use aiohttp.ClientTimeout to avoid hanging forever on slow responses.
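The first pitfall is easy to verify by tracking how many fake requests are in flight at once. This is a sketch: the Tracker class and fake_request are made up for the demo, and a short sleep stands in for the network call.

```python
import asyncio


class Tracker:
    """Counts how many fake requests are running at the same time."""

    def __init__(self):
        self.active = 0
        self.peak = 0


async def fake_request(tracker):
    tracker.active += 1
    tracker.peak = max(tracker.peak, tracker.active)
    await asyncio.sleep(0.01)  # stand-in for the network wait
    tracker.active -= 1


async def shared_limit(tracker, semaphore):
    async with semaphore:  # one semaphore shared by every call
        await fake_request(tracker)


async def per_call_limit(tracker):
    semaphore = asyncio.Semaphore(3)  # pitfall: a fresh semaphore on every call
    async with semaphore:
        await fake_request(tracker)


async def peak_concurrency(shared):
    tracker = Tracker()
    semaphore = asyncio.Semaphore(3)
    if shared:
        jobs = [shared_limit(tracker, semaphore) for _ in range(12)]
    else:
        jobs = [per_call_limit(tracker) for _ in range(12)]
    await asyncio.gather(*jobs)
    return tracker.peak


print(asyncio.run(peak_concurrency(shared=True)))   # 3
print(asyncio.run(peak_concurrency(shared=False)))  # 12
```

With the shared semaphore the peak never goes above 3; with a semaphore per call, all 12 requests run at once.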
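Note: OpenAI API rate limits depend on the model and your usage tier (figures like ~60 RPM on free tier and ~3500 RPM on paid tiers are only ballpark examples), so check the limits on your own account. A semaphore caps how many requests are in flight at once, not how many you send per minute, so pick the limit with your RPM budget in mind when batch-processing.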
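If you need to respect a requests-per-minute budget directly, one option is to space out when requests start. The RequestPacer class below is a hypothetical helper written for this sketch, not part of asyncio or any API client, and the 6000 RPM figure is chosen only so the demo finishes quickly.

```python
import asyncio


class RequestPacer:
    """Hypothetical helper: spaces out request starts to fit an RPM budget."""

    def __init__(self, requests_per_minute):
        self._interval = 60.0 / requests_per_minute
        self._lock = asyncio.Lock()
        self._next_start = 0.0

    async def wait_turn(self):
        async with self._lock:  # callers queue up one at a time
            loop = asyncio.get_running_loop()
            delay = self._next_start - loop.time()
            if delay > 0:
                await asyncio.sleep(delay)
            # Reserve the next slot one interval after this one
            self._next_start = max(loop.time(), self._next_start) + self._interval


async def paced_call(pacer, started, i):
    await pacer.wait_turn()
    started.append((i, asyncio.get_running_loop().time()))


async def main():
    pacer = RequestPacer(requests_per_minute=6000)  # 10 ms apart, demo value only
    started = []
    await asyncio.gather(*(paced_call(pacer, started, i) for i in range(5)))
    return started


started = asyncio.run(main())
print(f"5 calls spread over {started[-1][1] - started[0][1]:.3f}s")
```

In practice you would combine this with a semaphore: the pacer controls how often requests start, the semaphore controls how many are open at once.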
Real-World Pattern: Batch Processing LLM Calls
AI Engineer's Most Common Async Pattern
The Scenario:
You have 500 product descriptions that need to be summarized using an LLM API. Each API call takes 2-5 seconds. Synchronously, that is 1000-2500 seconds (17-42 minutes). With async and a concurrency limit of 20, the work runs in about 25 waves of 20 requests, so it drops to roughly 1-2 minutes.
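The arithmetic behind this scenario can be checked with a tiny estimator. It is a simplified model (an assumption, not a benchmark): requests run in full waves and every call in a wave takes the same time.

```python
import math


def estimate_seconds(num_requests, seconds_per_request, max_concurrent=1):
    """Rough wall-clock estimate, assuming full waves of equal duration."""
    waves = math.ceil(num_requests / max_concurrent)
    return waves * seconds_per_request


for latency in (2, 5):
    sequential = estimate_seconds(500, latency)
    concurrent = estimate_seconds(500, latency, max_concurrent=20)
    print(f"{latency}s per call: sequential {sequential}s, 20 concurrent {concurrent}s")
# 2s per call: sequential 1000s, 20 concurrent 50s
# 5s per call: sequential 2500s, 20 concurrent 125s
```

So under this model the batch lands around the one-to-two-minute mark, depending on how slow the individual calls are.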
Production-Ready Pattern:
```python
import asyncio
import aiohttp
from typing import List

class AsyncLLMProcessor:
    def __init__(self, api_key: str, max_concurrent: int = 20):
        self.api_key = api_key
        # On Python versions before 3.10, create this object inside a coroutine,
        # because the semaphore binds to the event loop when it is constructed
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.errors = []

    async def process_single(self, session, prompt, index):
        async with self.semaphore:
            try:
                headers = {"Authorization": f"Bearer {self.api_key}"}
                payload = {"model": "gpt-4", "messages": [
                    {"role": "user", "content": prompt}
                ]}
                async with session.post(
                    "https://api.openai.com/v1/chat/completions",
                    json=payload,
                    headers=headers,
                    timeout=aiohttp.ClientTimeout(total=30)
                ) as resp:
                    resp.raise_for_status()  # Treat HTTP 4xx/5xx as errors
                    data = await resp.json()
                    return {"index": index, "result": data}
            except Exception as e:
                # Collect the error instead of raising, so one failure
                # does not abort the whole batch
                self.errors.append({"index": index, "error": str(e)})
                return None

    async def process_batch(self, prompts: List[str]):
        async with aiohttp.ClientSession() as session:
            tasks = [
                self.process_single(session, prompt, i)
                for i, prompt in enumerate(prompts)
            ]
            results = await asyncio.gather(*tasks)
            return [r for r in results if r is not None]
```
Key Design Decisions:
- Semaphore limit of 20: Balances speed vs. rate limits. Adjust based on your API tier.
- Timeout of 30 seconds: Prevents hanging on stuck requests.
- Error collection: Errors are collected, not raised. You can retry failed ones later.
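- Index tracking: gather() returns results in input order, but failed items are filtered out of the final list, so positions shift. The index maps each result (and each error) back to its original prompt.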
Note: This batch processing pattern is used daily by AI engineers. Whether you are processing embeddings, generating summaries, or running evaluations - this is the template you will use.
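The "retry failed ones later" idea can be sketched without a real API. Everything here is an assumption for illustration: fake_llm_call is a hypothetical stand-in that fails once for prompts containing "flaky", and process_with_retry is one possible way to re-run only the failed indexes.

```python
import asyncio


async def fake_llm_call(prompt, attempt):
    """Hypothetical stand-in for an API call; 'flaky' prompts fail on attempt 0."""
    await asyncio.sleep(0.01)
    if "flaky" in prompt and attempt == 0:
        raise RuntimeError("simulated rate-limit error")
    return prompt.upper()


async def run_batch(prompts, indexes, attempt, semaphore):
    async def one(i):
        async with semaphore:
            try:
                return i, await fake_llm_call(prompts[i], attempt), None
            except Exception as exc:
                return i, None, str(exc)  # collect the error, do not raise

    return await asyncio.gather(*(one(i) for i in indexes))


async def process_with_retry(prompts, max_attempts=2, max_concurrent=2):
    semaphore = asyncio.Semaphore(max_concurrent)
    results = {}
    pending = list(range(len(prompts)))
    for attempt in range(max_attempts):
        outcomes = await run_batch(prompts, pending, attempt, semaphore)
        pending = []
        for i, value, error in outcomes:
            if error is None:
                results[i] = value  # keyed by original index
            else:
                pending.append(i)   # retry only these on the next pass
        if not pending:
            break
    return results, pending


results, failed = asyncio.run(process_with_retry(["a", "flaky b", "c"]))
print(results, failed)
```

Keying results by index is what makes the retry pass simple: a second attempt just fills in the missing keys. A real version would usually add a delay or backoff between passes.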
Async vs Threads vs Multiprocessing - When to Use What
Choosing the Right Concurrency Model
The Three Approaches Compared:
| Feature | asyncio | Threading | Multiprocessing |
|---|---|---|---|
| Best for | I/O-bound | I/O-bound | CPU-bound |
| Concurrency | Single thread | Multiple threads | Multiple processes |
| GIL affected? | Not contended (single thread) | Yes | No (one GIL per process) |
| Memory usage | Very low | Medium | High |
| 1000 tasks | Easy | Risky | Impractical as 1000 processes |
| Complexity | Medium | High (race conditions) | Medium |
Decision Flowchart:
- Making API calls / DB queries / File I/O? Use asyncio
- Running ML model inference / heavy math? Use multiprocessing
- Need to call sync libraries in async code? Use asyncio.to_thread() (Python 3.9+)
- Legacy codebase with threading? Keep threading, but consider migration
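For the "sync library inside async code" branch, here is a minimal sketch of asyncio.to_thread(). The legacy_blocking_lookup function is a hypothetical stand-in for any synchronous call you cannot rewrite, and the 0.1-second sleep simulates its blocking I/O.

```python
import asyncio
import time


def legacy_blocking_lookup(key):
    """Hypothetical synchronous library call that cannot be awaited."""
    time.sleep(0.1)  # simulated blocking I/O
    return f"value-for-{key}"


async def main():
    start = time.perf_counter()
    # Each call runs in a worker thread, so the event loop stays free
    results = await asyncio.gather(
        asyncio.to_thread(legacy_blocking_lookup, "a"),
        asyncio.to_thread(legacy_blocking_lookup, "b"),
        asyncio.to_thread(legacy_blocking_lookup, "c"),
    )
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")
```

The three lookups overlap, so the total is close to one call's duration. This helps for blocking I/O only; CPU-heavy work in threads still takes turns on the GIL.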
Python GIL Explained Simply:
Python has a Global Interpreter Lock (GIL) - only one thread can execute Python code at a time. It is like a speaking stick in a meeting: only the person holding it can talk.
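This is why threading does NOT speed up CPU work in Python - threads take turns instead of running in parallel. But for I/O, threads release the GIL while waiting, so it still helps.
Asyncio avoids GIL contention because it uses a single thread with cooperative multitasking. Control only changes hands at an await, so there are no thread-level lock fights and far fewer data races. You can still get logic bugs when shared state changes across an await, and asyncio locks can still deadlock if misused.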
Note: For AI automation work, you will use asyncio 90% of the time (API calls), multiprocessing 9% (data preprocessing), and threading rarely (legacy library integration).