From bf39c6a4b05c099d6aa14511526917df486bbbfe Mon Sep 17 00:00:00 2001 From: Danilo Cianfrone Date: Sun, 5 Mar 2023 20:12:39 +0100 Subject: [PATCH 1/2] feat: add todo list app example --- Cargo.toml | 1 + examples/todo-list-app/Cargo.toml | 33 +++ examples/todo-list-app/build.rs | 18 ++ examples/todo-list-app/proto/buf.lock | 7 + examples/todo-list-app/proto/buf.yaml | 12 ++ .../proto/todolist/v1/todo_list.proto | 19 ++ .../proto/todolist/v1/todo_list_api.proto | 55 +++++ .../src/command/add_todo_list_item.rs | 202 ++++++++++++++++++ .../src/command/create_todo_list.rs | 154 +++++++++++++ examples/todo-list-app/src/command/mod.rs | 2 + examples/todo-list-app/src/domain/mod.rs | 1 + .../todo-list-app/src/domain/todo/item.rs | 66 ++++++ .../todo-list-app/src/domain/todo/list.rs | 178 +++++++++++++++ examples/todo-list-app/src/domain/todo/mod.rs | 2 + examples/todo-list-app/src/grpc.rs | 103 +++++++++ examples/todo-list-app/src/lib.rs | 14 ++ examples/todo-list-app/src/main.rs | 68 ++++++ .../todo-list-app/src/query/get_todo_list.rs | 0 .../src/query/list_todo_lists_by_owner.rs | 0 examples/todo-list-app/src/query/mod.rs | 2 + examples/todo-list-app/src/tracing.rs | 22 ++ 21 files changed, 959 insertions(+) create mode 100644 examples/todo-list-app/Cargo.toml create mode 100644 examples/todo-list-app/build.rs create mode 100644 examples/todo-list-app/proto/buf.lock create mode 100644 examples/todo-list-app/proto/buf.yaml create mode 100644 examples/todo-list-app/proto/todolist/v1/todo_list.proto create mode 100644 examples/todo-list-app/proto/todolist/v1/todo_list_api.proto create mode 100644 examples/todo-list-app/src/command/add_todo_list_item.rs create mode 100644 examples/todo-list-app/src/command/create_todo_list.rs create mode 100644 examples/todo-list-app/src/command/mod.rs create mode 100644 examples/todo-list-app/src/domain/mod.rs create mode 100644 examples/todo-list-app/src/domain/todo/item.rs create mode 100644 examples/todo-list-app/src/domain/todo/list.rs create mode 100644 examples/todo-list-app/src/domain/todo/mod.rs create mode 100644 examples/todo-list-app/src/grpc.rs create mode 100644 examples/todo-list-app/src/lib.rs create mode 100644 examples/todo-list-app/src/main.rs create mode 100644 examples/todo-list-app/src/query/get_todo_list.rs create mode 100644 examples/todo-list-app/src/query/list_todo_lists_by_owner.rs create mode 100644 examples/todo-list-app/src/query/mod.rs create mode 100644 examples/todo-list-app/src/tracing.rs diff --git a/Cargo.toml b/Cargo.toml index 8672c7e2..255b0478 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,4 +6,5 @@ members = [ # Crates as examples "examples/bank-accounting", + "examples/todo-list-app", ] diff --git a/examples/todo-list-app/Cargo.toml b/examples/todo-list-app/Cargo.toml new file mode 100644 index 00000000..f989427b --- /dev/null +++ b/examples/todo-list-app/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "todo-list-app" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1.0.69" +async-trait = "0.1.65" +eventually = { path = "../../eventually", features = ["serde-prost", "tracing"] } +eventually-macros = { path = "../../eventually-macros" } +opentelemetry = { version = "0.18.0", features = ["rt-tokio"] } +opentelemetry-jaeger = { version = "0.17.0", features = ["rt-tokio"] } +prost = "0.11.8" +prost-types = "0.11.8" +thiserror = "1.0.38" +tokio = { version = "1.13.0", features = ["macros", "rt-multi-thread"] } +tonic = { version = "0.8.1", features = ["gzip", "transport"] } +tonic-health = "0.8.0" +tonic-reflection = "0.6.0" +tower = "0.4.11" +tower-http = { version = "0.4.0", features = ["trace"] } +tracing = "0.1.29" +tracing-opentelemetry = "0.18.0" +tracing-subscriber = { version = "0.3.3", features = ["fmt", "std", "registry", "env-filter"] } +uuid = { version = "1.3.0", features = ["v4", "fast-rng"] } + +[dev-dependencies] +lazy_static = "1.4.0" + +[build-dependencies] +tonic-build = { version = "0.8.0", features = ["prost"] } diff --git a/examples/todo-list-app/build.rs b/examples/todo-list-app/build.rs new file mode 100644 index 00000000..ceb97162 --- /dev/null +++ b/examples/todo-list-app/build.rs @@ -0,0 +1,18 @@ +use std::{env, path::PathBuf}; + +fn main() { + let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap()); + + tonic_build::configure() + .file_descriptor_set_path(out_dir.join("todolist.v1_descriptor.bin")) + .build_server(true) + .build_client(false) + .compile( + &[ + "proto/todolist/v1/todo_list.proto", + "proto/todolist/v1/todo_list_api.proto", + ], + &["proto"], + ) + .unwrap(); +} diff --git a/examples/todo-list-app/proto/buf.lock b/examples/todo-list-app/proto/buf.lock new file mode 100644 index 00000000..19abf63f --- /dev/null +++ b/examples/todo-list-app/proto/buf.lock @@ -0,0 +1,7 @@ +# Generated by buf. DO NOT EDIT. +version: v1 +deps: + - remote: buf.build + owner: googleapis + repository: googleapis + commit: 75b4300737fb4efca0831636be94e517 diff --git a/examples/todo-list-app/proto/buf.yaml b/examples/todo-list-app/proto/buf.yaml new file mode 100644 index 00000000..02e45324 --- /dev/null +++ b/examples/todo-list-app/proto/buf.yaml @@ -0,0 +1,12 @@ +version: v1 +deps: + - buf.build/googleapis/googleapis +breaking: + use: + - FILE +lint: + use: + - DEFAULT + # - COMMENTS + - UNARY_RPC + - PACKAGE_NO_IMPORT_CYCLE diff --git a/examples/todo-list-app/proto/todolist/v1/todo_list.proto b/examples/todo-list-app/proto/todolist/v1/todo_list.proto new file mode 100644 index 00000000..5157759a --- /dev/null +++ b/examples/todo-list-app/proto/todolist/v1/todo_list.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +package todolist.v1; + +import "google/protobuf/timestamp.proto"; + +message TodoItem { + string description = 1; + bool completed = 2; + google.protobuf.Timestamp due_date = 3; + google.protobuf.Timestamp creation_time = 4; +} + +message TodoList { + string title = 1; + string owner = 2; + map items = 3; + google.protobuf.Timestamp creation_time = 4; +} diff --git a/examples/todo-list-app/proto/todolist/v1/todo_list_api.proto b/examples/todo-list-app/proto/todolist/v1/todo_list_api.proto new file mode 100644 index 00000000..764510ef --- /dev/null +++ b/examples/todo-list-app/proto/todolist/v1/todo_list_api.proto @@ -0,0 +1,55 @@ +syntax = "proto3"; + +package todolist.v1; + +import "google/protobuf/timestamp.proto"; +import "todolist/v1/todo_list.proto"; + +service TodoListService { + rpc CreateTodoList(CreateTodoListRequest) returns (CreateTodoListResponse); + rpc GetTodoList(GetTodoListRequest) returns (GetTodoListResponse); + rpc AddTodoItem(AddTodoItemRequest) returns (AddTodoItemResponse); + rpc ToggleTodoItem(ToggleTodoItemRequest) returns (ToggleTodoItemResponse); + rpc DeleteTodoItem(DeleteTodoItemRequest) returns (DeleteTodoItemResponse); +} + +message CreateTodoListRequest { + string title = 1; + string owner = 2; +} + +message CreateTodoListResponse { + string todo_list_id = 1; +} + +message GetTodoListRequest { + string todo_list_id = 1; +} + +message GetTodoListResponse { + TodoList todo_list = 1; +} + +message AddTodoItemRequest { + string todo_list_id = 1; + string description = 2; + google.protobuf.Timestamp due_date = 3; +} + +message AddTodoItemResponse { + string todo_item_id = 1; +} + +message ToggleTodoItemRequest { + string todo_list_id = 1; + string todo_item_id = 2; +} + +message ToggleTodoItemResponse {} + +message DeleteTodoItemRequest { + string todo_list_id = 1; + string todo_item_id = 2; +} + +message DeleteTodoItemResponse {} diff --git a/examples/todo-list-app/src/command/add_todo_list_item.rs b/examples/todo-list-app/src/command/add_todo_list_item.rs new file mode 100644 index 00000000..6a1cbc63 --- /dev/null +++ b/examples/todo-list-app/src/command/add_todo_list_item.rs @@ -0,0 +1,202 @@ +use std::{sync::Arc, time::Instant}; + +use async_trait::async_trait; +use eventually::{aggregate, command, message::Message}; + +use crate::domain::todo; + +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct Command { + pub todo_list_id: String, + pub todo_item_id: String, + pub title: String, + pub description: Option, + pub due_date: Option, +} + +impl Message for Command { + fn name(&self) -> &'static str { + "AddTodoListItem" + } +} + +#[derive(Debug, Clone)] +pub struct Handler { + clock: Arc Instant>, + repository: R, +} + +impl Handler +where + R: aggregate::repository::Repository, +{ + pub fn new(clock: fn() -> Instant, repository: R) -> Handler { + Self { + clock: Arc::new(clock), + repository, + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("failed to create add new item to todo::List: {0}")] + AddItem(#[source] todo::list::Error), + #[error("failed to get the todo::List from repository: {0}")] + Get(#[source] aggregate::repository::GetError), + #[error("failed to save new todo::List to repository: {0}")] + Save(#[source] SaveErr), +} + +#[async_trait] +impl command::Handler for Handler +where + R: aggregate::repository::Repository, +{ + type Error = Error< + >::Error, + >::Error, + >; + + async fn handle(&self, envelope: command::Envelope) -> Result<(), Self::Error> { + let cmd = envelope.message; + let now = (self.clock)(); + + let mut todo_list = self + .repository + .get(&cmd.todo_list_id) + .await + .map(todo::list::ListRoot::from) + .map_err(Error::Get)?; + + todo_list + .add_item(todo::list::AddItem { + id: cmd.todo_item_id, + title: cmd.title, + description: cmd.description, + due_date: cmd.due_date, + now, + }) + .map_err(Error::AddItem)?; + + self.repository + .save(&mut todo_list) + .await + .map_err(Error::Save)?; + + Ok(()) + } +} + +#[cfg(test)] +mod test { + use std::time::Instant; + + use eventually::{aggregate, command, event}; + use lazy_static::lazy_static; + + use crate::{command::add_todo_list_item, domain::todo}; + + lazy_static! { + static ref TEST_TODO_LIST_ID: String = String::from("new-todo-list"); + static ref TEST_TITLE: String = String::from("my-list"); + static ref TEST_OWNER: String = String::from("owner@test.com"); + static ref TEST_TODO_ITEM_ID: String = String::from("new-todo-item"); + static ref TEST_TODO_ITEM_TITLE: String = String::from("do something please"); + static ref NOW: Instant = Instant::now(); + static ref CLOCK: fn() -> Instant = || *NOW; + } + + #[tokio::test] + async fn it_works() { + command::test::Scenario + .given(vec![event::Persisted { + stream_id: TEST_TODO_LIST_ID.clone(), + version: 1, + event: todo::list::Event::WasCreated(todo::list::WasCreated { + id: TEST_TODO_LIST_ID.clone(), + title: TEST_TITLE.clone(), + owner: TEST_OWNER.clone(), + creation_time: *NOW, + }) + .into(), + }]) + .when( + add_todo_list_item::Command { + todo_list_id: TEST_TODO_LIST_ID.clone(), + todo_item_id: TEST_TODO_ITEM_ID.clone(), + title: TEST_TITLE.clone(), + description: None, + due_date: None, + } + .into(), + ) + .then(vec![event::Persisted { + stream_id: TEST_TODO_LIST_ID.clone(), + version: 2, + event: todo::list::Event::ItemWasAdded(todo::item::WasAdded { + id: TEST_TODO_ITEM_ID.clone(), + title: TEST_TITLE.clone(), + description: None, + due_date: None, + creation_time: *NOW, + }) + .into(), + }]) + .assert_on(|event_store| { + add_todo_list_item::Handler::new( + *CLOCK, + aggregate::EventSourcedRepository::from(event_store), + ) + }) + .await; + } + + #[tokio::test] + async fn it_fails_when_adding_twice_the_same_todo_item() { + command::test::Scenario + .given(vec![ + event::Persisted { + stream_id: TEST_TODO_LIST_ID.clone(), + version: 1, + event: todo::list::Event::WasCreated(todo::list::WasCreated { + id: TEST_TODO_LIST_ID.clone(), + title: TEST_TITLE.clone(), + owner: TEST_OWNER.clone(), + creation_time: *NOW, + }) + .into(), + }, + event::Persisted { + stream_id: TEST_TODO_LIST_ID.clone(), + version: 2, + event: todo::list::Event::ItemWasAdded(todo::item::WasAdded { + id: TEST_TODO_ITEM_ID.clone(), + title: TEST_TITLE.clone(), + description: None, + due_date: None, + creation_time: *NOW, + }) + .into(), + }, + ]) + .when( + add_todo_list_item::Command { + todo_list_id: TEST_TODO_LIST_ID.clone(), + todo_item_id: TEST_TODO_ITEM_ID.clone(), + title: TEST_TITLE.clone(), + description: None, + due_date: None, + } + .into(), + ) + .then_fails() + .assert_on(|event_store| { + add_todo_list_item::Handler::new( + *CLOCK, + aggregate::EventSourcedRepository::from(event_store), + ) + }) + .await; + } +} diff --git a/examples/todo-list-app/src/command/create_todo_list.rs b/examples/todo-list-app/src/command/create_todo_list.rs new file mode 100644 index 00000000..2b5714c1 --- /dev/null +++ b/examples/todo-list-app/src/command/create_todo_list.rs @@ -0,0 +1,154 @@ +use std::{sync::Arc, time::Instant}; + +use async_trait::async_trait; +use eventually::{aggregate, command, message::Message}; + +use crate::domain::todo; + +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct Command { + pub id: String, + pub title: String, + pub owner: String, +} + +impl Message for Command { + fn name(&self) -> &'static str { + "CreateTodoList" + } +} + +#[derive(Debug, Clone)] +pub struct Handler { + clock: Arc Instant>, + repository: R, +} + +impl Handler +where + R: aggregate::repository::Saver, +{ + pub fn new(clock: fn() -> Instant, repository: R) -> Handler { + Self { + clock: Arc::new(clock), + repository, + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("failed to create new todo::List: {0}")] + Create(#[source] todo::list::Error), + #[error("failed to save new todo::List to repository: {0}")] + Repository(#[source] R), +} + +#[async_trait] +impl command::Handler for Handler +where + R: aggregate::repository::Saver, +{ + type Error = Error; + + async fn handle(&self, envelope: command::Envelope) -> Result<(), Self::Error> { + let cmd = envelope.message; + let now = (self.clock)(); + + let mut todo_list = todo::list::ListRoot::create(todo::list::Create { + id: cmd.id, + title: cmd.title, + owner: cmd.owner, + now, + }) + .map_err(Error::Create)?; + + self.repository + .save(&mut todo_list) + .await + .map_err(Error::Repository)?; + + Ok(()) + } +} + +#[cfg(test)] +mod test { + use std::time::Instant; + + use eventually::{aggregate, command, event}; + use lazy_static::lazy_static; + + use crate::{command::create_todo_list, domain::todo}; + + lazy_static! { + static ref TEST_ID: String = String::from("new-todo-list"); + static ref TEST_TITLE: String = String::from("my-list"); + static ref TEST_OWNER: String = String::from("owner@test.com"); + static ref NOW: Instant = Instant::now(); + static ref CLOCK: fn() -> Instant = || *NOW; + } + + #[tokio::test] + async fn it_works() { + command::test::Scenario + .when( + create_todo_list::Command { + id: TEST_ID.clone(), + title: TEST_TITLE.clone(), + owner: TEST_OWNER.clone(), + } + .into(), + ) + .then(vec![event::Persisted { + stream_id: TEST_ID.clone(), + version: 1, + event: todo::list::Event::WasCreated(todo::list::WasCreated { + id: TEST_ID.clone(), + title: TEST_TITLE.clone(), + owner: TEST_OWNER.clone(), + creation_time: *NOW, + }) + .into(), + }]) + .assert_on(|event_store| { + create_todo_list::Handler::new( + *CLOCK, + aggregate::EventSourcedRepository::from(event_store), + ) + }) + .await; + } + + #[tokio::test] + async fn when_todo_list_with_same_id_exists_create_command_fails() { + command::test::Scenario + .given(vec![event::Persisted { + stream_id: TEST_ID.clone(), + version: 1, + event: todo::list::Event::WasCreated(todo::list::WasCreated { + id: TEST_ID.clone(), + title: TEST_TITLE.clone(), + owner: TEST_OWNER.clone(), + creation_time: *NOW, + }) + .into(), + }]) + .when( + create_todo_list::Command { + id: TEST_ID.clone(), + title: TEST_TITLE.clone(), + owner: TEST_OWNER.clone(), + } + .into(), + ) + .then_fails() + .assert_on(|event_store| { + create_todo_list::Handler::new( + *CLOCK, + aggregate::EventSourcedRepository::from(event_store), + ) + }) + .await; + } +} diff --git a/examples/todo-list-app/src/command/mod.rs b/examples/todo-list-app/src/command/mod.rs new file mode 100644 index 00000000..5c1dae76 --- /dev/null +++ b/examples/todo-list-app/src/command/mod.rs @@ -0,0 +1,2 @@ +pub mod add_todo_list_item; +pub mod create_todo_list; diff --git a/examples/todo-list-app/src/domain/mod.rs b/examples/todo-list-app/src/domain/mod.rs new file mode 100644 index 00000000..ff6eb8ec --- /dev/null +++ b/examples/todo-list-app/src/domain/mod.rs @@ -0,0 +1 @@ +pub mod todo; diff --git a/examples/todo-list-app/src/domain/todo/item.rs b/examples/todo-list-app/src/domain/todo/item.rs new file mode 100644 index 00000000..fc426eb2 --- /dev/null +++ b/examples/todo-list-app/src/domain/todo/item.rs @@ -0,0 +1,66 @@ +use std::time::Instant; + +use eventually::{aggregate::Aggregate, message::Message}; + +#[derive(Debug, Clone)] +pub struct Item { + id: String, + title: String, + description: Option, + completed: bool, + due_date: Option, + creation_time: Instant, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct WasAdded { + pub id: String, + pub title: String, + pub description: Option, + pub due_date: Option, + pub creation_time: Instant, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Event { + WasAdded(WasAdded), +} + +impl Message for Event { + fn name(&self) -> &'static str { + panic!("it should not be called here, must be implemented on todo::list::Event!") + } +} + +#[derive(Debug, Eq, PartialEq, thiserror::Error)] +pub enum ApplyError {} + +impl Aggregate for Item { + type Id = String; + type Event = Event; + type Error = ApplyError; + + fn type_name() -> &'static str { + "TodoItem" + } + + fn aggregate_id(&self) -> &Self::Id { + &self.id + } + + fn apply(state: Option, event: Self::Event) -> Result { + use Event::*; + + match (state, event) { + (None, WasAdded(evt)) => Ok(Self { + id: evt.id, + title: evt.title, + description: evt.description, + completed: false, + due_date: evt.due_date, + creation_time: evt.creation_time, + }), + _ => todo!(), + } + } +} diff --git a/examples/todo-list-app/src/domain/todo/list.rs b/examples/todo-list-app/src/domain/todo/list.rs new file mode 100644 index 00000000..0fdbc438 --- /dev/null +++ b/examples/todo-list-app/src/domain/todo/list.rs @@ -0,0 +1,178 @@ +use std::time::Instant; + +use eventually::{ + aggregate::{Aggregate, Root}, + message::Message, +}; +use eventually_macros::aggregate_root; + +use crate::domain::todo; + +#[derive(Debug, Clone)] +pub struct List { + id: String, + title: String, + owner: String, + items: Vec, + creation_time: Instant, +} + +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct WasCreated { + pub id: String, + pub title: String, + pub owner: String, + pub creation_time: Instant, +} + +#[derive(Debug, Clone, Eq, PartialEq)] +pub enum Event { + WasCreated(WasCreated), + ItemWasAdded(todo::item::WasAdded), +} + +impl Message for Event { + fn name(&self) -> &'static str { + match self { + Event::WasCreated(_) => "TodoListWasCreated", + Event::ItemWasAdded(_) => "TodoItemWasAdded", + } + } +} + +#[derive(Debug, Eq, PartialEq, thiserror::Error)] +pub enum ApplyError { + #[error("todo list not created yet, invalid event at this state")] + NotCreatedYet, + #[error("todo list already created, cannot be created again")] + AlreadyCreated, + #[error("failed to apply todo item domain event, {0}")] + Item(#[from] todo::item::ApplyError), +} + +impl Aggregate for List { + type Id = String; + type Event = Event; + type Error = ApplyError; + + fn type_name() -> &'static str { + "TodoList" + } + + fn aggregate_id(&self) -> &Self::Id { + &self.id + } + + fn apply(state: Option, event: Self::Event) -> Result { + use Event::*; + + match (state, event) { + (None, WasCreated(evt)) => Ok(Self { + id: evt.id, + title: evt.title, + owner: evt.owner, + items: Vec::new(), + creation_time: evt.creation_time, + }), + (Some(_), WasCreated(_)) => Err(ApplyError::AlreadyCreated), + (None, _) => Err(ApplyError::NotCreatedYet), + (Some(mut list), ItemWasAdded(evt)) => { + let item = todo::item::Item::apply(None, todo::item::Event::WasAdded(evt))?; + list.items.push(item); + Ok(list) + } + } + } +} + +#[aggregate_root(List)] +#[derive(Debug, Clone)] +pub struct ListRoot; + +#[derive(Debug, Eq, PartialEq, thiserror::Error)] +pub enum Error { + #[error("the provided id is empty")] + EmptyId, + #[error("the title specified is empty")] + EmptyTitle, + #[error("no owner has been specified")] + NoOwnerSpecified, + #[error("the provided item id is empty")] + EmptyItemId, + #[error("the title for the item specified is empty")] + EmptyItemTitle, + #[error("item already exists")] + ItemAlreadyExists, + #[error("failed to record domain event: {0}")] + RecordDomainEvent(#[from] ApplyError), +} + +#[derive(Debug)] +pub struct Create { + pub id: String, + pub title: String, + pub owner: String, + pub now: Instant, +} + +impl ListRoot { + pub fn create(input: Create) -> Result { + if input.id.is_empty() { + return Err(Error::EmptyId); + } + if input.title.is_empty() { + return Err(Error::EmptyTitle); + } + if input.owner.is_empty() { + return Err(Error::NoOwnerSpecified); + } + + let event = Event::WasCreated(WasCreated { + id: input.id, + title: input.title, + owner: input.owner, + creation_time: input.now, + }); + + Ok(Root::record_new(event.into()).map(Self)?) + } +} + +#[derive(Debug)] +pub struct AddItem { + pub id: String, + pub title: String, + pub description: Option, + pub due_date: Option, + pub now: Instant, +} + +impl ListRoot { + pub fn add_item(&mut self, input: AddItem) -> Result<(), Error> { + if input.id.is_empty() { + return Err(Error::EmptyItemId); + } + if input.title.is_empty() { + return Err(Error::EmptyItemTitle); + } + + let item_already_exists = self + .items + .iter() + .any(|item| item.aggregate_id() == &input.id); + + if item_already_exists { + return Err(Error::ItemAlreadyExists); + } + + let event = Event::ItemWasAdded(todo::item::WasAdded { + id: input.id, + title: input.title, + description: input.description, + due_date: input.due_date, + creation_time: input.now, + }); + + Ok(self.record_that(event.into())?) + } +} diff --git a/examples/todo-list-app/src/domain/todo/mod.rs b/examples/todo-list-app/src/domain/todo/mod.rs new file mode 100644 index 00000000..08784f9d --- /dev/null +++ b/examples/todo-list-app/src/domain/todo/mod.rs @@ -0,0 +1,2 @@ +pub mod item; +pub mod list; diff --git a/examples/todo-list-app/src/grpc.rs b/examples/todo-list-app/src/grpc.rs new file mode 100644 index 00000000..df424ed7 --- /dev/null +++ b/examples/todo-list-app/src/grpc.rs @@ -0,0 +1,103 @@ +use std::{error::Error as StdError, sync::Arc}; + +use async_trait::async_trait; +use eventually::{aggregate, command::Handler, version}; +use tracing::instrument; + +use crate::{ + command::{add_todo_list_item, create_todo_list}, + domain::todo, + proto, +}; + +#[derive(Clone)] +pub struct TodoListService +where + R: aggregate::Repository + 'static, +{ + pub id_generator: Arc String>, + pub create_todo_list: create_todo_list::Handler, + pub add_todo_list_item: add_todo_list_item::Handler, +} + +#[async_trait] +impl proto::todo_list_service_server::TodoListService for TodoListService +where + R: aggregate::Repository + 'static, + >::Error: StdError + Send + Sync + 'static, + >::Error: StdError + Send + Sync + 'static, +{ + async fn create_todo_list( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let request = request.into_inner(); + let todo_list_id = (self.id_generator)(); + + self.create_todo_list + .handle( + create_todo_list::Command { + id: todo_list_id.clone(), + title: request.title, + owner: request.owner, + } + .into(), + ) + .await + .map(|_| tonic::Response::new(proto::CreateTodoListResponse { todo_list_id })) + .map_err(|e| { + use create_todo_list::Error::*; + + let error_msg = e.to_string(); + + match e { + Create(todo::list::Error::NoOwnerSpecified | todo::list::Error::EmptyTitle) => { + tonic::Status::invalid_argument(error_msg) + } + Repository(err) => { + let conflict_error = err + .source() + .and_then(|e| e.downcast_ref::()); + + match conflict_error { + None => tonic::Status::internal(error_msg), + Some(_) => tonic::Status::already_exists(error_msg), + } + } + _ => tonic::Status::internal(error_msg), + } + }) + } + + #[instrument(skip(self))] + async fn get_todo_list( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + todo!() + } + + #[instrument(skip(self))] + async fn add_todo_item( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + todo!() + } + + #[instrument(skip(self))] + async fn toggle_todo_item( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + todo!() + } + + #[instrument(skip(self))] + async fn delete_todo_item( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + todo!() + } +} diff --git a/examples/todo-list-app/src/lib.rs b/examples/todo-list-app/src/lib.rs new file mode 100644 index 00000000..c65e28b1 --- /dev/null +++ b/examples/todo-list-app/src/lib.rs @@ -0,0 +1,14 @@ +pub mod command; +pub mod domain; +pub mod grpc; +pub mod tracing; +pub mod query; + +#[allow(unused_qualifications)] +#[allow(clippy::all)] // Cannot really check the sanity of generated code :shrugs: +pub mod proto { + tonic::include_proto!("todolist.v1"); + + pub const FILE_DESCRIPTOR_SET: &[u8] = + tonic::include_file_descriptor_set!("todolist.v1_descriptor"); +} diff --git a/examples/todo-list-app/src/main.rs b/examples/todo-list-app/src/main.rs new file mode 100644 index 00000000..b075a19a --- /dev/null +++ b/examples/todo-list-app/src/main.rs @@ -0,0 +1,68 @@ +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; + +use anyhow::anyhow; +use eventually::{ + aggregate, event, + tracing::{AggregateRepositoryExt, EventStoreExt}, +}; +use tower_http::trace::TraceLayer; + +use todo_list_app::{command, grpc, proto}; +use uuid::Uuid; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + todo_list_app::tracing::initialize("todo-list-app")?; + + let event_store = event::store::InMemory::default().with_tracing(); + let event_sourced_repository = + aggregate::EventSourcedRepository::from(event_store).with_tracing(); + + tracing::info!("Service is starting up..."); + + let addr = "0.0.0.0:10437" + .parse() + .map_err(|e| anyhow!("failed to parse grpc address: {}", e))?; + + let (_, health_svc) = tonic_health::server::health_reporter(); + + let reflection_svc = tonic_reflection::server::Builder::configure() + .register_encoded_file_descriptor_set(proto::FILE_DESCRIPTOR_SET) + .register_encoded_file_descriptor_set( + tonic_health::proto::GRPC_HEALTH_V1_FILE_DESCRIPTOR_SET, + ) + .build() + .map_err(|e| anyhow!("failed to build grpc reflection service: {}", e))?; + + let bank_accounting_svc = + proto::todo_list_service_server::TodoListServiceServer::new(grpc::TodoListService { + id_generator: Arc::new(|| Uuid::new_v4().to_string()), + create_todo_list: command::create_todo_list::Handler::new( + Instant::now, + event_sourced_repository.clone(), + ), + add_todo_list_item: command::add_todo_list_item::Handler::new( + Instant::now, + event_sourced_repository, + ), + }); + + let layer = tower::ServiceBuilder::new() + .layer(TraceLayer::new_for_grpc()) + .timeout(Duration::from_secs(5)) + .into_inner(); + + tonic::transport::Server::builder() + .layer(layer) + .add_service(health_svc) + .add_service(reflection_svc) + .add_service(bank_accounting_svc) + .serve(addr) + .await + .map_err(|e| anyhow!("tonic server exited with error: {}", e))?; + + Ok(()) +} diff --git a/examples/todo-list-app/src/query/get_todo_list.rs b/examples/todo-list-app/src/query/get_todo_list.rs new file mode 100644 index 00000000..e69de29b diff --git a/examples/todo-list-app/src/query/list_todo_lists_by_owner.rs b/examples/todo-list-app/src/query/list_todo_lists_by_owner.rs new file mode 100644 index 00000000..e69de29b diff --git a/examples/todo-list-app/src/query/mod.rs b/examples/todo-list-app/src/query/mod.rs new file mode 100644 index 00000000..15dec63a --- /dev/null +++ b/examples/todo-list-app/src/query/mod.rs @@ -0,0 +1,2 @@ +pub mod get_todo_list; +pub mod list_todo_lists_by_owner; diff --git a/examples/todo-list-app/src/tracing.rs b/examples/todo-list-app/src/tracing.rs new file mode 100644 index 00000000..24b27c12 --- /dev/null +++ b/examples/todo-list-app/src/tracing.rs @@ -0,0 +1,22 @@ +use anyhow::anyhow; +use tracing_subscriber::{prelude::*, EnvFilter}; + +pub fn initialize(service_name: &str) -> anyhow::Result<()> { + let tracer = opentelemetry_jaeger::new_agent_pipeline() + .with_service_name(service_name) + .install_simple() + .map_err(|e| anyhow!("failed to initialize jaeger tracer: {}", e))?; + + let filter_layer = EnvFilter::try_from_default_env() + .or_else(|_| EnvFilter::try_new("info")) + .map_err(|e| anyhow!("failed to initialize env filter: {}", e))?; + + tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer().compact()) + .with(tracing_opentelemetry::layer().with_tracer(tracer)) + .with(filter_layer) + .try_init() + .map_err(|e| anyhow!("failed to initialize subscribers: {}", e))?; + + Ok(()) +} From be96659182e27c418616410b3a62459280ed9449 Mon Sep 17 00:00:00 2001 From: Danilo Cianfrone Date: Sun, 5 Mar 2023 20:12:59 +0100 Subject: [PATCH 2/2] fix(command::test): assert result is not an error --- eventually/src/command/test.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/eventually/src/command/test.rs b/eventually/src/command/test.rs index 454d4402..572ab565 100644 --- a/eventually/src/command/test.rs +++ b/eventually/src/command/test.rs @@ -137,6 +137,7 @@ where where F: Fn(event::store::Tracking, Id, Evt>) -> H, H: command::Handler, + >::Error: Debug, { let event_store = event::store::InMemory::::default(); let tracking_event_store = event_store.clone().with_recorded_events_tracking(); @@ -159,6 +160,7 @@ where ScenarioThenCase::Produces(events) => { let recorded_events = tracking_event_store.recorded_events(); assert_eq!(events, recorded_events); + result.expect("should not be an error"); } ScenarioThenCase::Fails => assert!(result.is_err()), };