Building Fault-Tolerant Distributed Task Orchestration with Horde and Elixir
The Problem
Let’s say you’re building a video processing system. Users upload videos, you transcode them to different formats, generate thumbnails, upload to S3, and send notifications.
The naive approach is to throw everything into Oban and chain jobs together. This works until:
- A server crashes mid-process → the entire task restarts from scratch
- Transcoding fails at 90% → start over from the beginning
- You deploy new code → running tasks just stop
- You scale to multiple servers → no coordination between them
What you actually need is something smarter. Each task needs its own process that can:
- Remember where it left off if the server crashes
- Move to a different server automatically during deploys
- React to job completions without polling
- Handle complex state transitions (not just linear chains)
This is where distributed task orchestration comes in. Here’s how to build it with Horde and Elixir.
The Big Picture
The core idea: give each task its own GenServer that orchestrates everything. When a video upload starts, we spawn a TaskManager process that lives until the task completes. This process coordinates all the steps, reacts to job completions, and handles failures.
Here’s how the pieces fit together:
PostgreSQL (task state)
      ↑  reads/writes task state
TaskManager GenServer (one per task)
      ↓  enqueues / reacts to
Oban Jobs (individual steps)
The tricky part is making this work across multiple servers. That’s where Horde comes in—it’s a distributed process registry plus a distributed supervisor. When you start a TaskManager on server A, Horde makes sure every server in the cluster knows about it. If server A crashes, Horde automatically restarts the TaskManager on server B.
The flow goes like this:
- User uploads video → create task in DB → spawn TaskManager GenServer
- TaskManager kicks off first Oban job (download video)
- When download finishes, Oban notifies the TaskManager via telemetry
- TaskManager kicks off next jobs (transcode, thumbnails)
- Repeat until done
The beauty is that TaskManager’s state lives in the database, not in memory. So if a server crashes, the new TaskManager just picks up where the old one left off.
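To make step 1 concrete, here’s a sketch of the entry point. It’s illustrative: create_task/1 stands in for your own context function, and start_task_manager/1 is the helper we’ll build in Part 2.

defmodule VideoProcessor.Tasks do
  # Persist the task first, then spawn its TaskManager. If the process
  # later dies, the database row is what survives and gets picked up.
  def start_video_task(upload_params) do
    with {:ok, task} <- create_task(upload_params),
         {:ok, _pid} <- VideoProcessor.TaskSupervisor.start_task_manager(task.id) do
      {:ok, task}
    end
  end

  # Stand-in for an Ecto insert into your tasks table with state :queued.
  defp create_task(_upload_params) do
    {:ok, %{id: Ecto.UUID.generate(), state: :queued}}
  end
end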
Part 1: The TaskManager GenServer
Let’s start with the heart of the system. Each task gets its own GenServer that tracks progress and kicks off jobs.
The first thing I learned: don’t treat the GenServer as the source of truth. When a process crashes and restarts, you lose everything in memory. Instead, the database holds the authoritative state, and the GenServer is just a coordinator with some transient bookkeeping.
defmodule VideoProcessor.TaskManager do
use GenServer
require Logger
# State machine states
@states [:queued, :processing, :waiting, :completed, :failed]
defmodule State do
defstruct [
:task_id,
:attached?,
:current_state,
:finished_jobs,
:next_steps
]
def new(task_id) do
%__MODULE__{
task_id: task_id,
attached?: false,
current_state: :queued,
finished_jobs: %{},
next_steps: []
}
end
def attach(state), do: %{state | attached?: true}
def add_finished_job(state, job_id, job_state) do
%{state | finished_jobs: Map.put(state.finished_jobs, job_id, job_state)}
end
def update_state(state, new_state, next_steps) do
%{state | current_state: new_state, next_steps: next_steps}
end
end
# Client API
def start_link({task_id, name}) do
GenServer.start_link(__MODULE__, task_id, name: name)
end
# Server callbacks
@impl GenServer
def init(task_id) do
Logger.metadata([{:task_id, task_id}])
state = State.new(task_id)
send(self(), :try_attach)
{:ok, state}
end
@impl GenServer
def handle_info(:try_attach, state) do
# Load task from database
case load_task_from_db(state.task_id) do
{:ok, task} ->
state = State.attach(state)
Logger.info("TaskManager started for task #{state.task_id}")
# Start processing if ready
if task.state == :queued do
send(self(), :process_next_step)
end
{:noreply, state}
{:error, _reason} ->
# Retry with backoff
Process.send_after(self(), :try_attach, 500)
{:noreply, state}
end
end
@impl GenServer
def handle_info(:process_next_step, %{attached?: true} = state) do
case determine_next_jobs(state.task_id) do
{:ok, jobs_to_start} when length(jobs_to_start) > 0 ->
# Insert jobs into Oban
Enum.each(jobs_to_start, &enqueue_oban_job/1)
# Update state to waiting
state = State.update_state(state, :waiting, jobs_to_start)
{:noreply, state}
{:ok, []} ->
# No more jobs, task complete
mark_task_complete(state.task_id)
state = State.update_state(state, :completed, [])
# Stop normally so the :transient restart strategy doesn't restart us
{:stop, :normal, state}
end
end
@impl GenServer
def handle_cast({:job_completed, job_id, job_state}, state) do
Logger.debug("Job #{job_id} completed with state: #{job_state}")
state = State.add_finished_job(state, job_id, job_state)
# Check if all waiting jobs are done
if all_jobs_finished?(state) do
send(self(), :process_next_step)
end
{:noreply, state}
end
# Helper functions
defp load_task_from_db(task_id) do
# Load from your database
{:ok, %{id: task_id, state: :queued}}
end
defp determine_next_jobs(_task_id) do
# Return list of jobs based on current task state
{:ok, []}
end
defp enqueue_oban_job(_job_params) do
# Insert into Oban queue
:ok
end
defp all_jobs_finished?(state) do
Enum.all?(state.next_steps, fn job_id ->
Map.has_key?(state.finished_jobs, job_id)
end)
end
defp mark_task_complete(_task_id) do
# Update database
:ok
end
end
A few things worth explaining:
The :try_attach pattern: When a TaskManager starts, it immediately sends itself a message to load the task from the database. Why not do this in init/1? Because if the database is temporarily down, you don’t want init/1 to crash—that would cause a restart loop. By using a message, we can retry with backoff until the database comes back.
Why handle_cast for job completions: We use cast instead of call because we don’t need a response. The Oban job is already done—we just need to notify the TaskManager. Using cast means Oban’s telemetry handler doesn’t block waiting for a reply.
The finished_jobs map: This tracks which jobs have completed. When all jobs in next_steps are done, we know it’s safe to move to the next state. This prevents race conditions where multiple jobs finish simultaneously.
One gotcha: make sure all_jobs_finished?/1 checks against the jobs you’re actually waiting for, not all jobs ever created. I initially had a bug where old job IDs would linger in state.
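One way to kill that class of bug is to reset the bookkeeping on every transition. A small tweak to State.update_state/3 (same struct as above) clears finished_jobs whenever a new batch of jobs is enqueued:

# Clearing finished_jobs here means stale IDs from a previous step can
# never satisfy all_jobs_finished?/1 for the current one.
def update_state(state, new_state, next_steps) do
  %{state | current_state: new_state, next_steps: next_steps, finished_jobs: %{}}
end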
Part 2: Distributing with Horde
Okay, so we have a GenServer that manages a single task. Now we need to make this work across multiple servers.
The naive approach would be to just run these GenServers on any server. But then you have problems:
- How do you find which server has a specific TaskManager?
- What happens when a server crashes?
- How do you prevent starting the same TaskManager twice?
This is exactly what Horde solves. Think of it as a distributed process registry that every server in your cluster can see.
Setting up Horde
# In your application.ex
defmodule VideoProcessor.Application do
use Application
def start(_type, _args) do
children = [
# Database, PubSub, etc...
# Horde Registry - distributed process registry
{Horde.Registry,
[
name: VideoProcessor.TaskRegistry,
keys: :unique,
members: :auto # Auto-discover cluster nodes
]},
# Horde DynamicSupervisor - manages TaskManager processes
{Horde.DynamicSupervisor,
[
name: VideoProcessor.TaskSupervisor,
strategy: :one_for_one,
distribution_strategy: Horde.UniformDistribution,
members: :auto
]}
]
Supervisor.start_link(children, strategy: :one_for_one)
end
end
That members: :auto option is doing a lot of heavy lifting. It tells Horde to automatically discover all connected nodes and form a cluster. When you deploy a new server, it joins the cluster automatically. When a server crashes, Horde detects it within a few seconds and migrates processes.
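One caveat: members: :auto only adds nodes that are already connected at the BEAM distribution level; something still has to connect them (libcluster is a common choice). A minimal sketch with libcluster’s Gossip strategy, where the topology name is illustrative:

# In application.ex, alongside the Horde children
topologies = [
  video_processor: [
    strategy: Cluster.Strategy.Gossip
  ]
]

children = [
  {Cluster.Supervisor, [topologies, [name: VideoProcessor.ClusterSupervisor]]},
  # ... Horde.Registry and Horde.DynamicSupervisor from above
]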
One thing that bit me early on: Horde is eventually consistent. There’s a brief window (usually <1 second) where two servers might disagree about which processes are running where. This is fine for most use cases, but if you need strict consistency, you’ll want additional safeguards (like unique database constraints).
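As a concrete example of that safeguard, a unique index on whatever table records in-flight work turns a duplicated step into a constraint error instead of silent double-processing. A sketch, assuming a hypothetical task_steps table:

defmodule VideoProcessor.Repo.Migrations.AddTaskStepUniqueness do
  use Ecto.Migration

  def change do
    # One row per (task, step): if two TaskManagers race on the same step,
    # the second insert fails instead of doing the work twice.
    create unique_index(:task_steps, [:task_id, :step])
  end
end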
Finding and managing TaskManagers
Now we need helper functions to interact with the registry:
defmodule VideoProcessor.TaskRegistry.Operations do
@registry VideoProcessor.TaskRegistry
def get_task_manager_pid(task_id) do
case Horde.Registry.lookup(@registry, task_id) do
[{pid, _}] -> {:ok, pid}
[] -> :not_found
end
end
def gen_server_name(task_id) do
{:via, Horde.Registry, {@registry, task_id}}
end
def list_all_tasks do
Horde.Registry.select(@registry, [{{:"$1", :"$2", :"$3"}, [], [:"$1"]}])
end
def count_active_tasks do
Horde.Registry.count(@registry)
end
end
The gen_server_name/1 function is interesting—it returns a tuple that GenServer understands as “register this process with Horde instead of the local registry”. This is what makes the process discoverable across the cluster.
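In practice you rarely need the raw pid; the via tuple can be passed straight to GenServer calls from any node, and Horde resolves it. For example (the job ID and state are placeholders):

name = VideoProcessor.TaskRegistry.Operations.gen_server_name("task-123")

# Horde routes this to whichever node currently runs the TaskManager.
GenServer.cast(name, {:job_completed, 42, :success})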
Starting and stopping TaskManagers
defmodule VideoProcessor.TaskSupervisor do
alias VideoProcessor.TaskRegistry.Operations

require Logger
def start_task_manager(task_id) do
case Operations.get_task_manager_pid(task_id) do
:not_found ->
child_spec = %{
id: task_id,
restart: :transient,
start: {
VideoProcessor.TaskManager,
:start_link,
[{task_id, Operations.gen_server_name(task_id)}]
}
}
# __MODULE__ works here because it matches the name: we gave Horde.DynamicSupervisor in application.ex
case Horde.DynamicSupervisor.start_child(__MODULE__, child_spec) do
{:ok, pid} ->
Logger.info("TaskManager started for #{task_id}")
{:ok, pid}
{:error, {:already_started, _pid}} ->
{:error, :already_started}
end
{:ok, _pid} ->
{:error, :already_started}
end
end
def stop_task_manager(task_id) do
case Operations.get_task_manager_pid(task_id) do
{:ok, pid} ->
Horde.DynamicSupervisor.terminate_child(__MODULE__, pid)
:not_found ->
:ok
end
end
end
A few notes on this code:
The restart: :transient option means the supervisor only restarts the process if it exits abnormally. When a task completes successfully, the TaskManager exits with :normal, and we don’t want to restart it.
Also notice we check if the TaskManager already exists before starting it. This prevents a race condition where two servers try to start the same TaskManager simultaneously.
Part 3: Hooking into Oban Job Completions
Here’s where it gets interesting. We need TaskManagers to react when Oban jobs complete. My first version polled the database every 5 seconds to check job status. Worked, but felt gross.
Then I discovered Oban emits telemetry events for everything. We can hook into these events and notify the TaskManager directly—no polling needed.
defmodule VideoProcessor.ObanHook do
require Logger
def attach do
:telemetry.attach(
"oban-job-complete-handler",
[:oban, :job, :stop],
&__MODULE__.handle_event/4,
nil
)
end
def handle_event([:oban, :job, :stop], _measurements, metadata, _config) do
%{job: job, state: state} = metadata
# Oban's telemetry reports the result as :success / :failure etc., which
# differs from the job's DB states (:completed, :discarded). Adjust these
# atoms if your Oban version reports something else.
if state in [:success, :failure, :cancelled, :discarded] do
case job.args do
%{"task_id" => task_id} -> notify_task_manager(task_id, job.id, state)
_ -> :ok # job isn't part of a task, nothing to notify
end
end
end
defp notify_task_manager(task_id, job_id, job_state) do
case VideoProcessor.TaskRegistry.Operations.get_task_manager_pid(task_id) do
{:ok, pid} ->
GenServer.cast(pid, {:job_completed, job_id, job_state})
:not_found ->
Logger.warning("No TaskManager found for task #{task_id}")
end
end
end
In your Application start:
def start(_type, _args) do
# ... other children
# Attach Oban hook
VideoProcessor.ObanHook.attach()
Supervisor.start_link(children, strategy: :one_for_one)
end
This hook attaches to Oban’s :stop event, which fires whenever a job finishes (success or failure). We check if the job is in a final state, then look up the TaskManager and send it a cast.
One thing to watch out for: if the TaskManager doesn’t exist, we just log a warning and move on. This can happen if the task was cancelled or if the TaskManager crashed. Don’t treat this as an error—it’s expected behavior.
Also, we use cast here instead of call because we don’t want to block Oban’s telemetry handler. If the TaskManager is slow to respond, we don’t want to slow down Oban’s job processing.
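For the lookup to work, every job the TaskManager enqueues has to carry the task ID in its args; that’s the only correlation key the hook has. A sketch of what one worker might look like (the module and the VideoProcessor.Transcoder call are illustrative):

defmodule VideoProcessor.Workers.TranscodeVideo do
  use Oban.Worker, queue: :media

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"task_id" => task_id, "format" => format}}) do
    # Keep this idempotent: re-running for the same task_id/format must be safe.
    VideoProcessor.Transcoder.run(task_id, format)
  end
end

# Enqueued by the TaskManager, with the task_id baked into the args:
#
#   %{task_id: task_id, format: "720p"}
#   |> VideoProcessor.Workers.TranscodeVideo.new()
#   |> Oban.insert()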
Part 4: Testing Distributed Behavior
Testing distributed systems is notoriously painful. How do you test that a TaskManager actually migrates when a node crashes? You can’t just write a unit test for that.
Enter LocalCluster—it lets you spawn real BEAM nodes in your tests. Not Docker containers or VMs, actual Erlang nodes running on your machine.
defmodule VideoProcessor.ClusterTest do
use ExUnit.Case, async: false
@moduletag :distributed
setup do
# Start 3 additional nodes (4 total with current node)
{:ok, _cluster, nodes} = LocalCluster.start_nodes("task-node", 3,
applications: [:video_processor]
)
# Wait for Horde cluster to form
wait_for_cluster_formation()
{:ok, nodes: nodes}
end
test "tasks automatically rebalance when node crashes", %{nodes: [node1, node2, node3]} do
# Start 4 tasks
{:ok, _} = start_task("task-1")
{:ok, _} = start_task("task-2")
{:ok, _} = start_task("task-3")
{:ok, _} = start_task("task-4")
# Verify all running
assert 4 == count_active_tasks()
# Find which node has task-1
task1_node = find_task_node("task-1")
# Kill that node
LocalCluster.stop_nodes([task1_node])
# Wait for Horde to detect failure and rebalance
Process.sleep(2000)
# task-1 should now be running on a different node
new_task1_node = find_task_node("task-1")
assert new_task1_node != task1_node
assert 4 == count_active_tasks()
end
defp wait_for_cluster_formation do
# Crude but effective: give Horde a moment to sync cluster membership.
# (Polling Horde.Cluster.members/1 until all nodes appear would be more robust.)
:timer.sleep(1000)
end
defp find_task_node(task_id) do
case Horde.Registry.whereis_name({VideoProcessor.TaskRegistry, task_id}) do
pid when is_pid(pid) -> node(pid)
:undefined -> nil
end
end
defp count_active_tasks do
VideoProcessor.TaskRegistry.Operations.count_active_tasks()
end
end
The test above does something pretty cool: it starts 4 tasks across 4 nodes, kills one of the nodes, then verifies that the task on the dead node migrated to a different node.
A few gotchas I learned the hard way:
Port conflicts: Each node needs its own port for anything that binds a socket (the Phoenix endpoint, dashboards, and so on). I burned a few hours debugging “address already in use” errors before realizing I needed unique ports per node.
Timing: That Process.sleep(2000) is necessary. Horde needs time to detect the node failure and migrate processes. This usually takes 1-2 seconds, but in tests it can be faster or slower depending on system load.
Cleanup: Always stop your nodes in the test teardown, or you’ll have zombie nodes hanging around. This led to some really confusing test failures where old nodes from previous tests were still running.
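Two small helpers take the sting out of the timing and cleanup gotchas: register teardown with on_exit, and poll for conditions instead of sleeping a fixed amount. A sketch (drop-in for the setup block above):

setup do
  {:ok, _cluster, nodes} =
    LocalCluster.start_nodes("task-node", 3, applications: [:video_processor])

  # Teardown runs even if the test fails, so no zombie nodes linger.
  on_exit(fn -> LocalCluster.stop_nodes(nodes) end)

  wait_for_cluster_formation()
  {:ok, nodes: nodes}
end

# Poll for a condition instead of a fixed Process.sleep/1.
# Usage: wait_until(fn -> find_task_node("task-1") != task1_node end)
defp wait_until(fun, timeout \\ 5_000) do
  do_wait(fun, System.monotonic_time(:millisecond) + timeout)
end

defp do_wait(fun, deadline) do
  cond do
    fun.() ->
      :ok

    System.monotonic_time(:millisecond) > deadline ->
      flunk("condition not met before timeout")

    true ->
      Process.sleep(100)
      do_wait(fun, deadline)
  end
end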
The whereis_name check is your friend—it returns the PID of a process, and you can call node(pid) to see which server it’s on. Super useful for debugging distribution issues.
Part 5: Handling Deployments
One of the requirements was zero-downtime deploys. When we push new code, running tasks shouldn’t get interrupted.
The good news: Horde handles most of this automatically. When you start a new server, it joins the cluster and Horde gradually migrates some processes over. When you shut down an old server, Horde migrates its processes to healthy servers.
But there’s a catch: you need to give Horde time to do this. If you kill a server immediately, processes don’t have time to migrate.
defmodule VideoProcessor.TaskManager do
# ... existing code
@impl GenServer
def terminate(reason, state) do
Logger.info("TaskManager #{state.task_id} terminating: #{inspect(reason)}")
case reason do
:shutdown ->
# Graceful shutdown during deployment
# Horde will restart this on another node
:ok
:normal ->
# Task completed normally
:ok
_ ->
# Unexpected crash - log error
Logger.error("TaskManager crashed: #{inspect(reason)}")
:ok
end
end
end
The terminate/2 callback is important here. When a server gets a shutdown signal (during a deploy), it calls terminate/2 on all GenServers with reason :shutdown. This is your cue to clean up.
In our case, we just log and exit. Horde will automatically restart the TaskManager on another node. The new TaskManager loads state from the database and continues where the old one left off.
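One caveat: OTP only guarantees the terminate/2 callback runs on a supervisor-initiated :shutdown if the process is trapping exits. If you rely on terminate/2 during deploys, set the flag in init/1:

@impl GenServer
def init(task_id) do
  # Without this, the :shutdown exit signal from the supervisor kills the
  # process immediately and terminate/2 never runs.
  Process.flag(:trap_exit, true)

  Logger.metadata(task_id: task_id)
  state = State.new(task_id)
  send(self(), :try_attach)
  {:ok, state}
end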
Our deploy process looks like this:
- Start new server (Node B) - Horde sees it and starts migrating processes
- Wait 30 seconds - gives Horde time to rebalance
- Send SIGTERM to old server (Node A) - triggers graceful shutdown
- Wait for old server to drain - all remaining processes migrate to Node B
- Old server exits
The 30 second wait is crucial. Without it, you’re basically just restarting all your TaskManagers at once, which creates a thundering herd problem.
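A related knob: the child spec’s shutdown value caps how long a TaskManager gets to wind down after the :shutdown signal (5 seconds by default). If your steps need a longer drain window, raise it; the 30_000 below is illustrative:

child_spec = %{
  id: task_id,
  restart: :transient,
  # Up to 30s to finish up after :shutdown before the supervisor kills it outright.
  shutdown: 30_000,
  start: {
    VideoProcessor.TaskManager,
    :start_link,
    [{task_id, Operations.gen_server_name(task_id)}]
  }
}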
We use Fly.io which handles this automatically with their rolling deployment strategy. If you’re on Kubernetes or ECS, you’ll need to configure readiness probes and grace periods.
Part 6: Monitoring & Observability
When things go wrong (and they will), you need to see what’s happening. Which tasks are running? Which servers are they on? Is the cluster healthy?
I built a simple LiveView dashboard that shows all active tasks and their locations:
defmodule VideoProcessorWeb.TaskDashboardLive do
use Phoenix.LiveView
def mount(_params, _session, socket) do
if connected?(socket) do
:timer.send_interval(1000, self(), :refresh)
end
{:ok, assign(socket, tasks: load_tasks())}
end
def handle_info(:refresh, socket) do
{:noreply, assign(socket, tasks: load_tasks())}
end
defp load_tasks do
VideoProcessor.TaskRegistry.Operations.list_all_tasks()
|> Enum.map(fn task_id ->
case Horde.Registry.whereis_name({VideoProcessor.TaskRegistry, task_id}) do
pid when is_pid(pid) ->
%{id: task_id, node: node(pid), pid: pid}
:undefined ->
%{id: task_id, node: nil, pid: nil}
end
end)
end
end
Template:
<div>
<h1>Active Tasks: <%= length(@tasks) %></h1>
<table>
<thead>
<tr>
<th>Task ID</th>
<th>Node</th>
<th>PID</th>
</tr>
</thead>
<tbody>
<tr :for={task <- @tasks}>
<td><%= task.id %></td>
<td><%= task.node %></td>
<td><%= inspect(task.pid) %></td>
</tr>
</tbody>
</table>
</div>
This dashboard has been invaluable for debugging. You can instantly see if tasks are distributed evenly, or if one server is overloaded. During deploys, you can watch tasks migrate in real-time.
The :refresh message fires every second, so the view updates live. You might want to increase this to 5 seconds to reduce load, but 1 second is nice for debugging.
Important Gotchas & Design Decisions
Here are some non-obvious things to consider:
1. Horde vs :global registry
When I first read about distributed registries, I looked at Erlang’s built-in :global registry. Why not just use that?
The difference is consistency. :global is strongly consistent—it guarantees that only one process with a given name exists across the cluster. But this comes at a cost: it’s slower and can block during network partitions.
Horde is eventually consistent. For a brief moment (usually <1 second), you might have duplicate processes. But in practice this hasn’t been an issue because:
- Our tasks are idempotent (running twice doesn’t break anything)
- State is in the database with uniqueness constraints
- The brief inconsistency window is acceptable for our use case
If you need strong consistency, use :global. But for most systems, Horde’s performance wins.
2. Restart strategies matter
child_spec = %{
id: task_id,
restart: :transient, # Don't restart if exits normally
start: {TaskManager, :start_link, [...]}
}
Setting restart: :transient instead of :permanent was a game-changer. Before this, TaskManagers would restart in a loop even after completing successfully. Not great for logs.
3. Split brain scenarios
If your cluster partitions (the network between servers fails), Horde can end up with duplicate processes. Say the partition lasts 30 seconds: each side assumes the other is dead, so each side restarts the other’s TaskManagers.
When the network heals, you briefly have duplicates. Horde kills the extras, but they might have already started duplicate Oban jobs.
The fix: add database uniqueness constraints on anything that shouldn’t be duplicated. Oban jobs should have unique keys. Tasks should have unique IDs. If something tries to create a duplicate, it fails gracefully.
Also, make your jobs idempotent. If a job runs twice, it should be safe.
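Oban has this built in: the unique option rejects duplicate inserts that match on the configured fields within a period. A sketch (the period and the thumbnail module are illustrative):

defmodule VideoProcessor.Workers.GenerateThumbnails do
  # If two TaskManagers race and both enqueue this job for the same task,
  # the second insert is detected as a duplicate and skipped.
  use Oban.Worker,
    queue: :media,
    unique: [period: 300, fields: [:worker, :args]]

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"task_id" => task_id}}) do
    # Idempotent: regenerating thumbnails for the same task overwrites the
    # previous output instead of creating duplicates.
    VideoProcessor.Thumbnails.generate(task_id)
  end
end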
4. Monitor cluster health
def cluster_healthy? do
registry_members = Horde.Cluster.members(VideoProcessor.TaskRegistry)
supervisor_members = Horde.Cluster.members(VideoProcessor.TaskSupervisor)
length(registry_members) == length(supervisor_members) and
length(registry_members) > 0
end
We expose this as a Prometheus metric and alert if it returns false. Has caught a few issues where nodes couldn’t see each other.
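The wiring is nothing fancy: a small GenServer emits the check on a timer as a telemetry event, and the Prometheus reporter turns it into a gauge. A sketch, assuming cluster_healthy?/0 lives in a VideoProcessor.Cluster module and the event name is our own:

defmodule VideoProcessor.ClusterHealthReporter do
  use GenServer

  @interval :timer.seconds(15)

  def start_link(_opts), do: GenServer.start_link(__MODULE__, nil, name: __MODULE__)

  @impl GenServer
  def init(nil) do
    :timer.send_interval(@interval, self(), :report)
    {:ok, nil}
  end

  @impl GenServer
  def handle_info(:report, state) do
    healthy = if VideoProcessor.Cluster.cluster_healthy?(), do: 1, else: 0

    # Exported as a gauge by our telemetry pipeline; we alert when it hits 0.
    :telemetry.execute([:video_processor, :cluster], %{healthy: healthy}, %{})

    {:noreply, state}
  end
end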
Performance & Scalability
GenServers are surprisingly cheap. Here’s what you can expect:
Memory: Each TaskManager uses about 30-50KB of memory. The BEAM can easily handle 10K+ processes per server.
Failover time: When a server crashes, Horde detects it and migrates processes in 1-2 seconds.
Network overhead: Horde uses a gossip protocol which is chatty on the network. This works fine for single-region deployments. For multi-region setups, consider Horde’s delta-crdt options to reduce network traffic.
Scaling:
- Horizontal scaling works well—add more servers, tasks distribute automatically
- Each server can handle thousands of TaskManagers
- CPU usage is typically negligible unless you’re doing heavy processing in the GenServer itself
Why Not Just Use X?
Before settling on this approach, I looked at several alternatives:
Oban Pro’s Workflows and Batches: These are great for simple dependencies (“run these 5 jobs, then run this final job”). But we needed more complex logic—branching, waiting for external webhooks, conditional steps. They don’t really handle that.
Temporal/Conductor/Airflow: These are purpose-built for orchestration. But they’re entire systems you need to run, monitor, and maintain. We already had Elixir/Phoenix—adding another system felt like overkill. Plus I wanted to learn how to build this stuff, not just use a black box.
Plain GenServers with :pg: You could build something similar with just GenServers and Erlang’s :pg process groups. But you’d have to implement all the distribution logic yourself—detecting failures, migrating processes, preventing duplicates. Horde does this for you.
Sagas/Ecto.Multi: For simple cases, Ecto.Multi works great. But it’s synchronous and doesn’t handle long-running operations well. You also can’t easily retry individual steps.
In the end, Horde was the sweet spot: powerful enough for complex orchestration, but still just Elixir/OTP under the hood.
Wrapping Up
The key insight here is treating each task as its own process. Instead of managing a global state machine, each TaskManager only cares about its own task. This makes the code simpler to understand and debug.
Key principles:
- Start simple—get it working on a single server first
- Use the database as your source of truth, not GenServer state
- Make your jobs idempotent from day one
- Test with LocalCluster early
- Monitor cluster health and process distribution
This pattern is surprisingly general. It works for any orchestration need where you have:
- Multiple steps that need coordination
- State that must survive failures
- Long-running processes that span deployments
- Complex branching logic or conditional steps
Examples: webhook retries, report generation, data sync jobs, video processing, ETL pipelines.
Resources
These helped me figure this stuff out:
- Horde Documentation - The examples are really good
- Oban Telemetry Events - How to hook into job completions
- LocalCluster for Testing - Essential for distributed tests
- Fly.io Clustering Guide - If you’re deploying on Fly