Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package ycq

//AggregateRoot is the interface that all aggregates should implement
// AggregateRoot is the interface that all aggregates should implement
type AggregateRoot interface {
AggregateID() string
OriginalVersion() int
Expand All @@ -18,7 +18,7 @@ type AggregateRoot interface {
}

// AggregateBase is a type that can be embedded in an AggregateRoot
// implementation to handle common aggragate behaviour
// implementation to handle common aggregate behaviour
//
// All required methods to implement an aggregate are here, to implement the
// Aggregate root interface your aggregate will need to implement the Apply
Expand All @@ -29,7 +29,7 @@ type AggregateBase struct {
changes []EventMessage
}

// NewAggregateBase contructs a new AggregateBase.
// NewAggregateBase constructs a new AggregateBase.
func NewAggregateBase(id string) *AggregateBase {
return &AggregateBase{
id: id,
Expand Down Expand Up @@ -81,7 +81,7 @@ func (a *AggregateBase) GetChanges() []EventMessage {
return a.changes
}

//ClearChanges removes all unpersisted events from the aggregate.
// ClearChanges removes all unpersisted events from the aggregate.
func (a *AggregateBase) ClearChanges() {
a.changes = []EventMessage{}
}
10 changes: 5 additions & 5 deletions aggregatefactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,22 @@ type DelegateAggregateFactory struct {
delegates map[string]func(string) AggregateRoot
}

// NewDelegateAggregateFactory contructs a new DelegateAggregateFactory
// NewDelegateAggregateFactory constructs a new DelegateAggregateFactory
func NewDelegateAggregateFactory() *DelegateAggregateFactory {
return &DelegateAggregateFactory{
delegates: make(map[string]func(string) AggregateRoot),
}
}

// RegisterDelegate is used to register a new funtion for instantiation of an
// RegisterDelegate is used to register a new function for instantiation of an
// aggregate instance.
//
// func(id string) AggregateRoot {return NewMyAggregateType(id)}
// func(id string) AggregateRoot { return &MyAggregateType{AggregateBase:NewAggregateBase(id)} }
// func(id string) AggregateRoot {return NewMyAggregateType(id)}
// func(id string) AggregateRoot { return &MyAggregateType{AggregateBase:NewAggregateBase(id)} }
func (t *DelegateAggregateFactory) RegisterDelegate(aggregate AggregateRoot, delegate func(string) AggregateRoot) error {
typeName := typeOf(aggregate)
if _, ok := t.delegates[typeName]; ok {
return fmt.Errorf("Factory delegate already registered for type: \"%s\"", typeName)
return fmt.Errorf("factory delegate already registered for type: \"%s\"", typeName)
}
t.delegates[typeName] = delegate
return nil
Expand Down
2 changes: 1 addition & 1 deletion aggregatefactory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (s *DelegateAggregateFactorySuite) TestDuplicateAggregateFactoryRegistratio
c.Assert(err, NotNil)
c.Assert(err,
DeepEquals,
fmt.Errorf("Factory delegate already registered for type: \"%s\"",
fmt.Errorf("factory delegate already registered for type: \"%s\"",
typeOf(&SomeAggregate{})))
}

Expand Down
2 changes: 1 addition & 1 deletion event.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type EventMessage interface {
// SetHeader sets the value of the header specified by the key
SetHeader(string, interface{})

// Returns the actual event which is the payload of the event message.
// Event Returns the actual event which is the payload of the event message.
Event() interface{}

// EventType returns a string descriptor of the command name
Expand Down
4 changes: 2 additions & 2 deletions eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package ycq

// EventBus is the inteface that an event bus must implement.
// EventBus is the interface that an event bus must implement.
type EventBus interface {
PublishEvent(EventMessage)
AddHandler(EventHandler, ...interface{})
Expand Down Expand Up @@ -33,7 +33,7 @@ func (b *InternalEventBus) PublishEvent(event EventMessage) {
}
}

// AddHandler registers an event handler for all of the events specified in the
// AddHandler registers an event handler for all the events specified in the
// variadic events parameter.
func (b *InternalEventBus) AddHandler(handler EventHandler, events ...interface{}) {

Expand Down
2 changes: 1 addition & 1 deletion eventfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func NewDelegateEventFactory() *DelegateEventFactory {
func (t *DelegateEventFactory) RegisterDelegate(event interface{}, delegate func() interface{}) error {
typeName := typeOf(event)
if _, ok := t.eventFactories[typeName]; ok {
return fmt.Errorf("Factory delegate already registered for type: \"%s\"", typeName)
return fmt.Errorf("factory delegate already registered for type: \"%s\"", typeName)
}
t.eventFactories[typeName] = delegate
return nil
Expand Down
2 changes: 1 addition & 1 deletion eventfactory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (s *DelegateEventFactorySuite) TestDuplicateEventFactoryRegistrationReturns
c.Assert(err, NotNil)
c.Assert(err,
DeepEquals,
fmt.Errorf("Factory delegate already registered for type: \"%s\"",
fmt.Errorf("factory delegate already registered for type: \"%s\"",
typeOf(&SomeEvent{})))
}

Expand Down
22 changes: 11 additions & 11 deletions repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import (

// DomainRepository is the interface that all domain repositories should implement.
type DomainRepository interface {
//Loads an aggregate of the given type and ID
// Load Loads an aggregate of the given type and ID
Load(aggregateTypeName string, aggregateID string) (AggregateRoot, error)

//Saves the aggregate.
// Save Saves the aggregate.
Save(aggregate AggregateRoot, expectedVersion *int) error
}

Expand All @@ -34,11 +34,11 @@ type GetEventStoreCommonDomainRepo struct {
// NewCommonDomainRepository constructs a new CommonDomainRepository
func NewCommonDomainRepository(eventStore *goes.Client, eventBus EventBus) (*GetEventStoreCommonDomainRepo, error) {
if eventStore == nil {
return nil, fmt.Errorf("Nil Eventstore injected into repository.")
return nil, fmt.Errorf("nil Eventstore injected into repository")
}

if eventBus == nil {
return nil, fmt.Errorf("Nil EventBus injected into repository.")
return nil, fmt.Errorf("nil EventBus injected into repository")
}

d := &GetEventStoreCommonDomainRepo{
Expand All @@ -49,10 +49,10 @@ func NewCommonDomainRepository(eventStore *goes.Client, eventBus EventBus) (*Get
}

// SetAggregateFactory sets the aggregate factory that should be used to
// instantate aggregate instances
// instantiate aggregate instances
//
// Only one AggregateFactory can be registered at any one time.
// Any registration will overwrite the provious registration.
// Any registration will overwrite the previous registration.
func (r *GetEventStoreCommonDomainRepo) SetAggregateFactory(factory AggregateFactory) {
r.aggregateFactory = factory
}
Expand All @@ -79,20 +79,20 @@ func (r *GetEventStoreCommonDomainRepo) SetStreamNameDelegate(delegate StreamNam
func (r *GetEventStoreCommonDomainRepo) Load(aggregateType, id string) (AggregateRoot, error) {

if r.aggregateFactory == nil {
return nil, fmt.Errorf("The common domain repository has no Aggregate Factory.")
return nil, fmt.Errorf("the common domain repository has no Aggregate Factory")
}

if r.streamNameDelegate == nil {
return nil, fmt.Errorf("The common domain repository has no stream name delegate.")
return nil, fmt.Errorf("the common domain repository has no stream name delegate")
}

if r.eventFactory == nil {
return nil, fmt.Errorf("The common domain has no Event Factory.")
return nil, fmt.Errorf("the common domain has no Event Factory")
}

aggregate := r.aggregateFactory.GetAggregate(aggregateType, id)
if aggregate == nil {
return nil, fmt.Errorf("The repository has no aggregate factory registered for aggregate type: %s", aggregateType)
return nil, fmt.Errorf("the repository has no aggregate factory registered for aggregate type: %s", aggregateType)
}

streamName, err := r.streamNameDelegate.GetStreamName(aggregateType, id)
Expand Down Expand Up @@ -141,7 +141,7 @@ func (r *GetEventStoreCommonDomainRepo) Load(aggregateType, id string) (Aggregat
func (r *GetEventStoreCommonDomainRepo) Save(aggregate AggregateRoot, expectedVersion *int) error {

if r.streamNameDelegate == nil {
return fmt.Errorf("The common domain repository has no stream name delagate.")
return fmt.Errorf("the common domain repository has no stream name delagate")
}

resultEvents := aggregate.GetChanges()
Expand Down
20 changes: 10 additions & 10 deletions repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,15 @@ func (s *ComDomRepoSuite) TestCreatingNewRepositoryWithNilEventStoreReturnsAnErr
repo, err := NewCommonDomainRepository(nil, eventBus)

c.Assert(repo, IsNil)
c.Assert(err, DeepEquals, fmt.Errorf("Nil Eventstore injected into repository."))
c.Assert(err, DeepEquals, fmt.Errorf("nil Eventstore injected into repository"))
}

func (s *ComDomRepoSuite) TestCreatingNewRepositoryWithNilEventBusReturnsAnError(c *C) {
store, _ := goes.NewClient(nil, "")
repo, err := NewCommonDomainRepository(store, nil)

c.Assert(repo, IsNil)
c.Assert(err, DeepEquals, fmt.Errorf("Nil EventBus injected into repository."))
c.Assert(err, DeepEquals, fmt.Errorf("nil EventBus injected into repository"))
}

func (s *ComDomRepoSuite) TestRepositoryCanLoadAggregateWithEvents(c *C) {
Expand Down Expand Up @@ -224,7 +224,7 @@ func (s *ComDomRepoSuite) TestNoAggregateFactoryReturnsErrorOnLoad(c *C) {
agg, err := s.repo.Load(typeOf(NewSomeAggregate(id)), id)

c.Assert(err, NotNil)
c.Assert(err, ErrorMatches, "The common domain repository has no Aggregate Factory.")
c.Assert(err, ErrorMatches, "the common domain repository has no Aggregate Factory")
c.Assert(agg, IsNil)
}

Expand All @@ -238,7 +238,7 @@ func (s *ComDomRepoSuite) TestRepositoryReturnsAnErrorIfAggregateFactoryNotRegis
agg, err := s.repo.Load(aggregateTypeName, id)

c.Assert(err, DeepEquals,
fmt.Errorf("The repository has no aggregate factory registered for aggregate type: %s",
fmt.Errorf("the repository has no aggregate factory registered for aggregate type: %s",
aggregateTypeName))
c.Assert(agg, IsNil)
}
Expand All @@ -263,7 +263,7 @@ func (s *ComDomRepoSuite) TestReturnsErrorOnLoadIfStreamNameDelegateNotRegistere

c.Assert(agg, IsNil)
c.Assert(err, DeepEquals,
fmt.Errorf("There is no stream name delegate for aggregate of type \"%s\"",
fmt.Errorf("there is no stream name delegate for aggregate of type \"%s\"",
typeName))
}

Expand Down Expand Up @@ -317,7 +317,7 @@ func (s *ComDomRepoSuite) TestReturnsErrorOnSaveIfStreamNameDelegateNotRegistere
err := s.repo.Save(agg, nil)

c.Assert(err, DeepEquals,
fmt.Errorf("There is no stream name delegate for aggregate of type \"%s\"",
fmt.Errorf("there is no stream name delegate for aggregate of type \"%s\"",
typeOf(agg)))
}

Expand All @@ -328,7 +328,7 @@ func (s *ComDomRepoSuite) TestReturnsErrorOnSaveIfStreamNameDelegateIsNil(c *C)
err := s.repo.Save(agg, nil)

c.Assert(err, NotNil)
c.Assert(err, DeepEquals, fmt.Errorf("The common domain repository has no stream name delagate."))
c.Assert(err, DeepEquals, fmt.Errorf("the common domain repository has no stream name delagate"))
}

func (s *ComDomRepoSuite) TestLoadReturnErrUnauthorized(c *C) {
Expand Down Expand Up @@ -405,15 +405,15 @@ func (s *ComDomRepoSuite) TestReturnsErrorOnLoadIfStreamNameDelegateIsNil(c *C)
_, err := s.repo.Load("", NewUUID())

c.Assert(err, NotNil)
c.Assert(err, DeepEquals, fmt.Errorf("The common domain repository has no stream name delegate."))
c.Assert(err, DeepEquals, fmt.Errorf("the common domain repository has no stream name delegate"))
}

func (s *ComDomRepoSuite) TestReturnsErrorOnLoadIfEventFactoryNotRegistered(c *C) {
s.repo.eventFactory = nil

agg, err := s.repo.Load(typeOf(&SomeAggregate{}), NewUUID())

c.Assert(err, DeepEquals, fmt.Errorf("The common domain has no Event Factory."))
c.Assert(err, DeepEquals, fmt.Errorf("the common domain has no Event Factory"))
c.Assert(agg, IsNil)
}

Expand All @@ -440,7 +440,7 @@ func (s *ComDomRepoSuite) TestAggregateNotFoundError(c *C) {
c.Assert(err, FitsTypeOf, &ErrAggregateNotFound{AggregateID: id, AggregateType: typeOf(&SomeAggregate{})})
}

func (s *ComDomRepoSuite) TestSaveReturnsConncurrencyException(c *C) {
func (s *ComDomRepoSuite) TestSaveReturnsConcurrencyException(c *C) {

s.mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
c.Assert(r.Method, Equals, http.MethodPost)
Expand Down
4 changes: 2 additions & 2 deletions streamnamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (r *DelegateStreamNamer) RegisterDelegate(delegate func(string, string) str
for _, aggregate := range aggregates {
typeName := typeOf(aggregate)
if _, ok := r.delegates[typeName]; ok {
return fmt.Errorf("The stream name delegate for \"%s\" is already registered with the stream namer.",
return fmt.Errorf("the stream name delegate for \"%s\" is already registered with the stream namer",
typeName)
}
r.delegates[typeName] = delegate
Expand All @@ -46,6 +46,6 @@ func (r *DelegateStreamNamer) GetStreamName(aggregateTypeName string, id string)
if f, ok := r.delegates[aggregateTypeName]; ok {
return f(aggregateTypeName, id), nil
}
return "", fmt.Errorf("There is no stream name delegate for aggregate of type \"%s\"",
return "", fmt.Errorf("there is no stream name delegate for aggregate of type \"%s\"",
aggregateTypeName)
}
4 changes: 2 additions & 2 deletions streamnamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (s *DelegateStreamNamerSuite) TestGetStreamNameReturnsAnErrorIfNoDelegateRe
stream, err := s.namer.GetStreamName(typeOf(agg), agg.AggregateID())
c.Assert(err, NotNil)
c.Assert(stream, Equals, "")
c.Assert(err, DeepEquals, fmt.Errorf("There is no stream name delegate for aggregate of type \"%s\"",
c.Assert(err, DeepEquals, fmt.Errorf("there is no stream name delegate for aggregate of type \"%s\"",
typeOf(agg)))

}
Expand All @@ -91,6 +91,6 @@ func (s *DelegateStreamNamerSuite) TestRegisteringAStreamNameDelegateMoreThanOnc
&SomeAggregate{},
)
c.Assert(err, DeepEquals,
fmt.Errorf("The stream name delegate for \"%s\" is already registered with the stream namer.",
fmt.Errorf("the stream name delegate for \"%s\" is already registered with the stream namer",
typeOf(NewSomeAggregate(NewUUID()))))
}
2 changes: 1 addition & 1 deletion ycq.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// The implementation follows as much as possible the classic reference implementation
// m-r by Greg Young.
//
// The implmentation differs in a number of respects becasue the original is written
// The implementation differs in a number of respects because the original is written
// in C# and uses Generics where generics are not available in Go.
// This implementation instead uses interfaces to deal with types in a generic manner
// and used delegate functions to instantiate specific types.
Expand Down