
erlang-event-sourcing-xp

πŸ§ͺ Experimenting with Event Sourcing in Erlang using pure functional principles, gen_server-based aggregates, and pluggable Event Store backends.


About

I'm a big fan of Erlang/OTP and Event Sourcing, and I strongly believe that the Actor Model and Event Sourcing are a natural fit. This repository is my way of exploring how these two concepts can work together in practice.

As an experiment, this repo won't cover every facet of event sourcing in depth, but it should provide some insights and spark ideas about the potential of this approach in Erlang.

Features

  • Aggregate β€” a reusable gen_server harness that keeps domain logic pure while delegating event sourcing boilerplate.
  • Aggregate Manager β€” a router and lifecycle supervisor that spins up aggregates on demand, rehydrates them from persisted events, and passivates idle instances.
  • Event Store β€” a behaviour-driven abstraction with drop-in backends so you can pick the storage engine that fits your deployment.
  • Snapshots β€” automatic checkpointing at configurable intervals to avoid replaying entire streams.
  • Passivation β€” idle aggregates are shut down cleanly and will rehydrate from the store on the next command.

Backend roadmap

| Backend | Status | Capabilities | Highlights | Ideal use cases |
|---|---|---|---|---|
| ETS | ✅ Ready | Events + snapshots | In-memory tables backed by the BEAM VM, blazing-fast reads/writes, zero external dependencies. | Local development, benchmarks, ephemeral environments where latency matters more than durability. |
| Mnesia | ✅ Ready | Events + snapshots | Distributed, transactional, and replicated storage built into Erlang/OTP. | Clusters that need lightweight distribution without introducing an external database. |
| PostgreSQL | 🛠️ Planned | Events + snapshots | Durable SQL store with strong transactional guarantees and easy horizontal scaling. | Production setups that already rely on Postgres or need rock-solid consistency. |
| MongoDB | 🛠️ Planned | Events + snapshots | Flexible document database with built-in replication and sharding. | Event streams that benefit from schemaless payload storage or multi-region clusters. |

Let's play

This project is a work in progress, and I welcome any feedback or contributions. If you're interested in Event Sourcing, Erlang/OTP, or both, feel free to reach out!

Start the Erlang shell and run the following commands to play with the example:

%% Interactive demo showcasing the event sourcing engine.
%%
%% The example uses a simple "bank account" aggregate: a single stream
%% of domain events representing deposits and withdrawals.
%% Each command sent to the aggregate produces an event persisted
%% through the in-memory ETS backend (used here for both events
%% and snapshots).

%% Usage:
%%     rebar3 shell < examples/demo_bank.script

StoreContext = {event_sourcing_store_ets, event_sourcing_store_ets},

io:format("~n[1] starting in-memory store (ETS)~n", []),
StartRes = event_sourcing_core_store:start(StoreContext),
io:format(" -> ~p~n", [StartRes]),

io:format("[2] starting bank account aggregate manager~n", []),
{ok, BankMgr} =
    event_sourcing_core_mgr_aggregate:start_link(
        bank_account_aggregate,
        StoreContext,
        bank_account_aggregate
    ),
io:format(" -> BankMgr pid: ~p~n", [BankMgr]),

AccountId = <<"bank-account-123">>,

io:format("[3] deposit $100~n", []),
Res1 = event_sourcing_core_mgr_aggregate:dispatch(
    BankMgr,
    {bank, deposit, AccountId, 100}
),
io:format(" -> ~p~n", [Res1]),

io:format("[4] withdraw $10~n", []),
Res2 = event_sourcing_core_mgr_aggregate:dispatch(
    BankMgr,
    {bank, withdraw, AccountId, 10}
),
io:format(" -> ~p~n", [Res2]),

io:format("[5] withdraw $1000 (should fail)~n", []),
Res3 = event_sourcing_core_mgr_aggregate:dispatch(
    BankMgr,
    {bank, withdraw, AccountId, 1000}
),
io:format(" -> ~p~n", [Res3]),

ok.

Architecture

Overview

This project is structured around the core principles of Event Sourcing:

  • All changes are represented as immutable events.
  • Aggregates handle commands and apply events to evolve their state.
  • State is rehydrated by replaying historical events; snapshots and caching are possible optimizations (see the sketch below).
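
For illustration, the replay step boils down to a pure fold. A minimal sketch (hypothetical code, not this repo's modules), using a trivial counter domain:

%% Hypothetical sketch: current state is just a fold of pure event
%% application over the stream's history.
rehydrate(Events) ->
    lists:foldl(fun apply_event/2, 0, Events).

apply_event(incremented, N) -> N + 1;
apply_event(decremented, N) -> N - 1.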

Event store

The event store is a core component of this experiment, designed as a behaviour that any module can implement to plug in a storage backend. Its primary responsibilities are storing and retrieving events.

% Initializes the event store.
-callback start() -> {ok, initialized | already_initialized} | {error, term()}.

% Shuts down the event store.
-callback stop() -> {ok} | {error, term()}.

% Appends a list of events for a given stream.
-callback append(StreamId, Events) -> ok | {error, term()}
    when StreamId :: stream_id(),
         Events :: [event()].

% Folds events from a stream using a provided function.
-callback fold(StreamId, FoldFun, InitialAcc, Range) -> Acc1
    when StreamId :: stream_id(),
         FoldFun :: fun((Event :: event(), AccIn) -> AccOut),
         InitialAcc :: term(),
         Range :: event_sourcing_range:range(),
         Acc1 :: term(),
         AccIn :: term(),
         AccOut :: term().
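
As a usage sketch, counting the events of a stream is a plain fold. The count_events/2 helper below is hypothetical, and building the Range value is left to whatever constructors event_sourcing_range actually exposes:

%% Hypothetical usage of fold/4: count the events in a stream.
%% Constructing Range depends on the event_sourcing_range API.
count_events(StreamId, Range) ->
    event_sourcing_store_ets:fold(StreamId,
                                  fun(_Event, Acc) -> Acc + 1 end,
                                  0,
                                  Range).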

Snapshot Support

The event store supports snapshotting to optimize aggregate rehydration. Instead of replaying all events from the beginning, aggregates can:

  1. Load the latest snapshot (if available)
  2. Replay only events that occurred after the snapshot
  3. Automatically create new snapshots at configurable intervals

Snapshot Callbacks:

% Store a snapshot of aggregate state
-callback store(Snapshot) -> ok when Snapshot :: snapshot().

% Load the latest snapshot for a stream
-callback load_latest(StreamId) -> {ok, Snapshot} | {error, not_found}
    when StreamId :: stream_id(),
         Snapshot :: snapshot().

The snapshot record contains all necessary fields (domain, stream_id, sequence, timestamp, state), making the API consistent with event persistence where events are passed as complete records.
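
A rough sketch of how the two callbacks combine during rehydration (hypothetical glue code: the #snapshot{} record is assumed to come from include/event_sourcing.hrl, and initial_state/0, apply_event/2, and range_from/1 are stand-ins for the real helpers):

%% Hypothetical glue: start from the latest snapshot when one exists,
%% then replay only the events recorded after its sequence.
rehydrate(StoreMod, StreamId) ->
    {State0, NextSeq} =
        case StoreMod:load_latest(StreamId) of
            {ok, #snapshot{sequence = Seq, state = S}} ->
                {S, Seq + 1};
            {error, not_found} ->
                {initial_state(), 1}
        end,
    StoreMod:fold(StreamId, fun apply_event/2, State0, range_from(NextSeq)).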

Configuring Snapshots:

% Start aggregate with snapshot every 10 events
event_sourcing_core_aggregate:start_link(
    Module,
    Store,
    Id,
    #{snapshot_interval => 10}
).

When snapshot_interval is set to a positive integer, a snapshot is automatically saved whenever the aggregate's sequence number is a multiple of that interval.
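
The trigger itself is simple modular arithmetic, along these lines (a sketch; the real check lives inside event_sourcing_core_aggregate):

%% Hypothetical check: snapshot whenever snapshotting is enabled and
%% the sequence number is a multiple of the configured interval.
should_snapshot(_Seq, 0) -> false;  %% 0 (the default) disables snapshotting
should_snapshot(Seq, Interval) -> Seq rem Interval =:= 0.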

Additional future features

  • Support event subscriptions for real-time updates.
  • Implement snapshot retention policies (e.g., keep only last N snapshots).

Current Implementation

Aggregate

The aggregate is implemented as a gen_server that encapsulates domain logic and delegates event persistence to a pluggable Event Store (e.g. ETS or Mnesia).

The core idea is to separate concerns between domain behavior and infrastructure. To achieve this, the system is structured into three main components:

  • 🧩 Domain Module β€” a pure module that implements domain-specific logic via behaviour callbacks.
  • βš™οΈ aggregate β€” the glue that bridges domain logic and infrastructure (event sourcing logic, event persistence, etc.).
  • 🚦 gen_server β€” the OTP mechanism that provides lifecycle management and message orchestration.

The aggregate provides:

  • A behaviour for domain-specific modules to implement.
  • A generic OTP gen_server that:
    • Rehydrates state from events on startup (with optional snapshot loading).
    • Processes commands to produce events.
    • Applies events to evolve internal state.
    • Automatically passivates (shuts down) after inactivity.
    • Saves snapshots at configurable intervals for optimization.

The following diagram shows how the system processes a command using the event-sourced aggregate infrastructure.

sequenceDiagram
    actor User
    participant GenAggregate as aggregate
    participant GenServer as gen_server
    participant DomainModule as AggregateModule (callback)

    User ->> GenAggregate: gen_aggregate:start_link(...)
    activate GenAggregate
    GenAggregate ->>+ GenServer: gen_server:start_link(Module, State)
    GenServer ->> GenAggregate: gen_aggregate:init/1
    deactivate GenAggregate

    User ->> GenAggregate: gen_aggregate:dispatch(Pid, Command)
    activate GenAggregate
    GenAggregate ->> GenServer: gen_server:call(Pid, Command)
    GenServer ->> GenAggregate: gen_aggregate:handle_call/3

    GenAggregate ->> DomainModule: handle_command(Command, State)
    GenAggregate ->> GenAggregate: persist_events(Store, Events)

    loop For each Event
        GenAggregate ->> DomainModule: apply_event(Event, State)
    end
    deactivate GenAggregate

Passivation

Each aggregate instance (a gen_server) is automatically passivated β€” i.e., stopped β€” after a period of inactivity.

This helps:

  • Free up memory in long-lived systems
  • Keep the number of live processes bounded
  • Rehydrate state on demand from the event store

Passivation is configured via a timeout value when the aggregate is started (defaults to 5000 ms):

event_sourcing_core_aggregate:start_link(Module, Store, Id, #{timeout => 10000}).

When no messages are received within the timeout window:

  • A passivate message is sent to the process.
  • The aggregate process exits normally (stop).
  • Its state is discarded.
  • Future commands will cause the manager to rehydrate it from persisted events.
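
The mechanism can be pictured like this (a hypothetical sketch; the actual wiring in event_sourcing_core_aggregate may differ):

%% Hypothetical sketch: gen_server's idle timeout posts a passivate
%% message; on receipt the process stops normally.
init({_Module, _Store, _Id, Opts}) ->
    Timeout = maps:get(timeout, Opts, 5000),
    {ok, #{timeout => Timeout}, Timeout}.   %% arm the idle timeout

handle_info(timeout, State) ->
    self() ! passivate,                     %% idle window elapsed
    {noreply, State};
handle_info(passivate, State) ->
    {stop, normal, State}.                  %% exit normally; state is discarded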

Snapshots

Snapshots provide a performance optimization for aggregate rehydration by avoiding the need to replay all events from the beginning of a stream.

How it works:

  1. On startup, the aggregate:

    • Attempts to load the latest snapshot from the event store
    • If found, initializes state from the snapshot
    • Replays only events that occurred after the snapshot sequence
  2. During command processing, snapshots are automatically created when:

    • A snapshot_interval is configured (e.g., 10)
    • The current sequence number is a multiple of the interval
    • For example, with snapshot_interval => 10, snapshots are saved at sequences 10, 20, 30, etc.

Configuration:

% Create aggregate with snapshots every 10 events
event_sourcing_core_aggregate:start_link(
    bank_account_aggregate,
    event_sourcing_store_ets,
    <<"account-123">>,
    #{
        timeout => 5000,
        snapshot_interval => 10  % Save snapshot every 10 events
    }
).

Setting snapshot_interval => 0 (the default) disables automatic snapshotting.

Aggregate Manager

The aggregate manager is implemented as a gen_server. It serves as a router and supervisor for aggregate processes, ensuring that commands are dispatched to the correct aggregate instance based on their stream ID.

The manager is responsible for:

  • Routing commands to the appropriate aggregate process.
  • Managing the lifecycle of aggregate instances, starting new ones as needed.
  • Monitoring aggregate processes and cleaning up when they terminate.

How it works

The aggregate manager maintains a mapping of stream IDs to aggregate process PIDs. When a command is received:

  1. The Router module extracts the target aggregate type and stream ID from the command.
  2. If the aggregate type matches the manager's configured Aggregate module:
    • The manager checks its internal pids map for an existing process for the stream ID.
    • If none exists, it spawns a new event_sourcing_core_aggregate process using the provided Aggregate, Store, and stream ID, then monitors it.
    • The command is forwarded to the aggregate process via event_sourcing_core_aggregate:dispatch/2.
  3. If the aggregate type does not match or routing fails, an error is returned.
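
In code, the lookup-or-spawn step looks roughly like this (a hypothetical sketch; the actual implementation lives in event_sourcing_core_mgr_aggregate):

%% Hypothetical sketch: reuse the live aggregate for a stream ID, or
%% spawn and monitor a fresh one before forwarding the command.
ensure_aggregate(StreamId, State = #{pids := Pids, aggregate := Agg, store := Store}) ->
    case maps:find(StreamId, Pids) of
        {ok, Pid} ->
            {Pid, State};
        error ->
            {ok, Pid} = event_sourcing_core_aggregate:start_link(Agg, Store, StreamId, #{}),
            _Ref = erlang:monitor(process, Pid),
            {Pid, State#{pids := Pids#{StreamId => Pid}}}
    end.
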
flowchart LR
    %% Aggregate Managers
    Mgr1((Agg. Mgr<br>Order)):::manager
    Mgr2((Agg. Mgr<br>User)):::manager
    Mgr3((Agg. Mgr<br>Bank)):::manager

    %% Aggregate Instances
    Agg1((Order<br>order-123)):::aggregate
    Agg2((Order<br>order-456)):::aggregate

    Agg3((User<br>user-123)):::aggregate

    Mgr1 -->|cmd| Agg1
    Mgr1 -.-|monitoring| Agg1
    Mgr1 -->|cmd| Agg2
    Mgr1 -.-|monitoring| Agg2
    Mgr2 -->|cmd| Agg3
    Mgr2 -.-|monitoring| Agg3

Options

The manager can be configured with options such as:

  • timeout: Timeout for operations.
  • sequence_zero: Function to initialize event sequences.
  • sequence_next: Function to increment sequences.
  • now_fun: Function to provide timestamps.
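
For instance (a hypothetical call: the option names follow the list above, but passing them as a fourth start_link argument is an assumption mirroring the aggregate's API):

%% Hypothetical: a manager whose aggregates use a longer idle timeout
%% and a custom clock for event timestamps.
{ok, Mgr} =
    event_sourcing_core_mgr_aggregate:start_link(
        bank_account_aggregate,
        {event_sourcing_store_ets, event_sourcing_store_ets},
        bank_account_aggregate,
        #{timeout => 10000,
          now_fun => fun() -> erlang:system_time(millisecond) end}
    ).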

Project organization

apps/
β”œβ”€β”€ event_sourcing_contract
β”‚   β”œβ”€β”€ include/event_sourcing.hrl                  % Shared types and records
β”‚   └── src                                         % Public behaviours (the contract)
β”‚       β”œβ”€β”€ event_sourcing_contract.app.src
β”‚       β”œβ”€β”€ event_sourcing_aggregate_behaviour.erl
β”‚       β”œβ”€β”€ event_sourcing_event_store_behaviour.erl
β”‚       └── event_sourcing_snapshot_store_behaviour.erl
β”œβ”€β”€ event_sourcing_core
β”‚   β”œβ”€β”€ src                                         % Core processes built on the contract
β”‚   β”‚   β”œβ”€β”€ event_sourcing_core.app.src
β”‚   β”‚   β”œβ”€β”€ event_sourcing_core_aggregate.erl
β”‚   β”‚   β”œβ”€β”€ event_sourcing_core_mgr_aggregate.erl
β”‚   β”‚   β”œβ”€β”€ event_sourcing_core_mgr_behaviour.erl
β”‚   β”‚   └── event_sourcing_core_store.erl
β”‚   └── test                                        % Aggregate + store suites
β”œβ”€β”€ event_sourcing_store_ets
β”‚   β”œβ”€β”€ src/event_sourcing_store_ets.erl            % ETS-backed store implementation
β”‚   └── test                                        % ETS-focused tests (planned)
└── event_sourcing_store_mnesia
    β”œβ”€β”€ src/event_sourcing_store_mnesia.erl         % Mnesia-backed store implementation
    └── test                                        % Mnesia-focused tests (planned)

Build

rebar3 compile

Test

rebar3 eunit

Lint

rebar3 do dialyzer, fmt --check

dialyzer runs the type analysis, while fmt --check verifies that all Erlang sources are formatted.
