Durable Lambda Functions: The Pipeline Pattern with Built-in Rollback
A durable Lambda function is a regular Lambda function designed for reliable multi-step workflows. After each step, AWS saves the progress automatically. If something fails, the function picks up from the last successful step instead of starting over. It can even pause and wait for external events, anywhere from seconds to a year. You get the reliability of a state machine with the simplicity of writing plain sequential Python.
In this post, I’ll walk through a real use case: restoring a database from a production snapshot and anonymizing the data afterward. This kind of workflow has several steps that need to run in order — find the latest snapshot, restore an RDS instance from it, reset the password, and run the anonymizer. Each step has to wait for the previous one to finish before moving on. And if anything goes wrong along the way, we need to roll back: clean up the restored instance and log what happened so nothing is left behind.
The Pipeline Pattern
The aws-durable-execution-sdk-python package gives you the building blocks for durable workflows — steps, checkpoints, waits, callbacks, and parallel execution. But once you have more than a couple of steps, writing them inline gets messy fast. The code becomes hard to read, hard to reorder, and hard to maintain.
A cleaner approach is to wrap the SDK in a simple Pipeline abstraction. Each step in the pipeline is a durable step with a name and an action. Steps that create resources (like restoring a database) can also define a rollback function. Steps that don’t need rollback just skip it. Here’s what the pipeline definition looks like for our staging DB refresh:
pipeline = Pipeline(name="staging-db-refresh")
pipeline.add_step("find_latest_snapshot", find_latest_snapshot)
pipeline.add_step("restore_db", restore_db, rollback=delete_restored_db)
pipeline.add_step("wait_for_db_ready", wait_for_db_ready)
pipeline.add_step("reset_db_password", reset_db_password)
pipeline.add_step("wait_for_password_ready", wait_for_password_ready)
pipeline.add_step("run_anonymizer", run_anonymizer)
pipeline.add_step("wait_for_anonymizer_complete", wait_for_anonymizer_complete)
pipeline.run(context, ctx)
Reading this, you immediately see the full workflow — seven steps, top to bottom. Adding a new step is one line. Adding rollback to an existing step is one argument. That’s it.
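Each action is an ordinary function that takes the shared ctx dict and the durable context. As a rough sketch of what the first step could look like (the ctx keys source_db_id and snapshot_arn are made up for illustration; the boto3 calls are standard RDS APIs, and your actual step will differ):

```python
def pick_latest(snapshots: list[dict]) -> dict:
    # Pure helper: newest snapshot by creation time wins.
    return max(snapshots, key=lambda s: s["SnapshotCreateTime"])


def find_latest_snapshot(ctx: dict, context) -> None:
    # boto3 imported lazily so the module loads even without AWS deps at hand.
    import boto3

    rds = boto3.client("rds")
    resp = rds.describe_db_snapshots(
        DBInstanceIdentifier=ctx["source_db_id"],  # hypothetical ctx key
        SnapshotType="automated",
    )
    latest = pick_latest(resp["DBSnapshots"])
    ctx["snapshot_arn"] = latest["DBSnapshotArn"]  # consumed by restore_db
```

Keeping the selection logic in a pure helper like pick_latest makes the step easy to unit-test without touching AWS.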
If a step fails, the pipeline automatically rolls back every completed step that has a rollback function, in reverse order. So if run_anonymizer blows up, delete_restored_db fires to clean up the instance we created in step two. Steps without a rollback (like find_latest_snapshot) are simply skipped during rollback — there’s nothing to undo.
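The reverse-order behavior is easy to see in a toy sketch (plain Python, no AWS; "create_dns_record" is a hypothetical extra step added just to show the ordering):

```python
# Toy sketch of the pipeline's rollback bookkeeping.
undo_log: list[str] = []

completed = [
    ("find_latest_snapshot", None),  # no rollback: nothing to undo
    ("restore_db", lambda: undo_log.append("delete_restored_db")),
    ("create_dns_record", lambda: undo_log.append("delete_dns_record")),
]

# Walk completed steps newest-first, skipping steps without a rollback.
for name, rollback in reversed(completed):
    if rollback is not None:
        rollback()

print(undo_log)  # ['delete_dns_record', 'delete_restored_db']
```

The most recently created resource is torn down first, mirroring how dependencies were built up.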
Under the Hood
The Pipeline class itself is around 60 lines. It uses attrs for clean dataclass-style definitions and aws-lambda-powertools for structured logging.
A Step holds a name, an action function, and an optional rollback function:
@attrs.define
class Step:
    name: str
    action: Callable[[dict, DurableContext], Any]
    rollback: Callable[[dict], None] | None = None

    def execute(self, ctx: dict, context: DurableContext) -> None:
        self.action(ctx, context)

    def undo(self, ctx: dict) -> None:
        if self.rollback:
            self.rollback(ctx)
The Pipeline runs steps sequentially, tracks which ones completed, and handles rollback on failure:
@attrs.define
class Pipeline:
    name: str
    steps: list[Step] = attrs.Factory(list)
    results: list[StepResult] = attrs.Factory(list)
    _completed: list[Step] = attrs.field(factory=list, alias="_completed")

    def add_step(
        self,
        name: str,
        action: Callable[[dict, DurableContext], Any],
        rollback: Callable[[dict], None] | None = None,
    ) -> Pipeline:
        self.steps.append(Step(name=name, action=action, rollback=rollback))
        return self

    def run(self, context: DurableContext, ctx: dict) -> list[StepResult]:
        self.results.clear()
        self._completed.clear()
        total = len(self.steps)
        step_names = [s.name for s in self.steps]
        logger.info("pipeline started", pipeline=self.name, total_steps=total, steps=step_names)
        for idx, step in enumerate(self.steps, 1):
            try:
                logger.info("step starting", pipeline=self.name, step=step.name, progress=f"{idx}/{total}")
                step.execute(ctx, context)
                result = StepResult(step_name=step.name, success=True)
                self._completed.append(step)
                logger.info("step completed", pipeline=self.name, step=step.name, progress=f"{idx}/{total}")
            except Exception as e:
                result = StepResult(step_name=step.name, success=False, error=e)
                self.results.append(result)
                logger.exception("step failed", pipeline=self.name, step=step.name, progress=f"{idx}/{total}")
                self._rollback(ctx)
                raise
            self.results.append(result)
        logger.info("pipeline completed", pipeline=self.name, total_steps=total)
        return self.results

    def _rollback(self, ctx: dict) -> None:
        rollback_steps = [s.name for s in reversed(self._completed) if s.rollback]
        logger.info("rollback starting", pipeline=self.name, steps_to_rollback=rollback_steps)
        for step in reversed(self._completed):
            if step.rollback:
                try:
                    logger.info("rollback step", pipeline=self.name, step=step.name)
                    step.undo(ctx)
                    logger.info("rollback step completed", pipeline=self.name, step=step.name)
                except Exception:
                    logger.exception("rollback step failed", pipeline=self.name, step=step.name)
        logger.info("rollback finished", pipeline=self.name)
Notice that _rollback doesn’t stop if a rollback step fails — it logs the error and keeps going. You always want to attempt every rollback, even if one of them breaks.
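A quick standalone sketch of that keep-going behavior (the two rollback functions are stand-ins, not code from the pipeline):

```python
# Sketch: attempt every rollback even when one of them raises.
attempted: list[str] = []

def rollback_b() -> None:
    # Simulates a cleanup call that breaks partway through.
    attempted.append("rollback_b")
    raise RuntimeError("cleanup API unavailable")

def rollback_a() -> None:
    attempted.append("rollback_a")

for undo in (rollback_b, rollback_a):  # reverse completion order
    try:
        undo()
    except Exception:
        pass  # in the real pipeline: logger.exception(...), then continue

print(attempted)  # ['rollback_b', 'rollback_a']
```

Swallowing the exception here is deliberate: a half-finished rollback that aborts early leaves more orphaned resources than one that logs and pushes on.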
Wrap-up
Before durable functions, I ran this same workflow with Step Functions. Migrating to a durable Lambda simplified things significantly. All the business logic lives in one place — one Python file, not a state machine definition plus separate Lambda handlers for each step. There’s only one function to deploy, test, and debug instead of coordinating several.
If you’re running multi-step workflows and want to keep everything in code, durable Lambda functions are worth a serious look. That said, keep in mind that they’re not available in all AWS regions yet — check the AWS region availability page before you start.
The full code from this post is available as a GitHub Gist, and if you need a starting point for your infrastructure, I built this on top of my CDK Lambda template.