Skip to content

Commit d6f8918

Browse files
author
Henry Hazan
committed
merge
2 parents 0cfd934 + 877d413 commit d6f8918

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+945
-2323
lines changed

README.md

+13-41
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,12 @@
1-
CQRS Eventsourcing Engine
2-
=========================
1+
CQRS Eventsourcing Workflow Engine
2+
==================================
33

44
[![Join the chat at https://gitter.im/cqrs-engine/Lobby](https://badges.gitter.im/cqrs-engine/Lobby.svg)](https://gitter.im/cqrs-engine/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
55

66
### IMPORTANT
77
Only the data structures are working, for testing and collecting feedbacks.
88
We believe that at the begining of December 2016, the framework will be usable.
99

10-
### IMPORTANT AGAIN
11-
We started to align our ideias with @slashdotdash, and join forces. Please, see
12-
commanded issues. An example app started here: https://github.com/work-capital/ev_sim
13-
14-
15-
16-
### Thanks
17-
Special thanks to:
18-
19-
[burmajam](https://github.com/burmajam) for sharing the very
20-
well written extreme driver to connect to Eventstore.
21-
22-
[slashdotdash](https://github.com/slashdotdash/commanded) for sharing the CQRS
23-
framework, where many parts of the code here are from his framework.
24-
25-
26-
### Motivation
2710

2811
### Pure functions data structures
2912
In the folder engine > types you will find the data structures, so you can write
@@ -41,10 +24,13 @@ your pure functions over them, under the "side effects" dimension.
4124
As aggregates listen for commands, process managers listen for events (sometimes commands also), and as aggregates emmits events, process managers dispatch commands.
4225

4326
* pure functional data structures for aggregates and process managers
27+
* use monads (monadex) to simulate different business scenarios
4428
* one abstraction to implement side-effects
4529
* multiple data-stores
4630
* plugable message queue for publishing events
4731
* one gen_server implementation for aggregates and process managers
32+
* automatic process-manager creation based on correlation-ids (as suggested by Greg Young)
33+
* easy use of FSM on process managers
4834

4935
### Develop
5036

@@ -56,9 +42,8 @@ Send events from the prompt:
5642

5743
```
5844
iex -S mix
45+
TODO: add example
5946
60-
Engine.Bus.send_command(%{%Account.Command.CreateAccount{} | :id => "jsdf"})
61-
Engine.Bus.send_command(%{%Account.Command.DepositMoney{} | :id => "jsdf", :amount => 23})
6247
```
6348

6449

@@ -71,30 +56,17 @@ docker run --name eventstore-node -it -p 2113:2113 -p 1113:1113 eventstore/event
7156
```
7257

7358
#### Resources
74-
Below you can see several resources I researched before writing this lib.
59+
Below you can see several resources I researched before writing this lib.
60+
Special thanks for Ben Smith, where many ideas were copied from
61+
[commanded](https://github.com/slashdotdash/commanded) library.
7562

63+
* [burmajam](https://github.com/burmajam) for sharing the very
64+
well written extreme driver to connect to Eventstore.
65+
* [slashdotdash](https://github.com/slashdotdash/commanded) for sharing the CQRS
66+
framework, where many parts of the code here are from his framework.
7667
* [cqrs-erlang](https://github.com/bryanhunter/cqrs-with-erlang) - A memory
7768
model using standard spawn functions CQRS in erlang.
7869
* [gen-aggregate](https://github.com/burmajam/gen_aggregate/) - Macro for the
7970
aggregate structure, using buffers.
8071

8172

82-
#### CQRS concepts
83-
84-
http://softwareengineering.stackexchange.com/questions/157522/cqrs-event-sourcing-is-it-correct-that-commands-are-generally-communicated
85-
86-
87-
If something sends a command, it entails expectation that it will be fulfilled. If you simply publish and hope that something somewhere picks it up and acts on it, there is no guarantee that this will be the case. By extrapolation, you also don't know if multiple handlers don't decide to act on a command, possibly resulting in the same change being applied more than once.
88-
89-
Events, on the other hand, are informative in nature, and it's reasonable to expect zero, two, or more components to be interested in a particular event. We don't really care in the scope of making the requested change.
90-
91-
**Example**
92-
93-
This could be compared to real life. If you have three children, walk into a room and simply shout "Clean the bathroom," you have no guarantee that someone will, and perhaphs if it won't be done twice (if you have obedient children that is ;-) You should fare better if you assign a specific child to do what you want done.
94-
95-
When that child finishes its job however, it's convenient if it shouts out "bathroom has been cleaned," so that everyone who wants to brush their teeth knows they can now do so.
96-
97-
98-
99-
100-

config/config.exs

-63
Original file line numberDiff line numberDiff line change
@@ -1,66 +1,3 @@
1-
# This file is responsible for configuring your application
2-
# and its dependencies with the aid of the Mix.Config module.
31
use Mix.Config
42
import_config "#{Mix.env}.exs"
53

6-
#-------------------------------
7-
# ENGINE
8-
#-------------------------------
9-
10-
# Choose the event storage, can be Engine.Storage.Eventstore
11-
# or Engine.Storage.Postgres
12-
13-
config :engine,
14-
storage: Engine.Storage.Eventstore,
15-
nodes: [:'master@localhost', :'slave1@localhost'], # to use with SYN if we have many nodes
16-
snapshot_period: 50
17-
18-
#-------------------------------
19-
# EXTREME [Eventstore Driver]
20-
#-------------------------------
21-
22-
23-
config :extreme, :event_store,
24-
db_type: :node,
25-
host: "localhost",
26-
port: 1113,
27-
username: "admin",
28-
password: "changeit",
29-
reconnect_delay: 2_000,
30-
max_attempts: :infinity
31-
32-
33-
#-------------------------------
34-
# LOGGER
35-
#-------------------------------
36-
37-
# see config/dev.exs , config/test.exs, etc
38-
39-
40-
41-
# This configuration is loaded before any dependency and is restricted
42-
# to this project. If another project depends on this project, this
43-
# file won't be loaded nor affect the parent project. For this reason,
44-
# if you want to provide default values for your application for
45-
# 3rd-party users, it should be done in your "mix.exs" file.
46-
47-
# You can configure for your application as:
48-
#
49-
# config :engine, key: :value
50-
#
51-
# And access this configuration in your application as:
52-
#
53-
# Application.get_env(:engine, :key)
54-
#
55-
# Or configure a 3rd-party app:
56-
#
57-
# config :logger, level: :info
58-
#
59-
60-
# It is also possible to import configuration files, relative to this
61-
# directory. For example, you can emulate configuration per environment
62-
# by uncommenting the line below and defining dev.exs, test.exs and such.
63-
# Configuration from the imported file will override the ones defined
64-
# here (which is why it is important to import them last).
65-
#
66-
# import_config "#{Mix.env}.exs"

config/dev.exs

+20-7
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,26 @@
11
use Mix.Config
22

3+
#-------------------------------
4+
# WORKFLOW
5+
#-------------------------------
6+
7+
8+
config :workflow,
9+
adapter: Workflow.Extreme.Adapter
10+
11+
#-------------------------------
12+
# EXTREME [Eventstore Driver]
13+
#-------------------------------
14+
315

4-
# config :mix_test_watch,
5-
# clear: true,
6-
# tasks: [
7-
# "test"
8-
# # "dogma"
9-
# ]
10-
#
16+
config :extreme, :event_store,
17+
db_type: :node,
18+
host: "localhost",
19+
port: 1113,
20+
username: "admin",
21+
password: "changeit",
22+
reconnect_delay: 2_000,
23+
max_attempts: :infinity
1124

1225
#-------------------------------
1326
# LOGGER

config/test.exs

+18-18
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,27 @@
11
use Mix.Config
22

33
#-------------------------------
4-
# ENGINE
4+
# WORKFLOW
55
#-------------------------------
66

77

8-
config :engine,
9-
nodes: [:'master@localhost', :'slave1@localhost'], # to use with SYN if we have many nodes
10-
snapshot_period: 3
8+
config :workflow,
9+
adapter: Workflow.Extreme.Adapter
10+
11+
#-------------------------------
12+
# EXTREME [Eventstore Driver]
13+
#-------------------------------
14+
15+
16+
config :extreme, :event_store,
17+
db_type: :node,
18+
host: "localhost",
19+
port: 1113,
20+
username: "admin",
21+
password: "changeit",
22+
reconnect_delay: 2_000,
23+
max_attempts: :infinity
24+
1125

1226
#-------------------------------
1327
# LOGGER
@@ -44,17 +58,3 @@ config :logger, :log_error,
4458
format: "$dateT$time $node $metadata[$level] $levelpad$message\n",
4559
level: :error
4660

47-
#-------------------------------
48-
# TEST WATCH
49-
#-------------------------------
50-
51-
52-
# see https://github.com/lpil/mix-test.watch how
53-
# to exclude folders and files
54-
# config :mix_test_watch,
55-
# clear: true, # clean the consol e
56-
# tasks: [
57-
# "test"
58-
# #"dogma"
59-
# ]
60-

lib/adapter.ex

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
defmodule Workflow.Adapter do
2+
@moduledoc """
3+
Implement this functions below to add a new data storage for your events and snapshots.
4+
The persistence modules will retreive a pure list of events, ready for replay, instead of
5+
dealing with localized messages. It's an adapter responsability to filter and answer only the
6+
necessary data to be used inside Commanded
7+
"""
8+
9+
@type stream_id :: String.t
10+
@type start_version :: number
11+
@type read_event_batch_size :: number
12+
@type batch :: [struct()]
13+
@type stream :: String.t # The Stream ID
14+
@type reason :: atom
15+
@type expected_version :: number
16+
@type event_data :: [struct()]
17+
18+
19+
20+
@doc "Load a batch of events from storage"
21+
@callback read_stream_forward(stream_id, start_version, read_event_batch_size) ::
22+
{:ok, batch} | {:error, reason}
23+
24+
@doc "Load a list of events from an specific position"
25+
@callback append_to_stream(stream_id, expected_version, event_data) ::
26+
:ok | {:error, reason}
27+
28+
29+
30+
end

lib/container.ex

+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
defmodule Workflow.Container do
2+
@moduledoc """
3+
Genserver to hold Aggregates or Process Managers
4+
"""
5+
use GenServer
6+
require Logger
7+
8+
alias Workflow.Container
9+
alias Workflow.Persistence
10+
11+
defstruct [
12+
module: nil,
13+
uuid: nil,
14+
data: nil,
15+
version: nil
16+
]
17+
18+
## API
19+
20+
def start_link(module, uuid) do
21+
GenServer.start_link(__MODULE__, %Container{
22+
module: module,
23+
uuid: uuid
24+
})
25+
end
26+
27+
def get_data(container), do:
28+
GenServer.call(container, {:data})
29+
30+
def get_state(container), do:
31+
GenServer.call(container, {:state})
32+
33+
def process_message(container, message), do:
34+
GenServer.call(container, {:process_message, message})
35+
36+
37+
## CALLBACKS
38+
39+
def init(%Container{} = state) do
40+
GenServer.cast(self, {:restore})
41+
{:ok, state}
42+
end
43+
44+
def handle_call({:data}, _from, %Container{data: data} = state), do:
45+
{:reply, data, state}
46+
47+
def handle_call({:state}, _from, %Container{} = state), do:
48+
{:reply, state, state}
49+
50+
@doc "Replay the events from the eventstore db"
51+
def handle_cast({:restore}, %Container{module: module} = state) do
52+
state = Persistence.rebuild_from_events(%Container{state |
53+
version: 0,
54+
data: struct(module) # empty data structure to be filled
55+
})
56+
{:noreply, state}
57+
end
58+
59+
@doc "Handle a command (for an aggregate) or an event (for the process manager)"
60+
def handle_call({:process_message, message}, _from, %Container{} = state) do
61+
{reply, state} = process(message, state)
62+
{:reply, reply, state}
63+
end
64+
65+
## INTERNALS
66+
67+
defp process(message,
68+
%Container{uuid: uuid, version: expected_version, data: data, module: module} = state) do
69+
event = module.handle(data, message) # process message for an aggregate or process manager
70+
wrapped_event = List.wrap(event)
71+
72+
new_data = Persistence.apply_events(module, data, wrapped_event)
73+
Persistence.persist_events(wrapped_event, uuid, expected_version)
74+
state = %Container{ state |
75+
data: new_data,
76+
version: expected_version + length(wrapped_event)
77+
}
78+
{:ok, state}
79+
end
80+
81+
# update the process instance's state by applying the event
82+
def mutate_state(module, data, event), do:
83+
module.apply(data, event)
84+
85+
86+
end

lib/dispatcher.ex

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
defmodule Workflow.Dispatcher do
2+
@moduledoc"""
3+
Dispatch commands and events (messages) to aggregates and process managers
4+
"""
5+
require Logger
6+
7+
alias Workflow.Repository
8+
alias Workflow.Container
9+
10+
def dispatch(message, module, uuid, timeout) do
11+
Logger.debug(fn -> "attempting to dispatch message: #{inspect message}, to: module: #{inspect module}" end)
12+
{:ok, container} = Repository.open(module, uuid)
13+
Container.execute(container, message, timeout)
14+
end
15+
16+
17+
end

0 commit comments

Comments
 (0)