Building a Pipeline That Cannot Lose Work
My last job was at a company that builds AI-powered damage assessment software for the automotive insurance industry. The core of the product is a claims processing pipeline: insurers submit damage reports, the system runs them through assessment, calculation, and report generation, then delivers the results back to the insurer or body shop.
That pipeline has a fixed lifecycle: intake, validation, damage assessment, calculation, report generation, delivery. Each step depends on the previous one. External services time out. Files arrive corrupt. And in insurance, a claim that disappears silently isn’t just a bug — it’s a compliance issue.
The question was what infrastructure to build this on.
Why Elixir
We were already running a Phoenix app for the main platform. The reason Elixir was attractive for the pipeline specifically is OTP’s supervision model. When a worker crashes — and workers do crash, especially when talking to unreliable third-party APIs — its supervisor restarts it. The failure is isolated. Other workers keep running. You get fault containment as part of the runtime rather than something you have to implement yourself.
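To make the restart behaviour concrete, here is a minimal sketch using only the standard library. The module name `Sketch.FlakyWorker` and the `:boom` message are invented for illustration; it is not code from the actual pipeline.

```elixir
defmodule Sketch.FlakyWorker do
  # Hypothetical worker standing in for a process that calls an unreliable API.
  use GenServer

  def start_link(_opts), do: GenServer.start_link(__MODULE__, :ok, name: __MODULE__)

  @impl true
  def init(:ok), do: {:ok, %{}}

  # Simulates a failure while handling a message.
  @impl true
  def handle_cast(:boom, _state), do: raise("third-party API timed out")
end

# :one_for_one restarts only the child that crashed; siblings are untouched.
{:ok, sup} = Supervisor.start_link([Sketch.FlakyWorker], strategy: :one_for_one)

old_pid = Process.whereis(Sketch.FlakyWorker)
ref = Process.monitor(old_pid)
GenServer.cast(Sketch.FlakyWorker, :boom)

# Wait for the crash, then give the supervisor a moment to restart the child.
receive do
  {:DOWN, ^ref, :process, ^old_pid, _reason} -> :ok
end

Process.sleep(100)
new_pid = Process.whereis(Sketch.FlakyWorker)

IO.puts("restarted: #{new_pid != old_pid}, supervisor alive: #{Process.alive?(sup)}")
```

The crash is logged, the supervisor stays up, and a fresh process is registered under the same name. Nothing in the worker itself deals with recovery.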
Oban for durability
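Each pipeline stage is an Oban worker. Enqueuing a job writes it to PostgreSQL before anything executes it. If the worker process crashes mid-execution, the job record is still in the database and Oban retries it. If the whole node dies, the job stays marked as executing until it is rescued (Oban ships a Lifeline plugin for this), and then it runs again on a live node.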
This was the core requirement. The pipeline runs on multiple nodes, any of which can crash at any time. In-memory queues don’t survive crashes. A Redis-backed queue survives node crashes, but depending on how Redis persistence is configured, a Redis restart can lose recently enqueued jobs. PostgreSQL persistence was the only option that gave us the durability guarantee we needed.
One consequence: if a job can be retried, it has to be idempotent. Running the same external API call twice shouldn’t create duplicate records downstream. We added idempotency keys on all external calls and database uniqueness constraints on anything that shouldn’t be duplicated. Getting this right upfront saved a lot of pain later.
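As a rough illustration of the idempotency-key idea, the sketch below derives a deterministic key from the claim ID and stage, so a retried job sends the same key as the first attempt. The module, the key format, and the in-memory `seen` map (standing in for a downstream service or a unique index) are all assumptions, not the original implementation.

```elixir
defmodule Sketch.Idempotency do
  # Derives a stable key from the claim and the stage, so a retried job
  # sends the same key as the original attempt. Names are hypothetical.
  def key(claim_id, stage) do
    :crypto.hash(:sha256, "#{claim_id}:#{stage}")
    |> Base.encode16(case: :lower)
  end

  # Stand-in for a downstream service that honours idempotency keys:
  # the first call with a key runs `fun` and records the result,
  # repeats with the same key are reported as duplicates and do nothing.
  def call_once(seen, key, fun) do
    if Map.has_key?(seen, key) do
      {:duplicate, seen}
    else
      {:ok, Map.put(seen, key, fun.())}
    end
  end
end

key = Sketch.Idempotency.key(42, :calculation)
{:ok, seen} = Sketch.Idempotency.call_once(%{}, key, fn -> :created end)

# A retry with the same claim and stage produces the same key and is ignored.
{:duplicate, ^seen} = Sketch.Idempotency.call_once(seen, key, fn -> :created end)
```

In a real system the `seen` map would be a database uniqueness constraint or the partner API's own idempotency handling. The important part is that the key is derived from the job's inputs rather than generated fresh on every attempt.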
Delivering to many different integrations
Results get delivered to partner integrations — SOAP callbacks, REST webhooks, file transfers, proprietary APIs. Each partner’s integration behaves differently.
The first version had a large case statement in the delivery worker that branched on partner type. It worked but got hard to read as we added more partners. The refactor was to define a common delivery behaviour and implement it per integration type:
defmodule Pipeline.Delivery do
  # Contract every integration type has to satisfy.
  @callback deliver(claim :: map(), config :: map()) :: :ok | {:error, term()}
end

defmodule Pipeline.Delivery.SoapCallback do
  @behaviour Pipeline.Delivery

  @impl true
  def deliver(claim, config) do
    # ...
  end
end

defmodule Pipeline.Delivery.RestWebhook do
  @behaviour Pipeline.Delivery

  @impl true
  def deliver(claim, config) do
    # ...
  end
end
The delivery worker resolves the right module at runtime from the partner’s config. Adding a new integration type is one new module. The worker itself doesn’t change.
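One way the runtime lookup could work is a small registry that maps a type string from the partner's config to a module. The sketch below is self-contained and uses a `Sketch` namespace; the `"type"` config key, the registry map, and the stubbed `deliver/2` bodies are assumptions for illustration.

```elixir
defmodule Sketch.Delivery do
  @callback deliver(claim :: map(), config :: map()) :: :ok | {:error, term()}

  # Hypothetical mapping from a partner's configured type to its module.
  @adapters %{
    "soap" => Sketch.Delivery.SoapCallback,
    "rest" => Sketch.Delivery.RestWebhook
  }

  # Looks up the adapter module for a partner config.
  def resolve(%{"type" => type}) do
    case Map.fetch(@adapters, type) do
      {:ok, module} -> {:ok, module}
      :error -> {:error, {:unknown_delivery_type, type}}
    end
  end

  # Resolves the adapter and delegates; an unknown type falls through
  # as the error tuple returned by resolve/1.
  def deliver(claim, config) do
    with {:ok, module} <- resolve(config), do: module.deliver(claim, config)
  end
end

defmodule Sketch.Delivery.SoapCallback do
  @behaviour Sketch.Delivery

  # Real SOAP call elided; the stub always succeeds.
  @impl true
  def deliver(_claim, _config), do: :ok
end

defmodule Sketch.Delivery.RestWebhook do
  @behaviour Sketch.Delivery

  # Real HTTP call elided; the stub always succeeds.
  @impl true
  def deliver(_claim, _config), do: :ok
end

:ok = Sketch.Delivery.deliver(%{id: 1}, %{"type" => "rest"})
```

With an explicit registry, a new integration is one new module plus one map entry. The alternative of storing the module name directly in config avoids the map entry but means converting strings from configuration into atoms, which needs more care.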
The orchestration refactor
Early on, each pipeline stage directly enqueued the next stage on success. Stage A enqueued Stage B, Stage B enqueued Stage C. This worked for the simple linear case.
When the pipeline needed to branch — some claims skip calculation and go straight to report generation, others need to wait for human review — the routing logic ended up scattered. Stage B had to know about Stage C and Stage D and when to pick between them. Adding a new branch meant reading through multiple files to understand the full picture.
The fix was a central dispatcher job. Individual stage workers do their work and exit. On completion, they enqueue the dispatcher. The dispatcher reads the claim’s current state and decides what runs next:
defmodule Pipeline.Dispatcher do
  use Oban.Worker

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"claim_id" => id}}) do
    claim = Repo.get!(Claim, id)

    # Oban.insert/1 returns {:ok, job} or {:error, changeset}; an error
    # tuple fails this job, so Oban retries the dispatch.
    case next_stage(claim) do
      {:enqueue, worker, args} -> Oban.insert(worker.new(args))
      :done -> mark_complete(claim)
    end
  end

  # One clause per routing decision, matched on the claim's current state.
  defp next_stage(%{state: :validated, type: :standard}), do: {:enqueue, CalculationWorker, ...}
  defp next_stage(%{state: :validated, type: :express}), do: {:enqueue, ReportWorker, ...}
  defp next_stage(%{state: :calculated}), do: {:enqueue, ReportWorker, ...}
  defp next_stage(%{state: :reported}), do: {:enqueue, DeliveryWorker, ...}
  defp next_stage(%{state: :delivered}), do: :done
end
All routing decisions are in one place. To trace what happens to any given claim, you read one function.
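A side benefit of this shape is that routing is a pure function of the claim's state, so it can be checked without running any jobs. The sketch below mirrors the routing table with stage names instead of worker modules. The `:awaiting_review` state, the `:wait` result, and the catch-all clause are assumptions added for illustration.

```elixir
defmodule Sketch.Routing do
  # Pure version of the routing table: returns a stage name instead of
  # enqueueing anything, so it can be exercised in isolation.
  def next_stage(%{state: :validated, type: :standard}), do: {:enqueue, :calculation}
  def next_stage(%{state: :validated, type: :express}), do: {:enqueue, :report}
  def next_stage(%{state: :calculated}), do: {:enqueue, :report}
  def next_stage(%{state: :reported}), do: {:enqueue, :delivery}
  def next_stage(%{state: :delivered}), do: :done

  # Hypothetical: claims parked for human review enqueue nothing.
  def next_stage(%{state: :awaiting_review}), do: :wait

  # Hypothetical catch-all so an unexpected state yields an explicit error
  # rather than a FunctionClauseError.
  def next_stage(claim), do: {:error, {:no_route, Map.get(claim, :state)}}
end

# Express claims skip calculation and go straight to report generation.
{:enqueue, :report} = Sketch.Routing.next_stage(%{state: :validated, type: :express})
```

Whether an unrouted state should return an error or crash the dispatcher is a design choice. Crashing makes the job fail loudly and retry, while an explicit error value makes it easier to flag the claim for manual attention.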
Data retention workers
Claims contain personal data subject to retention rules — after a certain period, that data has to be redacted. We run scheduled Oban cron jobs that scan for claims past their retention date and redact the personal fields in place.
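The redaction step itself can be sketched as a plain function over a claim map. The field names, the `retain_until` key, and the placeholder string are hypothetical. Because every personal field is overwritten with the same constant, applying the function a second time changes nothing.

```elixir
defmodule Sketch.Retention do
  # Hypothetical list of personal fields and placeholder value.
  @personal_fields [:owner_name, :owner_email, :license_plate]
  @redacted "[REDACTED]"

  # True when the claim's retention date is strictly before `today`.
  def due?(%{retain_until: retain_until}, today) do
    Date.compare(retain_until, today) == :lt
  end

  # Overwrites the personal fields that are present; other fields are kept.
  def redact(claim) do
    Enum.reduce(@personal_fields, claim, fn field, acc ->
      if Map.has_key?(acc, field), do: Map.put(acc, field, @redacted), else: acc
    end)
  end
end

claim = %{id: 1, owner_name: "A. Driver", damage_total: 1200, retain_until: ~D[2020-01-01]}

once = Sketch.Retention.redact(claim)
twice = Sketch.Retention.redact(once)
```

Here `once` and `twice` are equal, which is the property that makes a re-run after a crash harmless for this kind of job.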
The constraint here is avoiding duplicate work. A retention job that runs twice due to a crash is fine for redaction (redacting already-redacted data is a no-op), but not fine for generating GDPR export bundles when a user has requested their data. Oban’s unique job feature handles the duplicate-enqueue side of this: jobs with the same unique key won’t be inserted twice within a configurable window. Uniqueness constrains insertion, not execution, so an export job that crashes mid-run is still retried, and the worker itself has to be safe to re-run.
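For reference, a uniqueness constraint on an export worker might look roughly like the sketch below. It assumes Oban is a dependency; the queue name, the 24-hour period, the `"user_id"` argument, and both private helpers are illustrative rather than the original configuration.

```elixir
defmodule Sketch.ExportWorker do
  # Assumes Oban is available. `period` is in seconds; `keys` restricts the
  # uniqueness comparison to the listed keys within the job's args.
  use Oban.Worker,
    queue: :exports,
    unique: [period: 86_400, keys: [:user_id]]

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"user_id" => user_id}}) do
    # Uniqueness only blocks duplicate inserts. A retry after a crash still
    # lands here, so check for an existing bundle before building one.
    if bundle_exists?(user_id), do: :ok, else: build_bundle(user_id)
  end

  # Hypothetical lookup for a previously generated bundle.
  defp bundle_exists?(_user_id), do: false

  # Hypothetical export step.
  defp build_bundle(_user_id), do: :ok
end
```

With this in place, inserting a second export job for the same user within the period is treated as a conflict with the existing job instead of creating a new row.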
What this setup costs you
Oban jobs live in PostgreSQL. Under high throughput the jobs table grows fast, and Oban’s polling queries add load to the database. We tune the polling interval per queue — high-priority queues poll every few seconds, lower-priority ones every minute — and run Oban’s pruner aggressively to keep the table from bloating.
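As a configuration sketch, pruning is enabled through Oban's Pruner plugin. The application name, repo module, queue names, and limits below are placeholders, and the one-hour `max_age` is just an example value.

```elixir
import Config

# Hypothetical app and repo names; queue names and limits are placeholders.
config :my_app, Oban,
  repo: MyApp.Repo,
  queues: [intake: 20, delivery: 10, retention: 2],
  plugins: [
    # Delete completed, cancelled and discarded jobs older than one hour.
    # max_age is given in seconds.
    {Oban.Plugins.Pruner, max_age: 60 * 60}
  ]
```

A shorter `max_age` keeps the table small at the cost of losing job history sooner, which matters if job records are also used for debugging.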
Debugging is the other cost. When something goes wrong mid-pipeline, reconstructing what happened requires reading job records, checking logs from multiple workers, and correlating timestamps across services. We invested in structured logging with a consistent claim ID in every log line from every worker. Without that it would be genuinely difficult to trace a failure through the system.
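A minimal sketch of the logging pattern, using Elixir's built-in Logger metadata. The helper name `with_claim/2` is hypothetical. Metadata is stored per process, so setting it at the top of a worker covers every later log call in that job.

```elixir
defmodule Sketch.Logging do
  require Logger

  # Attaches the claim ID to the calling process's logger metadata, then
  # runs `fun`. Every Logger call made inside `fun` carries the claim ID.
  def with_claim(claim_id, fun) do
    Logger.metadata(claim_id: claim_id)
    fun.()
  end

  # Example stage body: logs, then returns the claim ID it finds in the
  # current process metadata.
  def run_stage do
    Logger.info("calculation started")
    Logger.metadata()[:claim_id]
  end
end

Sketch.Logging.with_claim(42, &Sketch.Logging.run_stage/0)
```

Whether the metadata shows up in the output depends on the logger formatter configuration, which has to list `:claim_id` among the metadata keys it prints.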