Skip to content

Commit 292e342

Browse files
Support using state builder in event appliers.
1 parent 3f4d7d2 commit 292e342

File tree

14 files changed

+161
-88
lines changed

14 files changed

+161
-88
lines changed

client/src/test/java/org/spine3/client/test/TestCommandFactory.java renamed to client/src/main/java/org/spine3/client/test/TestCommandFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import com.google.protobuf.Message;
2424
import com.google.protobuf.Timestamp;
25+
import org.spine3.Internal;
2526
import org.spine3.base.Command;
2627
import org.spine3.base.CommandContext;
2728
import org.spine3.base.UserId;
@@ -35,6 +36,7 @@
3536
*
3637
* @author Alexaner Yevsyukov
3738
*/
39+
@Internal
3840
public class TestCommandFactory extends CommandFactory {
3941

4042
public static TestCommandFactory newInstance(String actor, ZoneOffset zoneOffset) {

examples/src/main/java/org/spine3/examples/aggregate/server/OrderAggregate.java

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
* @author Alexander Yevsyukov
3939
*/
4040
@SuppressWarnings("TypeMayBeWeakened" /* Use command and event classes for parameters as messages instead of SomethingOrBuilder */)
41-
public class OrderAggregate extends Aggregate<OrderId, Order> {
41+
public class OrderAggregate extends Aggregate<OrderId, Order, Order.Builder> {
4242

4343
public OrderAggregate(OrderId id) {
4444
super(id);
@@ -77,36 +77,26 @@ public OrderPaid handle(PayForOrder cmd, CommandContext ctx) {
7777

7878
@Apply
7979
private void event(OrderCreated event) {
80-
final Order newState = Order.newBuilder(getState())
80+
getBuilder()
8181
.setOrderId(event.getOrderId())
82-
.setStatus(Order.Status.NEW)
83-
.build();
84-
85-
incrementState(newState);
82+
.setStatus(Order.Status.NEW);
8683
}
8784

8885
@Apply
8986
private void event(OrderLineAdded event) {
9087
final OrderLine orderLine = event.getOrderLine();
9188
final Order currentState = getState();
92-
final Order newState = Order.newBuilder(currentState)
89+
getBuilder()
9390
.setOrderId(event.getOrderId())
9491
.addOrderLine(orderLine)
95-
.setTotal(currentState.getTotal() + orderLine.getTotal())
96-
.build();
97-
98-
incrementState(newState);
92+
.setTotal(currentState.getTotal() + orderLine.getTotal());
9993
}
10094

10195
@Apply
10296
private void event(OrderPaid event) {
103-
final Order currentState = getState();
104-
final Order newState = Order.newBuilder(currentState)
97+
getBuilder()
10598
.setBillingInfo(event.getBillingInfo())
106-
.setStatus(Order.Status.PAID)
107-
.build();
108-
109-
incrementState(newState);
99+
.setStatus(Order.Status.PAID);
110100
}
111101

112102
private static void validateCommand(AddOrderLine cmd) {

examples/src/main/java/org/spine3/examples/failure/TaskAggregate.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
* @author Alexander Yevsyukov
3535
*/
3636
@SuppressWarnings("unused")
37-
public class TaskAggregate extends Aggregate<TaskId, Task> {
37+
public class TaskAggregate extends Aggregate<TaskId, Task, Task.Builder> {
3838

3939
public TaskAggregate(TaskId id) {
4040
super(id);

server/src/main/java/org/spine3/server/aggregate/Aggregate.java

Lines changed: 114 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.spine3.server.reflect.MethodMap;
4444

4545
import javax.annotation.CheckReturnValue;
46+
import javax.annotation.Nullable;
4647
import java.lang.reflect.InvocationTargetException;
4748
import java.lang.reflect.Method;
4849
import java.util.List;
@@ -86,22 +87,35 @@
8687
* must have applier methods for <em>all</em> event types it produces.
8788
*
8889
* <h2>Handling snapshots</h2>
89-
* In order to optimise the restoration of aggregates, an {@link AggregateRepository} can periodically store
90+
* <p>In order to optimise the restoration of aggregates, an {@link AggregateRepository} can periodically store
9091
* snapshots of aggregate state.
9192
*
9293
* <p>The {@code Aggregate} restores its state in {@link #restore(Snapshot)} method.
9394
*
9495
* @param <I> the type for IDs of this class of aggregates
95-
* @param <M> the type of the state held by the aggregate
96+
* @param <S> the type of the state held by the aggregate
97+
* @param <B> the type of the aggregate state builder
9698
* @author Mikhail Melnik
9799
* @author Alexander Yevsyukov
98100
*/
99-
public abstract class Aggregate<I, M extends Message> extends Entity<I, M> implements CommandHandler {
101+
@SuppressWarnings("ClassWithTooManyMethods")
102+
public abstract class Aggregate<I, S extends Message, B extends Message.Builder>
103+
extends Entity<I, S>
104+
implements CommandHandler {
100105

101106
/* package */ static final Predicate<Method> IS_AGGREGATE_COMMAND_HANDLER = CommandHandlerMethod.PREDICATE;
102107

103108
/* package */ static final Predicate<Method> IS_EVENT_APPLIER = new EventApplier.FilterPredicate();
104109

110+
/**
111+
* The builder for the aggregate state.
112+
*
113+
* <p>This field is non-null only in when the aggregate changes its state during the command handling,
114+
* or during playing events.
115+
*/
116+
@Nullable
117+
private volatile B builder;
118+
105119
/**
106120
* Cached value of the ID in the form of Any instance.
107121
*/
@@ -134,16 +148,26 @@ public abstract class Aggregate<I, M extends Message> extends Entity<I, M> imple
134148
private final List<Event> uncommittedEvents = Lists.newLinkedList();
135149

136150
/**
137-
* Creates a new instance.
151+
* Creates a new aggregate instance.
138152
*
139-
* @param id the ID for the new instance
153+
* @param id the ID for the new aggregate
140154
* @throws IllegalArgumentException if the ID is not of one of the supported types
141155
*/
142156
public Aggregate(I id) {
143157
super(id);
144158
this.idAsAny = idToAny(id);
145159
}
146160

161+
/**
162+
* Check if the type of the builder passed as a generic parameter matches the type of the message.
163+
*
164+
* @throws ClassCastException if the builder type does not match the type of the state
165+
*/
166+
private void checkBuilderType() {
167+
@SuppressWarnings("unchecked")
168+
final B ignored = (B) getState().toBuilder();
169+
}
170+
147171
/**
148172
* Returns the set of the command classes handled by the passed aggregate class.
149173
*
@@ -179,8 +203,10 @@ private void init() {
179203
final Registry registry = Registry.getInstance();
180204
final Class<? extends Aggregate> thisClass = getClass();
181205

182-
// Register this aggregate root class if it wasn't.
206+
// Register this aggregate class if it wasn't before.
183207
if (!registry.contains(thisClass)) {
208+
// Check if the generic types of the class are compatible. Do it once per aggregate class.
209+
checkBuilderType();
184210
registry.register(thisClass);
185211
}
186212

@@ -194,6 +220,44 @@ private Any getIdAsAny() {
194220
return idAsAny;
195221
}
196222

223+
/**
224+
* This method starts the phase of updating the aggregate state.
225+
*
226+
* <p>The update phase is closed by the {@link #updateState()}.
227+
*/
228+
private void createBuilder() {
229+
@SuppressWarnings("unchecked") // It is safe as we checked the type on the construction.
230+
final B builder = (B) getState().toBuilder();
231+
this.builder = builder;
232+
}
233+
234+
/**
235+
* Obtains the instance of the state builder.
236+
*
237+
* <p>This method must be called only from within an event applier.
238+
*
239+
* @return the instance of the new state builder
240+
* @throws IllegalStateException if the method is called from outside an event applier
241+
*/
242+
protected B getBuilder() {
243+
if (this.builder == null) {
244+
throw new IllegalStateException(
245+
"Builder is not available. Make sure to call the method only from an event applier.");
246+
}
247+
return builder;
248+
}
249+
250+
/**
251+
* Updates the aggregate state and closes the update phase of the aggregate.
252+
*/
253+
private void updateState() {
254+
@SuppressWarnings("unchecked") // It is safe to cast as we checked compatibility in init();
255+
final S newState = (S) getBuilder().build();
256+
setState(newState, getVersion(), whenModified());
257+
258+
this.builder = null;
259+
}
260+
197261
/**
198262
* Dispatches the passed command to appropriate handler.
199263
*
@@ -286,44 +350,45 @@ private void invokeApplier(Message eventMessage) throws InvocationTargetExceptio
286350
*/
287351
public void play(Iterable<Event> events) {
288352
init();
289-
290-
for (Event event : events) {
291-
final Message message = Events.getMessage(event);
292-
try {
293-
apply(message);
294-
} catch (InvocationTargetException e) {
295-
propagate(e.getCause());
353+
createBuilder();
354+
try {
355+
for (Event event : events) {
356+
final Message message = Events.getMessage(event);
357+
final EventContext context = event.getContext();
358+
try {
359+
apply(message);
360+
setVersion(context.getVersion(), context.getTimestamp());
361+
} catch (InvocationTargetException e) {
362+
propagate(e.getCause());
363+
}
296364
}
365+
} finally {
366+
updateState();
297367
}
298368
}
299369

300370
/**
301-
* Applies events to the aggregate unless they are state-neutral.
371+
* Applies events to the aggregate.
302372
*
303373
* @param messages the event message to apply
304374
* @param commandContext the context of the command, execution of which produces the passed events
305375
* @throws InvocationTargetException if an exception occurs during event applying
306376
*/
307377
private void apply(Iterable<? extends Message> messages, CommandContext commandContext) throws InvocationTargetException {
378+
createBuilder();
379+
try {
380+
for (Message message : messages) {
381+
apply(message);
308382

309-
//TODO:2016-03-24:alexander.yevsyukov: Init the builder of the state. Add getBuilder() method.
310-
// Assume that the code of event appliers would call getBuilder() instead of getState().
311-
// Throw IllegalStateException in the getBuilder() method if it's called from other stages of
312-
// the aggregate lifecycle.
313-
314-
for (Message message : messages) {
315-
apply(message);
316-
317-
final int currentVersion = getVersion();
318-
final M state = getState();
319-
final EventContext eventContext = createEventContext(commandContext, state, whenModified(), currentVersion, message);
320-
final Event event = Events.createEvent(message, eventContext);
321-
putUncommitted(event);
383+
final int currentVersion = getVersion();
384+
final S state = getState();
385+
final EventContext eventContext = createEventContext(commandContext, state, whenModified(), currentVersion, message);
386+
final Event event = Events.createEvent(message, eventContext);
387+
putUncommitted(event);
388+
}
389+
} finally {
390+
updateState();
322391
}
323-
324-
325-
//TODO:2016-03-24:alexander.yevsyukov: set new state
326-
//TODO:2016-03-24:alexander.yevsyukov: Clean builder.
327392
}
328393

329394
/**
@@ -343,19 +408,29 @@ private void apply(Message eventMessage) throws InvocationTargetException {
343408
}
344409
invokeApplier(eventMessage);
345410

346-
//TODO:2016-03-24:alexander.yevsyukov: increment version
347-
//TODO:2016-03-24:alexander.yevsyukov: set new date/time
411+
incrementVersion(); // This will also update whenModified field.
348412
}
349413

350414
/**
351415
* Restores state from the passed snapshot.
352416
*
353417
* @param snapshot the snapshot with the state to restore
354418
*/
355-
public void restore(Snapshot snapshot) {
356-
final M stateToRestore = Messages.fromAny(snapshot.getState());
357-
358-
setState(stateToRestore, snapshot.getVersion(), snapshot.getWhenModified());
419+
/* package */ void restore(Snapshot snapshot) {
420+
final S stateToRestore = Messages.fromAny(snapshot.getState());
421+
422+
// See if we're in the state update cycle.
423+
final B builder = this.builder;
424+
425+
// If the call to restore() is made during a reply (because the snapshot event was encountered)
426+
// use the currently initialized builder.
427+
if (builder != null) {
428+
builder.clear();
429+
builder.mergeFrom(stateToRestore);
430+
setVersion(snapshot.getVersion(), snapshot.getWhenModified());
431+
} else {
432+
setState(stateToRestore, snapshot.getVersion(), snapshot.getWhenModified());
433+
}
359434
}
360435

361436
private void putUncommitted(Event record) {
@@ -400,7 +475,7 @@ public List<Event> commitEvents() {
400475
*/
401476
@CheckReturnValue
402477
protected EventContext createEventContext(CommandContext commandContext,
403-
M currentState,
478+
S currentState,
404479
Timestamp whenModified,
405480
int currentVersion,
406481
Message event) {
@@ -429,7 +504,7 @@ protected EventContext createEventContext(CommandContext commandContext,
429504
*/
430505
@SuppressWarnings({"NoopMethodInAbstractClass", "UnusedParameters"}) // Have no-op method to avoid forced overriding.
431506
protected void addEventContextAttributes(EventContext.Builder builder,
432-
CommandId commandId, Message event, M currentState, int currentVersion) {
507+
CommandId commandId, Message event, S currentState, int currentVersion) {
433508
// Do nothing.
434509
}
435510

server/src/main/java/org/spine3/server/aggregate/AggregateRepository.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363
* @author Mikhail Melnik
6464
* @author Alexander Yevsyukov
6565
*/
66-
public abstract class AggregateRepository<I, A extends Aggregate<I, ?>>
66+
public abstract class AggregateRepository<I, A extends Aggregate<I, ?, ?>>
6767
extends Repository<I, A>
6868
implements CommandDispatcher {
6969

@@ -99,7 +99,7 @@ protected AutoCloseable createStorage(StorageFactory factory) {
9999
*
100100
* @return the class of the aggregates
101101
*/
102-
public Class<? extends Aggregate<I, ?>> getAggregateClass() {
102+
public Class<? extends Aggregate<I, ?, ?>> getAggregateClass() {
103103
return getEntityClass();
104104
}
105105

server/src/main/java/org/spine3/server/entity/Entity.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,10 +175,21 @@ protected void validate(S state) throws IllegalStateException {
175175
protected void setState(S state, int version, Timestamp whenLastModified) {
176176
validate(state);
177177
this.state = checkNotNull(state);
178+
setVersion(version, whenLastModified);
179+
}
180+
181+
/**
182+
* Sets version information of the entity.
183+
*
184+
* @param version the version number of the entity
185+
* @param whenLastModified the time of the last modification of the entity
186+
*/
187+
protected void setVersion(int version, Timestamp whenLastModified) {
178188
this.version = version;
179189
this.whenModified = checkNotNull(whenLastModified);
180190
}
181191

192+
182193
/**
183194
* Updates the state incrementing the version number and recording time of the modification.
184195
*

server/src/main/java/org/spine3/server/storage/StorageFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public interface StorageFactory extends AutoCloseable {
4848
* @param <I> the type of aggregate IDs
4949
* @param aggregateClass the class of aggregates to store
5050
*/
51-
<I> AggregateStorage<I> createAggregateStorage(Class<? extends Aggregate<I, ?>> aggregateClass);
51+
<I> AggregateStorage<I> createAggregateStorage(Class<? extends Aggregate<I, ?, ?>> aggregateClass);
5252

5353
/**
5454
* Creates a new {@link EntityStorage} instance.

server/src/main/java/org/spine3/server/storage/memory/InMemoryStorageFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public EventStorage createEventStorage() {
5050
* NOTE: the parameter is unused.
5151
*/
5252
@Override
53-
public <I> AggregateStorage<I> createAggregateStorage(Class<? extends Aggregate<I, ?>> unused) {
53+
public <I> AggregateStorage<I> createAggregateStorage(Class<? extends Aggregate<I, ?, ?>> unused) {
5454
return new InMemoryAggregateStorage<>();
5555
}
5656

0 commit comments

Comments
 (0)