Writing web servers in Python has never been easier. Import fastapi, hack together some handler code, and off you go. For simple request-based workflows, this works just fine. But Python applications quickly become more complex than your typical tutorial.
Want to maintain state across requests? Or schedule periodic tasks? Suddenly, the elegant simplicity of your code devolves into a tangled mess. Debugging becomes harder, testing in isolation feels impossible, and concerns start blending into one another.
The good news? Design patterns like Hexagonal Architecture and the Actor model can help. By separating your core domain logic from external dependencies like databases, web servers, and third-party services, you can write more maintainable and adaptable code.
Do you really need these fancy design patterns? In the following sections, we’ll explore the challenges of tightly coupled code and see how Hexagonal Architecture addresses them.
1. A Rough Start
Imagine we’re running LLM inference (think Ollama or vLLM) on one of our servers. To optimize performance, we want our worker to process batches of eight requests at a time. We also want the service to log token usage per user in a database. Here’s how we might start:
# endpoints.py
import requests
from fastapi import APIRouter, FastAPI
from pydantic import BaseModel
from sqlalchemy import create_engine, text

app = FastAPI()
router = APIRouter()

# connection pool as a global
engine = create_engine("postgresql+psycopg2://me@localhost/mydb", pool_size=5)


class CompletionRequest(BaseModel):
    prompt: str
    # skip auth for now and assume we get some user_id per request
    user_id: int


@router.post("/completion")
async def post_job(completion: CompletionRequest):
    # vllm running locally exposed via http
    data = {"prompts": [completion.prompt]}
    url = "http://localhost:8000/generate"
    response = requests.post(url, json=data).json()["completions"][0]
    tokens = response["tokens"]["total_tokens"]

    # insert token usage
    query = "INSERT INTO completions VALUES (:user_id, :tokens)"
    with engine.begin() as conn:
        conn.execute(text(query), {"user_id": completion.user_id, "tokens": tokens})

    return response["text"]


app.include_router(router)
This doesn’t look too bad at first. But on a second look, issues emerge:
- Batching Requests: How do we group incoming requests for the worker?
- Testing: How can we test without running vLLM or a database?
- Error Handling: What happens if the worker fails?
The logic is tightly coupled to external dependencies, making it hard to test, debug, or scale. Let’s restart with Hexagonal Architecture.
2. The Wiring
Normally, I’d advocate starting with the central part of our application, the domain logic. For our example, this is the batching of requests for the worker, which we will encapsulate in the Scheduler.
To avoid getting hung up on all the (cool) details of the scheduler, we will start with a look at how everything is wired together. Here’s our plan:
- Create a SchedulerApi interface to define what we expect from the scheduler.
- Introduce a Shell class to expose the scheduler’s functionality via HTTP.
- Use FastAPI internally for HTTP routing (but keep it abstracted).
- Apply dependency injection for testing and flexibility.
Scheduler Interface
We start with the interface for the scheduler, which processes requests and returns completions.
# scheduler.py
from typing import NamedTuple, Protocol

from pydantic import BaseModel


class CompletionResponse(NamedTuple):
    completion: str
    tokens: int


class CompletionRequest(BaseModel):
    prompt: str
    user_id: int


class SchedulerApi(Protocol):
    async def complete(self, request: CompletionRequest) -> CompletionResponse: ...
We don’t think about an actual implementation yet, but rather about the input and output types.
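Because SchedulerApi is a typing.Protocol, implementations never need to inherit from it: anything with a structurally matching complete coroutine satisfies the interface. A quick illustration (the EchoScheduler is made up for this example):

class EchoScheduler:
    """Echo the prompt back without doing any real inference."""

    async def complete(self, request: CompletionRequest) -> CompletionResponse:
        return CompletionResponse(completion=request.prompt, tokens=0)


scheduler: SchedulerApi = EchoScheduler()  # type-checks structurally

This structural typing is exactly what will let us swap in spy and saboteur implementations in our tests later on.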
Creating the Shell
By providing an HTTP interface for the scheduler, the Shell serves as the boundary between our application and its users. It takes a reference to anything implementing SchedulerApi and an AppConfig, and serves an ASGI-compatible application.
# shell.py
from uvicorn import Config, Server

from src.config import AppConfig
from src.scheduler import SchedulerApi


class Shell:
    """Provide user access to the scheduler."""

    def __init__(self, config: AppConfig, scheduler: SchedulerApi) -> None:
        self.config = config
        self.app = build_app(scheduler)
        self.server: Server | None = None

    async def run(self) -> None:
        config = Config(app=self.app, host=self.config.host, port=self.config.port)
        self.server = Server(config)
        await self.server.serve()
Encapsulating FastAPI
This ASGI-compatible application is built in build_app. We will use FastAPI, but this is an implementation detail fully encapsulated inside the Shell.
# shell.py
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from fastapi import FastAPI


def build_app(scheduler: SchedulerApi) -> FastAPI:
    @asynccontextmanager
    async def lifespan(app: FastAPI) -> AsyncIterator[dict]:
        yield {"scheduler": scheduler}

    app = FastAPI(lifespan=lifespan)
    app.include_router(routes())
    return app
By putting the scheduler into the lifespan state of the FastAPI app, we make it available to request handlers. The ASGI specification defines this lifespan state as “an empty namespace where the application can persist state to be used when handling subsequent requests. The server will ensure that a shallow copy of the namespace is passed into each subsequent request/response call into the application.”
Dependency Injection
To inject the scheduler into the handler functions, we define a simple extractor:
# shell.py
from fastapi import Request


def with_scheduler(request: Request) -> SchedulerApi:
    return request.state.scheduler
With this setup, handlers can access the scheduler through dependency injection:
# shell.py
from fastapi import Depends


async def complete(
    request: CompletionRequest, scheduler: SchedulerApi = Depends(with_scheduler)
) -> str:
    # handle incoming completion requests
    response = await scheduler.complete(request)
    return response.completion
You might have already spotted it - we are still missing the piece of code that builds the routing:
# shell.py
from fastapi import APIRouter


def health() -> str:
    return "ok"


def routes() -> APIRouter:
    router = APIRouter()
    router.add_api_route("/health", health)
    router.add_api_route("/complete", complete, methods=["POST"])
    return router
By defining clear boundaries and injecting dependencies, we create a flexible system that’s easy to test and extend. By using a mock implementation for the scheduler, we can validate the HTTP route’s behavior independently of the scheduler’s actual implementation.
# shell_test.py
from fastapi.testclient import TestClient

from src.scheduler import CompletionRequest, CompletionResponse, SchedulerApi
from src.shell import build_app


class SpyScheduler(SchedulerApi):
    """Keep track of any incoming completion requests."""

    def __init__(self) -> None:
        self.requests: list[CompletionRequest] = []

    async def complete(self, request: CompletionRequest) -> CompletionResponse:
        self.requests.append(request)
        return CompletionResponse(completion=request.prompt, tokens=len(request.prompt))


def test_complete():
    # Given an app with a spy scheduler
    scheduler = SpyScheduler()
    app = build_app(scheduler)

    # When we call the complete route
    with TestClient(app) as client:
        body = {"prompt": "hello", "user_id": 1}
        response = client.post("/complete", json=body)

    # Then we get a 200 response and the scheduler has been called
    assert response.status_code == 200
    assert response.json() == "hello"
    assert len(scheduler.requests) == 1
No matter how complicated our domain logic becomes, our HTTP tests do not care, as we have mocked out the scheduler. Yes, FastAPI provides some convenience helpers to override injected dependencies, but we are fully self-contained. And because we are using lifespan state, which is part of the ASGI interface, we are less dependent on FastAPI in case we ever need to switch frameworks.
3. Owning the Event Loop
Running multiple asynchronous components like the Scheduler and the Shell together requires an orchestration piece. To achieve this, we introduce the App, which owns the event loop and runs both components as cooperative tasks:
# app.py
import asyncio

from src.config import AppConfig
from src.scheduler import Scheduler
from src.shell import Shell


class App:
    def __init__(self, config: AppConfig) -> None:
        self.config = config

    async def run(self) -> None:
        async with asyncio.TaskGroup() as tg:
            self.scheduler = Scheduler()
            self.shell = Shell(self.config, self.scheduler)
            tg.create_task(self.scheduler.run())
            tg.create_task(self.shell.run())

    def shutdown(self):
        """Gracefully shut down the application."""
        # Implement shutdown logic here
Python task groups run multiple tasks together. But unlike asyncio.gather, they also ensure cancellation of related tasks in the event of an error.
Have you ever found it difficult to add periodically running tasks to your FastAPI application? Owning the event loop makes this trivially easy: add another task to the task group that loops over an async sleep, as sketched below.
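A minimal sketch of such a periodic task (the log_heartbeat name, the interval, and the print body are made up for the example):

import asyncio


async def log_heartbeat(interval_seconds: float = 60.0) -> None:
    """Run forever, waking up once per interval."""
    while True:
        await asyncio.sleep(interval_seconds)
        print("still alive")  # any housekeeping work goes here


# inside App.run, alongside the scheduler and the shell:
#     tg.create_task(log_heartbeat())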
Now that we have done all the bootstrapping, our main function becomes quite simple. Its purpose is to load the config and run the app:
# main.py
import asyncio

from dotenv import load_dotenv

from src.app import App
from src.config import AppConfig

if __name__ == "__main__":
    load_dotenv()
    config = AppConfig.from_env()
    app = App(config)
    try:
        asyncio.run(app.run())
    except KeyboardInterrupt:
        app.shutdown()
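We never showed config.py. Here is a minimal sketch consistent with how AppConfig is used above; the host and port fields follow their usage in the Shell, while the environment variable names and defaults are assumptions:

# config.py
import os
from typing import NamedTuple


class AppConfig(NamedTuple):
    host: str
    port: int

    @classmethod
    def from_env(cls) -> "AppConfig":
        # defaults are illustrative
        return cls(
            host=os.environ.get("HOST", "127.0.0.1"),
            port=int(os.environ.get("PORT", "8080")),
        )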
Let’s recap: While we have not worked on the scheduler itself, we have defined an interface for the functionality it should offer, and have implemented a web server that can serve this interface. So rather than with the core of the hexagon, we started with an adapter.
4. Finally, the Domain
We are now ready to get started with our domain logic, the part of the application where we create something new and hopefully some value.
Remember the interface for the Scheduler which we introduced?
# scheduler.py
from typing import NamedTuple, Protocol

from pydantic import BaseModel


class CompletionRequest(BaseModel):
    prompt: str
    user_id: int


class CompletionResponse(NamedTuple):
    completion: str
    tokens: int


class SchedulerApi(Protocol):
    async def complete(self, request: CompletionRequest) -> CompletionResponse: ...
To facilitate batch processing, the scheduler will communicate with a WorkerApi, an abstraction over the underlying inference. It is very similar to the scheduler interface, but takes a batch of prompts and returns a batch of completions:
class WorkerApi(Protocol):
    """A worker that can complete prompts.

    Might be an `Ollama` or `vLLM` process on the same machine.
    Invocation could be over HTTP.
    """

    async def complete(self, prompts: list[str]) -> list[CompletionResponse]: ...
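To make this concrete, an HTTP adapter for this protocol might look like the sketch below. The endpoint URL and the response shape are carried over from the rough version in section 1 and remain assumptions about the inference server:

# worker.py
import httpx


class HttpWorker(WorkerApi):
    """Adapter that forwards prompt batches to an inference server over HTTP."""

    def __init__(self, url: str = "http://localhost:8000/generate") -> None:
        self.url = url

    async def complete(self, prompts: list[str]) -> list[CompletionResponse]:
        async with httpx.AsyncClient() as client:
            response = await client.post(self.url, json={"prompts": prompts})
        return [
            CompletionResponse(c["text"], c["tokens"]["total_tokens"])
            for c in response.json()["completions"]
        ]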
Now we turn to the Scheduler. It is supposed to collect requests and, once it has accumulated eight of them, send them to the worker. To make our tests green, we start off with a simple implementation that just forwards each individual request directly to the worker:
class Scheduler(SchedulerApi):
    """Batch completion jobs and send them to a worker."""

    def __init__(self, worker: WorkerApi) -> None:
        self.worker = worker

    async def run(self) -> None:
        # nothing to schedule yet; requests are forwarded directly
        pass

    async def complete(self, request: CompletionRequest) -> CompletionResponse:
        return (await self.worker.complete([request.prompt]))[0]
Notice how we can already set up a SpyWorker that offers the WorkerApi and test the scheduler?
# scheduler_test.py
import asyncio

from src.scheduler import CompletionRequest, CompletionResponse, Scheduler, WorkerApi


class SpyWorker(WorkerApi):
    """Stub worker implementation that returns the prompt as the completion."""

    def __init__(self):
        self.counter = 0

    async def complete(self, prompts: list[str]) -> list[CompletionResponse]:
        self.counter += 1
        return [CompletionResponse(prompt, len(prompt)) for prompt in prompts]


async def test_scheduler_completes():
    # Given a running scheduler
    worker = SpyWorker()
    scheduler = Scheduler(worker)
    task = asyncio.create_task(scheduler.run())

    # When doing 8 completion requests
    request = CompletionRequest(prompt="Hi", user_id=1)
    results = await asyncio.gather(*[scheduler.complete(request) for _ in range(8)])

    # Then the prompt is returned, and the worker has only been called once
    assert results[0].completion == "Hi"
    assert worker.counter == 1
We receive an AssertionError from the last line: the worker has been called eight times instead of once. If we want the Scheduler to send batches of, say, eight requests to the worker, we need to adapt our code.
If we want to send batches, we need some internal state to buffer requests. For a start, we append each incoming request to an internal list. The run method continuously checks whether we have accumulated eight requests and then forwards them to the worker.
class Scheduler(SchedulerApi):
    """Batch completion jobs and send them to a worker."""

    BATCH_SIZE = 8

    def __init__(self, worker: WorkerApi) -> None:
        self.incoming: list[CompletionRequest] = []
        self.worker = worker

    async def complete(self, request: CompletionRequest) -> CompletionResponse:
        # note: no way to hand the result back to the caller yet
        self.incoming.append(request)

    async def run(self) -> None:
        while True:
            if len(self.incoming) == self.BATCH_SIZE:
                await self.worker.complete([r.prompt for r in self.incoming])
                self.incoming = []
Sure enough, our tests are still failing.
There are two things wrong with our implementation: First, we need a way to send the results back to the caller. And second, the scheduler should only be woken up once a new request comes in. Otherwise, the while loop will burn up all of our CPU and never yield to other tasks.
The first problem is solved by using the asyncio.Future class. It represents the result of a computation that can be awaited. We introduce an internal CompletionTask, which holds a future allowing the scheduler to respond:
from asyncio import Future


class CompletionTask(NamedTuple):
    request: CompletionRequest
    future: Future[CompletionResponse]


class Scheduler(SchedulerApi):
    """Batch completion jobs and send them to a worker."""

    BATCH_SIZE = 8

    def __init__(self, worker: WorkerApi) -> None:
        self.incoming: list[CompletionTask] = []
        self.worker = worker

    async def complete(self, request: CompletionRequest) -> CompletionResponse:
        future: Future[CompletionResponse] = Future()
        self.incoming.append(CompletionTask(request, future))
        return await future

    async def run(self) -> None:
        while True:
            if len(self.incoming) == self.BATCH_SIZE:
                prompts = [task.request.prompt for task in self.incoming]
                futures = [task.future for task in self.incoming]
                results = await self.worker.complete(prompts)
                for future, result in zip(futures, results):
                    future.set_result(result)
                self.incoming = []
That’s a lot better. We use the Future to communicate the result back to the caller.
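If Futures are new to you, here is a minimal, self-contained demonstration of the mechanism outside of our scheduler:

import asyncio


async def main() -> None:
    future: asyncio.Future[str] = asyncio.Future()
    # resolve the future from the event loop after a short delay
    asyncio.get_running_loop().call_later(0.1, future.set_result, "done")
    print(await future)  # suspends until set_result is called


asyncio.run(main())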
Now to the second problem: we need an async way to tell scheduler.run that a new request is coming in. For this, we can use asyncio.Queue:
from asyncio import Future, Queue


class Scheduler(SchedulerApi):
    """Batch completion jobs and send them to a worker."""

    BATCH_SIZE = 8

    def __init__(self, worker: WorkerApi) -> None:
        self.incoming: Queue[CompletionTask] = Queue()
        self.scheduled: list[CompletionTask] = []
        self.worker = worker

    async def complete(self, request: CompletionRequest) -> CompletionResponse:
        future: Future[CompletionResponse] = Future()
        await self.incoming.put(CompletionTask(request, future))
        return await future

    async def run(self) -> None:
        while True:
            task = await self.incoming.get()
            self.scheduled.append(task)
            if len(self.scheduled) == self.BATCH_SIZE:
                prompts = [task.request.prompt for task in self.scheduled]
                futures = [task.future for task in self.scheduled]
                results = await self.worker.complete(prompts)
                for future, result in zip(futures, results):
                    future.set_result(result)
                self.scheduled = []
Now the scheduler only gets woken up once a new request comes in. Did you notice that a bit of an Actor pattern sneaked in? The scheduler exposes an API which uses an async queue to communicate with the run method. The run method itself could be running as a separate task.
But we created an additional problem: if we have a worker that hangs forever, we block the entire loop of the scheduler (the great thing is that we can test this with a SaboteurWorker; a sketch of one follows below). We need to adapt our run method to not wait for the result of the worker:
async def run(self) -> None:
    while True:
        task = await self.incoming.get()
        self.scheduled.append(task)
        if len(self.scheduled) == self.BATCH_SIZE:
            asyncio.create_task(self.complete_batch(self.scheduled))
            self.scheduled = []


async def complete_batch(self, tasks: list[CompletionTask]) -> None:
    """Complete a batch of tasks and send the result to the caller."""
    results = await self.worker.complete([task.request.prompt for task in tasks])
    for task, result in zip(tasks, results):
        task.future.set_result(result)
Great. With just a few lines, we have updated the scheduler to run in the background, collect requests, send them to a worker, and give the results back.
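The SaboteurWorker mentioned above could be as simple as this sketch for scheduler_test.py (the name comes from the text; the body is an assumption):

class SaboteurWorker(WorkerApi):
    """Test double simulating a worker that hangs forever."""

    async def complete(self, prompts: list[str]) -> list[CompletionResponse]:
        await asyncio.Event().wait()  # the event is never set, so this never returns
        return []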
But as always, life is more complicated. We don’t want clients to wait forever in case no one else is making a request. So we need a timeout after which a batch is always sent to the worker. We introduce a next_run variable, which keeps track of the time of the next run. When receiving a task while no tasks are scheduled yet, we set the timeout to the current time plus some wait interval.
There are two cases where we schedule a run: when we have reached our batch limit, and when one request has been waiting for at least max_wait_time.
async def run(self) -> None:
    loop = asyncio.get_running_loop()
    self.next_run = sys.maxsize  # no deadline yet
    while True:
        try:
            async with asyncio.timeout_at(self.next_run):
                task = await self.incoming.get()
            if len(self.scheduled) == 0:
                self.next_run = loop.time() + self.MAX_WAIT_TIME
            self.scheduled.append(task)
        except asyncio.TimeoutError:
            pass
        if len(self.scheduled) == self.BATCH_SIZE or loop.time() >= self.next_run:
            # worker_tasks keeps references to in-flight batch tasks
            # and is initialized to an empty list in __init__
            self.worker_tasks.append(
                asyncio.create_task(self.complete_batch(self.scheduled))
            )
            self.scheduled = []
            self.next_run = sys.maxsize
There are other possibilities for how to implement this. Instead of introducing next_run, we could, for example, also break every 100 ms and check how long all scheduled requests have been waiting.
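A sketch of that polling variant, assuming CompletionTask gains an additional arrived_at timestamp (taken from loop.time() when the request is accepted):

async def run(self) -> None:
    loop = asyncio.get_running_loop()
    while True:
        try:
            # wake up at least every 100 ms
            async with asyncio.timeout(0.1):
                self.scheduled.append(await self.incoming.get())
        except TimeoutError:
            pass
        if self.scheduled and (
            len(self.scheduled) == self.BATCH_SIZE
            or loop.time() - self.scheduled[0].arrived_at >= self.MAX_WAIT_TIME
        ):
            self.worker_tasks.append(
                asyncio.create_task(self.complete_batch(self.scheduled))
            )
            self.scheduled = []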
5. The Repository Pattern
As a last exercise, we want to store the token usage per user in a database. Unsurprisingly, we again start thinking about the interface that the database should offer:
class Repository(Protocol):
    async def store_token_usage(self, user_id: int, tokens: int) -> None: ...
We update the scheduler to take in a repository on initialisation and to store token usage when receiving the results from the worker.
class Scheduler(SchedulerApi):
    """Batch completion jobs and send them to a worker."""

    def __init__(self, worker: WorkerApi, repository: Repository) -> None:
        self.worker = worker
        self.repository = repository

    ...

    async def complete_batch(self, tasks: list[CompletionTask]) -> None:
        results = await self.worker.complete([task.request.prompt for task in tasks])
        for task, result in zip(tasks, results):
            task.future.set_result(result)
            await self.repository.store_token_usage(task.request.user_id, result.tokens)
Later, we might optimize this code to only make one database request per worker call. But this change is already enough that we can test that the repository has been called:
# scheduler_test.py
class SpyRepository(Repository):
    def __init__(self) -> None:
        self.inserts: list[tuple[int, int]] = []

    async def store_token_usage(self, user_id: int, tokens: int) -> None:
        self.inserts.append((user_id, tokens))


async def test_repository_is_called():
    # Given a scheduler with a SpyRepository
    repository = SpyRepository()
    scheduler = Scheduler(worker=SpyWorker(), repository=repository)
    task = asyncio.create_task(scheduler.run())

    # When a completion is requested
    request = CompletionRequest(prompt="prompt", user_id=1)
    await scheduler.complete(request)

    # Then the repository has been invoked
    assert repository.inserts == [(1, len("prompt"))]
In production, we would probably use a postgres database and introduce a repository that owns the connection pool:
# repository.py
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine


class PostgresRepository(Repository):
    def __init__(self) -> None:
        self.engine = create_async_engine("connection_string", pool_size=5)

    async def store_token_usage(self, user_id: int, tokens: int) -> None:
        stmt = text("INSERT INTO token_usage (user_id, tokens) VALUES (:user_id, :tokens)")
        async with self.engine.begin() as connection:
            await connection.execute(stmt, {"user_id": user_id, "tokens": tokens})
Note that compared to the very initial version, the connection pool is no longer a global variable, but stored as a member variable of the PostgresRepository. We can test the Scheduler and the Shell with stub repositories in our tests, and only introduce the postgres repository when wiring everything up in the App:
# app.py
class App:
    async def run(self) -> None:
        async with asyncio.TaskGroup() as tg:
            worker = Worker()  # any concrete WorkerApi implementation
            repository = PostgresRepository()
            self.scheduler = Scheduler(worker, repository)
            self.shell = Shell(self.config, self.scheduler)
            tg.create_task(self.scheduler.run())
            tg.create_task(self.shell.run())
6. Errors
One big topic we have not covered is non-happy code paths. Unsurprisingly, this is also an area where Hexagonal Architecture makes our life easier. Similar to how we defined an interface for the Repository, we can define a RepositoryError and introduce a SaboteurRepository in our tests to understand how the Scheduler should handle these errors. We might even introduce a RecoverableRepositoryError and assert that it is handled differently. And we can test that the Shell converts domain errors into the appropriate HTTP status codes.
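The error types and the saboteur named above could start out as simply as this sketch (the class bodies are assumptions):

class RepositoryError(Exception):
    """Base class for repository failures."""


class RecoverableRepositoryError(RepositoryError):
    """A transient failure, e.g. a dropped connection; safe to retry."""


class SaboteurRepository(Repository):
    """Test double that fails on every call."""

    async def store_token_usage(self, user_id: int, tokens: int) -> None:
        raise RepositoryError("the database is gone")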
Did you notice that we have been using the same representation of CompletionRequest in the Shell and the Scheduler? As our application gets more complex, the CompletionRequest might become more concerned with parsing the HTTP request. In this case, these two representations should diverge. No decision in any of the adapters should influence your domain logic and entities.
Conclusion
You have made it through a blog post which arguably turned out a bit too long. I’m not hoping for you to go out there and start preaching Hexagonal Architecture or whatever new fancy-named design pattern someone else has come up with.
But I do hope you see there is a cost involved in not separating your domain logic from the connectors to external services, data sources, and inputs. This cost is that you will always be afraid that swapping your web framework or database will cause side effects in other parts of your codebase. You will always be wondering how that database error will propagate to your user (until you find out by getting woken up by a phone alert in the middle of the night). And your confidence in your changes will be lower, preventing fast and frequent deploys.
Start small, focus on the core domain, and gradually decouple components. The payoff in maintainability and happiness will be worth the effort.
Find the full code for this blog post on GitHub.
This post was inspired by this awesome article about Hexagonal Architecture and design patterns in Rust.