Writing web servers in Python has never been easier. Import fastapi, hack together some handler code, and off you go. For simple request-based workflows, this works just fine. But Python applications quickly become more complex than your typical tutorial.

Want to maintain state across requests? Or schedule periodic tasks? Suddenly, the elegant simplicity of your code devolves into a tangled mess. Debugging becomes harder, testing in isolation feels impossible, and concerns start blending into one another.

The good news? Design patterns like Hexagonal Architecture and the Actor model can help. By separating your core domain logic from external dependencies like databases, web servers, and third-party services, you can write more maintainable and adaptable code.

Do you really need these fancy design patterns? In the following sections, we’ll explore the challenges of tightly coupled code and see how Hexagonal Architecture addresses them.

1. A Rough Start

Imagine we’re running LLM inference (think Ollama or vLLM) on one of our servers. To optimize performance, we want our worker to process batches of eight requests at a time. We also want the service to log token usage per user in a database. Here’s how we might start:

# endpoints.py
import requests
from fastapi import APIRouter, FastAPI
from pydantic import BaseModel
from sqlalchemy import create_engine, text


app = FastAPI()
router = APIRouter()

# connection pool as a global
engine = create_engine("postgresql+psycopg2://me@localhost/mydb", pool_size=5)


class CompletionRequest(BaseModel):
    prompt: str
    # skip auth for now and assume we get some user_id per request
    user_id: int


@router.post("/completion")
async def post_job(completion: CompletionRequest):
    # vllm running locally exposed via http
    data = {"prompts": [completion.prompt]}
    url = "http://localhost:8000/generate"
    response = requests.post(url, json=data).json()["completions"][0]
    tokens = response["tokens"]["total_tokens"]

    # insert token usage
    query = text("INSERT INTO completions VALUES (:user_id, :tokens)")
    with engine.begin() as conn:
        conn.execute(query, {"user_id": completion.user_id, "tokens": tokens})

    return response["text"]


app.include_router(router)

This doesn’t look too bad at first. But on a second look, issues emerge:

  • Batching Requests: How do we group incoming requests for the worker?
  • Testing: How can we test without running vLLM or a database?
  • Error Handling: What happens if the worker fails?

The logic is tightly coupled to external dependencies, making it hard to test, debug, or scale. Let’s restart with Hexagonal Architecture.

2. The Wiring

Normally, I’d advocate starting with the central part of our application, the domain logic. For our example, this is the batching of requests for the worker, which we will encapsulate in the Scheduler.

To avoid getting hung up on all the (cool) details of the scheduler, we will start with a look at how everything is wired together. Here’s our plan:

  1. Create a SchedulerApi interface to define what we expect from the scheduler.
  2. Introduce a Shell class to expose the scheduler’s functionality via HTTP.
  3. Use FastAPI internally for HTTP routing (but keep it abstracted).
  4. Apply dependency injection for testing and flexibility.

Scheduler Interface

We start with the interface for the scheduler, which processes requests and returns completions.

# scheduler.py
from typing import NamedTuple, Protocol

from pydantic import BaseModel


class CompletionResponse(NamedTuple):
    completion: str
    tokens: int


class CompletionRequest(BaseModel):
    prompt: str
    user_id: int


class SchedulerApi(Protocol):
    async def complete(self, request: CompletionRequest) -> CompletionResponse: ...

We don’t think about an actual implementation yet, but rather about the input and output types.
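Because SchedulerApi is a typing.Protocol, any class with a matching complete method satisfies it, with or without inheriting from it. The following is a minimal sketch of that idea: EchoScheduler and ask are hypothetical names, and a dataclass stands in for the pydantic model so the snippet only needs the standard library.

```python
import asyncio
from dataclasses import dataclass
from typing import NamedTuple, Protocol


@dataclass
class CompletionRequest:  # dataclass stand-in for the pydantic model
    prompt: str
    user_id: int


class CompletionResponse(NamedTuple):
    completion: str
    tokens: int


class SchedulerApi(Protocol):
    async def complete(self, request: CompletionRequest) -> CompletionResponse: ...


class EchoScheduler:
    """Hypothetical implementation; note that it does not inherit from SchedulerApi."""

    async def complete(self, request: CompletionRequest) -> CompletionResponse:
        return CompletionResponse(request.prompt, len(request.prompt))


async def ask(scheduler: SchedulerApi, prompt: str) -> str:
    # Callers depend only on the protocol, never on a concrete class
    response = await scheduler.complete(CompletionRequest(prompt=prompt, user_id=1))
    return response.completion
```

A type checker accepts ask(EchoScheduler(), "hello") because the method signatures line up; this is what later lets us swap in spies and saboteurs in tests.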

Creating the Shell

By providing an HTTP interface for the scheduler, the Shell serves as the boundary between our application and its users.

It takes a reference to anything implementing SchedulerApi and an AppConfig, and serves an ASGI-compatible application.

# shell.py
from uvicorn import Config, Server

from src.config import AppConfig
from src.scheduler import SchedulerApi


class Shell:
    """Provide user access to the scheduler."""

    def __init__(self, config: AppConfig, scheduler: SchedulerApi) -> None:
        self.config = config
        self.app = build_app(scheduler)
        self.server: Server | None = None

    async def run(self) -> None:
        config = Config(app=self.app, host=self.config.host, port=self.config.port)
        self.server = Server(config)
        await self.server.serve()

Encapsulating FastAPI

This ASGI-compatible application is built in build_app. We will use FastAPI, but this is an implementation detail fully encapsulated inside the Shell.

# shell.py
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from fastapi import FastAPI


def build_app(scheduler: SchedulerApi) -> FastAPI:
    @asynccontextmanager
    async def lifespan(app: FastAPI) -> AsyncIterator[dict]:
        # whatever we yield here becomes the lifespan state of the app
        yield {"scheduler": scheduler}

    app = FastAPI(lifespan=lifespan)
    app.include_router(routes())
    return app

By putting the scheduler into the lifespan state of the FastAPI app, we make it available to request handlers.

The ASGI specification defines this lifespan state as

an empty namespace where the application can persist state to be used when handling subsequent requests. The server will ensure that a shallow copy of the namespace is passed into each subsequent request/response call into the application.
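To make the quote more concrete, here is a rough sketch of the same mechanism without any framework. The app function is a bare ASGI callable, and fake_server is a hypothetical stand-in for what a server such as uvicorn does: run the startup event, then hand a shallow copy of the state to each request. The string "my-scheduler" is only a placeholder for a real scheduler object.

```python
import asyncio


async def app(scope, receive, send):
    """Bare ASGI app: stores a value in lifespan state and reads it per request."""
    if scope["type"] == "lifespan":
        while True:
            message = await receive()
            if message["type"] == "lifespan.startup":
                scope["state"]["scheduler"] = "my-scheduler"  # placeholder object
                await send({"type": "lifespan.startup.complete"})
            elif message["type"] == "lifespan.shutdown":
                await send({"type": "lifespan.shutdown.complete"})
                return
    elif scope["type"] == "http":
        body = scope["state"]["scheduler"].encode()
        await send({"type": "http.response.start", "status": 200, "headers": []})
        await send({"type": "http.response.body", "body": body})


async def fake_server() -> bytes:
    """Play the server's role for one startup, one request and one shutdown."""
    state: dict = {}
    inbox: asyncio.Queue = asyncio.Queue()
    sent: list[dict] = []
    started = asyncio.Event()

    async def send(message: dict) -> None:
        sent.append(message)
        if message["type"] == "lifespan.startup.complete":
            started.set()

    async def no_body() -> dict:
        return {"type": "http.request", "body": b"", "more_body": False}

    lifespan = asyncio.create_task(
        app({"type": "lifespan", "state": state}, inbox.get, send)
    )
    await inbox.put({"type": "lifespan.startup"})
    await started.wait()

    # Each request receives a shallow copy of the lifespan state
    await app({"type": "http", "state": dict(state)}, no_body, send)
    body = sent[-1]["body"]

    await inbox.put({"type": "lifespan.shutdown"})
    await lifespan
    return body
```

Running asyncio.run(fake_server()) returns the placeholder as the response body, which shows that the handler found the object purely through the ASGI scope.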

Dependency Injection

To inject the scheduler into the handler functions, we define a simple extractor:

from fastapi import Depends, Request


def with_scheduler(request: Request) -> SchedulerApi:
    return request.state.scheduler

With this setup, handlers can access the scheduler through dependency injection:

# shell.py
async def complete(
    request: CompletionRequest, scheduler: SchedulerApi = Depends(with_scheduler)
) -> str:
    # handle incoming completion requests
    response = await scheduler.complete(request)
    return response.completion

You might have already spotted that we are still missing the piece of code that builds the routing:

# shell.py
def health() -> str:
    return "ok"


def routes() -> APIRouter:
    router = APIRouter()
    router.add_api_route("/health", health)
    router.add_api_route("/complete", complete, methods=["POST"])
    return router

By defining clear boundaries and injecting dependencies, we create a flexible system that’s easy to test and extend. By using a mock implementation for the scheduler, we can validate the HTTP route’s behavior independently of the scheduler’s actual implementation.

# shell_test.py
from fastapi.testclient import TestClient

from src.scheduler import CompletionRequest, CompletionResponse, SchedulerApi
from src.shell import build_app


class SpyScheduler(SchedulerApi):
    """Keep track of any incoming completion requests."""

    def __init__(self) -> None:
        self.requests: list[CompletionRequest] = []

    async def complete(self, request: CompletionRequest) -> CompletionResponse:
        self.requests.append(request)
        return CompletionResponse(completion=request.prompt, tokens=len(request.prompt))


def test_complete():
    # Given an app with a spy scheduler
    scheduler = SpyScheduler()
    app = build_app(scheduler)

    # When we call the complete route
    with TestClient(app) as client:
        body = {"prompt": "hello", "user_id": 1}
        response = client.post("/complete", json=body)

    # Then we get a 200 response and the scheduler has been called
    assert response.status_code == 200
    assert response.json() == "hello"
    assert len(scheduler.requests) == 1

No matter how complicated our domain logic becomes, our HTTP tests do not care as we have mocked out the scheduler. Yes, FastAPI provides some convenience helpers to override injected dependencies, but we are fully self-contained. And because we are using lifespan.state which is part of the ASGI interface, we are less dependent on FastAPI in case we ever need to switch frameworks.

3. Owning the Event Loop

Running multiple asynchronous components like the Scheduler and the Shell together requires an orchestration piece. To achieve this, we introduce the Application, which owns the event loop and runs both components as cooperative tasks:

# app.py
import asyncio

from src.config import AppConfig
from src.scheduler import Scheduler
from src.shell import Shell


class App:
    def __init__(self, config: AppConfig) -> None:
        self.config = config

    async def run(self) -> None:
        async with asyncio.TaskGroup() as tg:
            self.scheduler = Scheduler()
            self.shell = Shell(self.config, self.scheduler)

            tg.create_task(self.scheduler.run())
            tg.create_task(self.shell.run())

    def shutdown(self) -> None:
        """Gracefully shut down the application."""
        # Implement shutdown logic here

Python task groups (asyncio.TaskGroup, available since Python 3.11) run multiple tasks together. Unlike asyncio.gather, they also cancel the remaining tasks if one of them fails.

Have you ever found it difficult to add periodically running tasks to your FastAPI application? Owning the event loop makes this trivially easy: Add another task to the task group which loops over an async sleep.

Now that we have done all the bootstrapping, our main function becomes quite simple. Its purpose is to load the config and run the app:

# main.py
import asyncio

from dotenv import load_dotenv

from src.app import App
from src.config import AppConfig


if __name__ == "__main__":
    load_dotenv()
    config = AppConfig.from_env()
    app = App(config)

    try:
        asyncio.run(app.run())
    except KeyboardInterrupt:
        app.shutdown()

Let’s recap: While we have not worked on the scheduler itself, we have defined an interface for the functionality it should offer, and have implemented a web server that can serve this interface. So rather than starting with the core of the hexagon, we started with an adapter.

4. Finally, the Domain

We are now ready to get started with our domain logic, the part of the application where we create something new and hopefully some value.

Remember the interface for the Scheduler which we introduced?

# scheduler.py
from typing import NamedTuple, Protocol
from pydantic import BaseModel


class CompletionRequest(BaseModel):
    prompt: str
    user_id: int


class CompletionResponse(NamedTuple):
    completion: str
    tokens: int


class SchedulerApi(Protocol):
    async def complete(self, request: CompletionRequest) -> CompletionResponse:
        ...

To facilitate batch processing, the scheduler will communicate with a WorkerApi, an abstraction over the underlying inference. It’s very similar to the scheduler interface, but takes a batch of prompts and returns a batch of completions:

class WorkerApi(Protocol):
    """A worker that can complete prompts.

    Might be an `Ollama` or `vLLM` process on the same machine.
    Invocation could be over HTTP.
    """

    async def complete(self, prompts: list[str]) -> list[CompletionResponse]:
        ...

Now we turn to the Scheduler. It is supposed to collect requests and, once it has accumulated eight of them, send them to the worker.

To make our tests green, we start off with a simple implementation that just forwards each individual request directly to the worker:

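class Scheduler(SchedulerApi):
    """Batch completion jobs and send them to a worker."""

    def __init__(self, worker: WorkerApi) -> None:
        self.worker = worker

    async def complete(self, prompt: str) -> CompletionResponse:
        # no batching yet: forward a batch of one and unwrap the single result
        results = await self.worker.complete([prompt])
        return results[0]

    async def run(self) -> None:
        # nothing to do in the background yet
        pass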

Notice how we can already set up a SpyWorker that offers the WorkerApi and test the scheduler?

# scheduler_test.py
import asyncio

from src.scheduler import CompletionResponse, Scheduler, WorkerApi


class SpyWorker(WorkerApi):
    """Stub worker implementation that returns the prompt as the completion."""

    def __init__(self) -> None:
        self.counter = 0

    async def complete(self, prompts: list[str]) -> list[CompletionResponse]:
        self.counter += 1
        return [CompletionResponse(prompt, len(prompt)) for prompt in prompts]


# async tests need a pytest plugin such as pytest-asyncio
async def test_scheduler_completes():
    # Given a running scheduler
    worker = SpyWorker()
    scheduler = Scheduler(worker)
    task = asyncio.create_task(scheduler.run())

    # When doing 8 completion requests
    result = await asyncio.gather(*[scheduler.complete("Hi") for _ in range(8)])

    # Then the prompt is returned, and the worker has only been called once
    assert result[0].completion == "Hi"
    assert worker.counter == 1

We receive an AssertionError from the last line: The worker has been called eight times instead of once. If we want the Scheduler to send batches of, say, eight requests to the worker, we need to adapt our code.

If we want to send batches, we need some internal state to buffer requests. For a start, we append each incoming request to an internal list. The run method continuously checks if we have accumulated eight requests and then forwards them to the worker.

class Scheduler(SchedulerApi):
    """Batch completion jobs and send them to a worker."""

    BATCH_SIZE = 8

    def __init__(self, worker: WorkerApi) -> None:
        self.incoming: list[str] = []
        self.worker = worker

    async def complete(self, prompt: str) -> CompletionResponse:
        # no way to return the result to the caller yet
        self.incoming.append(prompt)

    async def run(self) -> None:
        while True:
            # busy loop: this never yields to other tasks
            if len(self.incoming) == self.BATCH_SIZE:
                await self.worker.complete(self.incoming)
                self.incoming = []
Sure enough, our tests are still failing.

There are two things wrong with our implementation: First, we need a way to send the results back to the caller. And second, the scheduler should only be woken up once a new request is coming in. Otherwise the while loop will burn up all of our CPU and will never yield to other tasks.

The first problem is solved by using the asyncio.Future class. It represents the result of a computation that can be awaited. We introduce an internal CompletionTask, which holds a future allowing the scheduler to respond:

from asyncio import Future
from typing import NamedTuple


class CompletionTask(NamedTuple):
    prompt: str
    future: Future[CompletionResponse]


class Scheduler(SchedulerApi):
    """Batch completion jobs and send them to a worker."""

    BATCH_SIZE = 8

    def __init__(self, worker: WorkerApi) -> None:
        self.incoming: list[CompletionTask] = []
        self.worker = worker

    async def complete(self, prompt: str) -> CompletionResponse:
        future: Future[CompletionResponse] = Future()
        self.incoming.append(CompletionTask(prompt, future))
        # suspend until run() sets the result on the future
        return await future

    async def run(self) -> None:
        while True:
            if len(self.incoming) == self.BATCH_SIZE:
                prompts = [task.prompt for task in self.incoming]
                futures = [task.future for task in self.incoming]
                results = await self.worker.complete(prompts)
                for future, result in zip(futures, results):
                    future.set_result(result)

                self.incoming = []

That’s a lot better. We use the Future to communicate the result back to the caller.
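If futures are new to you, the handoff can be reduced to a few lines. This is only an illustrative sketch with hypothetical names: producer plays the role of the scheduler loop, and the code awaiting the future plays the role of complete.

```python
import asyncio


async def main() -> str:
    loop = asyncio.get_running_loop()
    future: asyncio.Future[str] = loop.create_future()

    async def producer() -> None:
        await asyncio.sleep(0.01)  # pretend to do some work
        future.set_result("done")  # wakes up whoever awaits the future

    task = asyncio.create_task(producer())
    result = await future  # suspends until set_result is called
    await task
    return result
```

The caller never needs to know who produces the result or when; it simply awaits the future.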

Now to the second problem: We need an async way to tell scheduler.run that a new request is coming in. For this, we can use asyncio.Queue:

from asyncio import Future, Queue


class Scheduler(SchedulerApi):
    """Batch completion jobs and send them to a worker."""

    BATCH_SIZE = 8

    def __init__(self, worker: WorkerApi) -> None:
        self.incoming: Queue[CompletionTask] = Queue()
        self.scheduled: list[CompletionTask] = []
        self.worker = worker

    async def complete(self, prompt: str) -> CompletionResponse:
        future: Future[CompletionResponse] = Future()
        await self.incoming.put(CompletionTask(prompt, future))
        return await future

    async def run(self) -> None:
        while True:
            # suspends until a request arrives, so the loop no longer spins
            task = await self.incoming.get()
            self.scheduled.append(task)
            if len(self.scheduled) == self.BATCH_SIZE:
                prompts = [task.prompt for task in self.scheduled]
                futures = [task.future for task in self.scheduled]
                results = await self.worker.complete(prompts)
                for future, result in zip(futures, results):
                    future.set_result(result)

                self.scheduled = []

Now the scheduler only gets woken up once a new request comes in. Did you notice that a bit of an Actor pattern sneaked in? The scheduler exposes an API which uses an async queue to communicate with the run method. The run method itself could be running as a separate task.
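Stripped of the batching, the actor idea looks roughly like the sketch below. CounterActor is a hypothetical example, not part of our application: its state is only ever touched inside run, and callers interact with it exclusively through messages placed on its mailbox.

```python
import asyncio


class CounterActor:
    """Hypothetical actor: the total is only modified inside run()."""

    def __init__(self) -> None:
        self.mailbox: asyncio.Queue[tuple[int, asyncio.Future[int]]] = asyncio.Queue()
        self.total = 0

    async def add(self, amount: int) -> int:
        """Public API: send a message and wait for the reply."""
        reply: asyncio.Future[int] = asyncio.get_running_loop().create_future()
        await self.mailbox.put((amount, reply))
        return await reply

    async def run(self) -> None:
        while True:
            amount, reply = await self.mailbox.get()
            self.total += amount
            reply.set_result(self.total)


async def main() -> list[int]:
    actor = CounterActor()
    runner = asyncio.create_task(actor.run())
    totals = await asyncio.gather(*(actor.add(n) for n in (1, 2, 3)))
    runner.cancel()
    return list(totals)
```

Since messages are processed one at a time, there is no need for locks around the state, which is the same property our Scheduler relies on for its list of scheduled tasks.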

But we created an additional problem:

If we have a worker that hangs forever (the great thing is that we can test with a SaboteurWorker), we block the entire loop of the scheduler. We need to adapt our run method to not wait for the result of the worker:

async def run(self) -> None:
    while True:
        task = await self.incoming.get()
        self.scheduled.append(task)
        if len(self.scheduled) == self.BATCH_SIZE:
            # hand the batch to a separate task so the loop keeps accepting requests
            asyncio.create_task(self.complete_batch(self.scheduled))
            self.scheduled = []


async def complete_batch(self, tasks: list[CompletionTask]) -> None:
    """Complete a batch of tasks and send the result to the caller."""
    results = await self.worker.complete([t.prompt for t in tasks])
    for task, result in zip(tasks, results):
        task.future.set_result(result)

Great. With just a few lines, we have updated the scheduler to run in the background, collect requests, send them to a worker, and give the results back.
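A hanging worker is easy to simulate in a test. The sketch below is self-contained and makes a few assumptions for brevity: the SaboteurWorker only hangs on its first call, the batch size is reduced to two, and completions are plain strings instead of CompletionResponse objects.

```python
import asyncio
from typing import NamedTuple


class CompletionTask(NamedTuple):
    prompt: str
    future: asyncio.Future


class SaboteurWorker:
    """Hypothetical test double: the first batch hangs forever, later ones succeed."""

    def __init__(self) -> None:
        self.calls = 0

    async def complete(self, prompts: list[str]) -> list[str]:
        self.calls += 1
        if self.calls == 1:
            await asyncio.Event().wait()  # never set, so this never returns
        return [prompt.upper() for prompt in prompts]


class Scheduler:
    BATCH_SIZE = 2  # small batch size to keep the demo short

    def __init__(self, worker: SaboteurWorker) -> None:
        self.worker = worker
        self.incoming: asyncio.Queue[CompletionTask] = asyncio.Queue()
        self.scheduled: list[CompletionTask] = []
        self.worker_tasks: set[asyncio.Task] = set()

    async def complete(self, prompt: str) -> str:
        future = asyncio.get_running_loop().create_future()
        await self.incoming.put(CompletionTask(prompt, future))
        return await future

    async def run(self) -> None:
        while True:
            self.scheduled.append(await self.incoming.get())
            if len(self.scheduled) == self.BATCH_SIZE:
                task = asyncio.create_task(self.complete_batch(self.scheduled))
                self.worker_tasks.add(task)  # keep a reference to the running task
                task.add_done_callback(self.worker_tasks.discard)
                self.scheduled = []

    async def complete_batch(self, tasks: list[CompletionTask]) -> None:
        results = await self.worker.complete([t.prompt for t in tasks])
        for task, result in zip(tasks, results):
            task.future.set_result(result)


async def main() -> list[str]:
    scheduler = Scheduler(SaboteurWorker())
    runner = asyncio.create_task(scheduler.run())
    # The first two requests end up in the hanging batch
    stuck = [asyncio.create_task(scheduler.complete(p)) for p in ("a", "b")]
    await asyncio.sleep(0)  # let the stuck requests enqueue first
    # The next batch still completes because run() does not await the worker
    done = await asyncio.wait_for(
        asyncio.gather(scheduler.complete("c"), scheduler.complete("d")), timeout=1
    )
    for task in [runner, *stuck, *scheduler.worker_tasks]:
        task.cancel()
    return done
```

If run awaited the worker directly, the wait_for call would time out, because the scheduler loop would be stuck behind the first batch.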

But as always, life is more complicated. We don’t want clients to wait forever when no one else is making a request. So we need a timeout after which the pending batch is sent to the worker regardless of its size. We introduce a next_run variable, which keeps track of the time of the next run. When we receive a task and no tasks are scheduled yet, we set next_run to the current time plus MAX_WAIT_TIME.

There are two cases in which we schedule a run: when we have reached our batch limit, or when a request has been waiting for at least MAX_WAIT_TIME.

async def run(self) -> None:
    loop = asyncio.get_running_loop()
    # sys.maxsize serves as "no deadline" while nothing is scheduled
    self.next_run = sys.maxsize

    while True:
        try:
            # asyncio.timeout_at requires Python 3.11+
            async with asyncio.timeout_at(self.next_run):
                task = await self.incoming.get()

            if len(self.scheduled) == 0:
                self.next_run = loop.time() + self.MAX_WAIT_TIME
            self.scheduled.append(task)
        except asyncio.TimeoutError:
            pass

        if len(self.scheduled) == self.BATCH_SIZE or loop.time() >= self.next_run:
            # keep a reference so the task is not garbage collected mid-flight
            self.worker_tasks.append(
                asyncio.create_task(self.complete_batch(self.scheduled))
            )
            self.scheduled = []
            self.next_run = sys.maxsize

There are other ways to implement this. Instead of introducing next_run, we could, for example, wake up every 100 ms and check how long the oldest scheduled request has been waiting.
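For comparison, the polling alternative might look roughly like this. It is a sketch under a few assumptions: PollingScheduler, PendingTask and EchoWorker are hypothetical names, MAX_WAIT_TIME is set to an arbitrary 0.25 seconds, and completions are plain strings.

```python
import asyncio
from typing import NamedTuple


class PendingTask(NamedTuple):
    prompt: str
    future: asyncio.Future
    enqueued_at: float  # loop time at which the request arrived


class EchoWorker:
    """Hypothetical stub worker that records the batches it receives."""

    def __init__(self) -> None:
        self.batches: list[list[str]] = []

    async def complete(self, prompts: list[str]) -> list[str]:
        self.batches.append(prompts)
        return prompts


class PollingScheduler:
    BATCH_SIZE = 8
    MAX_WAIT_TIME = 0.25  # seconds; assumed value
    POLL_INTERVAL = 0.1  # wake up every 100 ms

    def __init__(self, worker: EchoWorker) -> None:
        self.worker = worker
        self.incoming: asyncio.Queue[PendingTask] = asyncio.Queue()
        self.scheduled: list[PendingTask] = []
        self.worker_tasks: set[asyncio.Task] = set()

    async def complete(self, prompt: str) -> str:
        loop = asyncio.get_running_loop()
        future = loop.create_future()
        await self.incoming.put(PendingTask(prompt, future, loop.time()))
        return await future

    async def run(self) -> None:
        loop = asyncio.get_running_loop()
        while True:
            try:
                task = await asyncio.wait_for(self.incoming.get(), self.POLL_INTERVAL)
                self.scheduled.append(task)
            except asyncio.TimeoutError:
                pass  # no new request within the poll interval

            if not self.scheduled:
                continue
            oldest_wait = loop.time() - self.scheduled[0].enqueued_at
            if len(self.scheduled) >= self.BATCH_SIZE or oldest_wait >= self.MAX_WAIT_TIME:
                batch, self.scheduled = self.scheduled, []
                worker_task = asyncio.create_task(self.complete_batch(batch))
                self.worker_tasks.add(worker_task)
                worker_task.add_done_callback(self.worker_tasks.discard)

    async def complete_batch(self, tasks: list[PendingTask]) -> None:
        results = await self.worker.complete([t.prompt for t in tasks])
        for task, result in zip(tasks, results):
            task.future.set_result(result)


async def main() -> tuple[list[str], list[list[str]]]:
    worker = EchoWorker()
    scheduler = PollingScheduler(worker)
    runner = asyncio.create_task(scheduler.run())
    results = await asyncio.gather(*(scheduler.complete(p) for p in "abc"))
    runner.cancel()
    return results, worker.batches
```

The trade-off is that polling wakes the scheduler up even when nothing happens and can overshoot the deadline by up to one poll interval, while the next_run variant sleeps exactly until the deadline.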

5. The Repository Pattern

As a last exercise, we want to store the token usage per user in a database. Unsurprisingly, we again start thinking about the interface that the database should offer:

class Repository(Protocol):
    async def store_token_usage(self, user_id: int, tokens: int) -> None: ...

We update the scheduler to take in a repository on initialisation and to store token usage when receiving the results from the worker.

class Scheduler(SchedulerApi):
    """Batch completion jobs and send them to a worker."""

    def __init__(self, worker: WorkerApi, repository: Repository) -> None:
        self.worker = worker
        self.repository = repository
        ...

    async def complete_batch(self, tasks: list[CompletionTask]) -> None:
        # each CompletionTask now carries the full request, so we know the user_id
        results = await self.worker.complete([task.request.prompt for task in tasks])
        for task, result in zip(tasks, results):
            task.future.set_result(result)
            await self.repository.store_token_usage(task.request.user_id, result.tokens)

Later, we might optimize this code to only make one database request per worker call. But this change is already enough for us to test that the repository has been called:

# scheduler_test.py
class SpyRepository(Repository):
    def __init__(self) -> None:
        self.inserts: list[tuple[int, int]] = []

    async def store_token_usage(self, user_id: int, tokens: int) -> None:
        self.inserts.append((user_id, tokens))


async def test_repository_is_called():
    # Given a running scheduler with a SpyRepository
    repository = SpyRepository()
    scheduler = Scheduler(worker=SpyWorker(), repository=repository)
    task = asyncio.create_task(scheduler.run())

    # When a completion is requested
    request = CompletionRequest(prompt="prompt", user_id=1)
    await scheduler.complete(request)

    # Then the repository has been invoked
    assert repository.inserts == [(1, len("prompt"))]
	

In production, we would probably use a Postgres database and introduce a repository that owns the connection pool:

# repository.py
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine


class PostgresRepository(Repository):
    def __init__(self) -> None:
        self.engine = create_async_engine("connection_string", pool_size=5)

    async def store_token_usage(self, user_id: int, tokens: int) -> None:
        stmt = text("INSERT INTO token_usage (user_id, tokens) VALUES (:user_id, :tokens)")
        # engine.begin() commits on success and rolls back on error
        async with self.engine.begin() as connection:
            await connection.execute(stmt, {"user_id": user_id, "tokens": tokens})

Note that, compared to the very first version, the connection pool is no longer a global variable but a member of the PostgresRepository. We can test the Scheduler and the Shell with stub repositories in our tests, and only introduce Postgres when wiring everything up in the Application:

# app.py
class App:
    async def run(self) -> None:
        async with asyncio.TaskGroup() as tg:
            worker = Worker()
            repository = PostgresRepository()

            self.scheduler = Scheduler(worker, repository)
            self.shell = Shell(self.config, self.scheduler)

            tg.create_task(self.scheduler.run())
            tg.create_task(self.shell.run())

6. Errors

One big topic we have not covered is non-happy code paths. Unsurprisingly, this is also an area where Hexagonal Architecture makes our lives easier. Similar to how we defined interfaces for the Repository, we can define a RepositoryError and introduce a SaboteurRepository in our tests to understand how the Scheduler should handle these errors. We might even introduce a RecoverableRepositoryError and assert that this is handled differently. And we can test that the Shell converts domain errors into the appropriate HTTP status codes.
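To give an idea of what this could look like, here is a minimal sketch. The retry policy is only one possible choice, not a recommendation, and store_with_retry is a hypothetical helper that the Scheduler might call instead of talking to the repository directly.

```python
import asyncio


class RepositoryError(Exception):
    """Domain-level error raised by any repository implementation."""


class RecoverableRepositoryError(RepositoryError):
    """A transient failure, such as a timeout, that is worth retrying."""


class SaboteurRepository:
    """Hypothetical test double: fails `failures` times, then succeeds."""

    def __init__(self, error: RepositoryError, failures: int) -> None:
        self.error = error
        self.failures = failures
        self.inserts: list[tuple[int, int]] = []

    async def store_token_usage(self, user_id: int, tokens: int) -> None:
        if self.failures > 0:
            self.failures -= 1
            raise self.error
        self.inserts.append((user_id, tokens))


async def store_with_retry(
    repository: SaboteurRepository, user_id: int, tokens: int, retries: int = 1
) -> bool:
    """Assumed policy: retry recoverable errors, give up on all others."""
    for _ in range(retries + 1):
        try:
            await repository.store_token_usage(user_id, tokens)
            return True
        except RecoverableRepositoryError:
            continue  # transient: try again
        except RepositoryError:
            return False  # permanent: give up right away
    return False
```

With a repository that fails once with a RecoverableRepositoryError, the second attempt succeeds and the usage is stored. With a plain RepositoryError, the helper gives up immediately and nothing is stored. Both behaviors can be asserted without a real database.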

Did you notice that we have been using the same representation of CompletionRequest in the Shell and the Scheduler? As our application gets more complex, the CompletionRequest might become more concerned with parsing the HTTP request. In this case, these two representations should diverge. No decision in any of the adapters should influence your domain logic and entities.

Conclusion

You have made it through a blog post which arguably turned out a bit too long. I’m not hoping for you to go out there and start preaching Hexagonal Architecture or whatever fancily named design pattern someone else has come up with.

But I do hope you see there is a cost involved in not separating your domain logic from the connectors to external services, data sources and inputs. This cost is that you will always be afraid that swapping your web framework or database will cause side effects in other parts of your codebase. You will always be wondering how that database error will propagate to your user (until you find out by getting woken up by a phone alert in the middle of the night). And your confidence in your changes will be lower, preventing you from fast and frequent deploys.

Start small, focus on the core domain, and gradually decouple components. The payoff in maintainability and happiness will be worth the effort.

Find the full code for this blog post on GitHub.

This post was inspired by this awesome article about Hexagonal Architecture and design patterns in Rust.