Skip to content

Commit ddc3237

Browse files
author
AlexanderLitus
authored
Merge pull request #151 from SpineEventEngine/observer-on-error-for-integr-events-posting
Use `StreamObserver.onError()` while posting integration events
2 parents d9f9fc0 + d53910a commit ddc3237

File tree

18 files changed

+615
-328
lines changed

18 files changed

+615
-328
lines changed

client/src/main/java/org/spine3/SPI.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
*
3232
* <p>SPI is used to enable framework extension and replaceable components (implement a new storage, etc).
3333
*
34-
* <p>See Effective Java 2nd Edition, chapter 2, item 1 for more info about service provider framework pattern.
34+
* <p>See "Effective Java 2nd Edition", chapter 2, item 1 for more info about service provider framework pattern.
3535
*
3636
* @author Alexander Litus
3737
*/

client/src/main/java/org/spine3/base/Responses.java

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -58,39 +58,4 @@ public static boolean isOk(Response response) {
5858
final boolean result = response.getStatusCase() == OK;
5959
return result;
6060
}
61-
62-
/**
63-
* Checks if the response is `unsupported event`.
64-
*
65-
* @return {@code true} if the passed response represents `unsupported event` error,
66-
* {@code false} otherwise
67-
*/
68-
public static boolean isUnsupportedEvent(Response response) {
69-
if (isError(response)) {
70-
final Error error = response.getError();
71-
final boolean isUnsupported = error.getCode() == EventValidationError.UNSUPPORTED_EVENT.getNumber();
72-
return isUnsupported;
73-
}
74-
return false;
75-
}
76-
77-
/**
78-
* Checks if the response is `invalid message`.
79-
*
80-
* @return {@code true} if the passed response represents `invalid message` error,
81-
* {@code false} otherwise
82-
*/
83-
public static boolean isInvalidMessage(Response response) {
84-
if (isError(response)) {
85-
final ValidationError error = response.getError().getValidationError();
86-
final boolean isInvalid = !error.getConstraintViolationList().isEmpty();
87-
return isInvalid;
88-
}
89-
return false;
90-
}
91-
92-
private static boolean isError(Response response) {
93-
final boolean isError = response.getStatusCase() == ERROR;
94-
return isError;
95-
}
9661
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
//
2+
// Copyright 2016, TeamDev Ltd. All rights reserved.
3+
//
4+
// Redistribution and use in source and/or binary forms, with or without
5+
// modification, must retain the above copyright notice and the following
6+
// disclaimer.
7+
//
8+
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
9+
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
10+
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
11+
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
12+
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
13+
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
14+
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
15+
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
16+
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
17+
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
18+
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
19+
//
20+
syntax = "proto3";
21+
22+
// We do not define the package for this file to allow shorter options for user-defined types.
23+
// This would allow to write:
24+
//
25+
// option (internal) = true;
26+
//
27+
// instead of:
28+
//
29+
// option (spine.base.internal) = true;
30+
//
31+
32+
option java_generate_equals_and_hash = false;
33+
option java_multiple_files = true;
34+
option java_outer_classname = "AnnotationsProto";
35+
option java_package = "org.spine3.annotations";
36+
37+
import "google/protobuf/descriptor.proto";
38+
39+
// TODO:2016-02-18:alexander.litus: obtain globally unique field numbers for opts from Google.
40+
41+
42+
extend google.protobuf.FieldOptions {
43+
// Indicates a field which is internal to Spine, not part of the public API, and should not be used by users of the framework.
44+
//
45+
// If you plan to implement an extension of the framework, which is going to be
46+
// wired into the framework, you may use the internal parts. Please consult with the Spine
47+
// team, as the internal APIs do not have the same stability API guarantee as public ones.
48+
//
49+
// See `SPI` option if you plan to write an extension of the framework.
50+
bool internal = 58000;
51+
52+
// Indicates a file which contains elements of Service Provider Interface (SPI).
53+
// SPI is used to enable framework extension and replaceable components (implement a new storage, etc).
54+
// See "Effective Java 2nd Edition", chapter 2, item 1 for more info about service provider framework pattern.
55+
bool SPI = 58001;
56+
57+
// Indicates a field that can change at any time, and has no guarantee of API stability and backward-compatibility.
58+
//
59+
// Usage guidelines:
60+
// 1. This annotation is used only on public API. Internal interfaces should not use it.
61+
// 2. This annotation can only be added to new API. Adding it to an existing API is considered API-breaking.
62+
// 3. Removing this annotation from an API gives it stable status.
63+
bool experimental = 58002;
64+
65+
// Signifies that a public API is subject to incompatible changes, or even removal, in a future release.
66+
// An API bearing this annotation is exempt from any compatibility guarantees made by its containing library.
67+
// Note that the presence of this annotation implies nothing about the quality of the API in question,
68+
// only the fact that it is not "API-frozen."
69+
// It is generally safe for applications to depend on beta APIs, at the cost of some extra work during upgrades.
70+
bool beta = 58003;
71+
}
72+
73+
extend google.protobuf.MessageOptions {
74+
// Indicates a file which is internal to Spine, not part of the public API, and should not be used by users of the framework.
75+
// See more info in `annotations.proto`.
76+
bool internal_type = 58004;
77+
78+
// Indicates a file which contains elements of Service Provider Interface (SPI).
79+
// See more info in `annotations.proto`.
80+
bool SPI_type = 58005;
81+
82+
// Indicates a public API that can change at any time, and has no guarantee of API stability and backward-compatibility.
83+
// See more info in `annotations.proto`.
84+
bool experimental_type = 58006;
85+
86+
// Signifies that a public API is subject to incompatible changes, or even removal, in a future release.
87+
// See more info in `annotations.proto`.
88+
bool beta_type = 58007;
89+
}
90+
91+
extend google.protobuf.FileOptions {
92+
// Indicates a file which should not be used by users of the framework.
93+
// See more info in `annotations.proto`.
94+
bool internal_all = 58008;
95+
96+
// Indicates a file which contains elements of Service Provider Interface (SPI).
97+
// See more info in `annotations.proto`.
98+
bool SPI_all = 58009;
99+
100+
// Indicates a public API that can change at any time, and has no guarantee of API stability and backward-compatibility.
101+
// See more info in `annotations.proto`.
102+
bool experimental_all = 58010;
103+
104+
// Signifies that a public API is subject to incompatible changes, or even removal, in a future release.
105+
// See more info in `annotations.proto`.
106+
bool beta_all = 58011;
107+
}

client/src/main/proto/spine/base/command.proto

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ import "spine/users/user_id.proto";
3434
import "spine/users/tenant_id.proto";
3535
import "spine/time/zone_offset.proto";
3636

37+
import "spine/annotations.proto";
38+
3739
// Command identifier.
3840
message CommandId {
3941
string uuid = 1;
@@ -91,9 +93,8 @@ message CommandContext {
9193
// The recurrence pattern should be a separate type somewhere under `time` package.
9294

9395
// The time when the command was scheduled.
94-
// Is set automatically.
95-
//TODO:2016-05-27:alexander.yevsyukov: Set automatically by whom? How do we use this attribute?
96-
google.protobuf.Timestamp scheduling_time = 2;
96+
// Is set automatically and used only by Spine when re-scheduling a command.
97+
google.protobuf.Timestamp scheduling_time = 2 [(internal) = true];
9798
}
9899

99100
// The schedule to execute the command.

client/src/test/java/org/spine3/base/ResponsesShould.java

Lines changed: 4 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -21,26 +21,15 @@
2121
package org.spine3.base;
2222

2323
import org.junit.Test;
24-
import org.spine3.validate.ConstraintViolation;
25-
26-
import java.util.List;
2724

2825
import static com.google.common.base.Preconditions.checkNotNull;
29-
import static com.google.common.collect.Lists.newArrayList;
3026
import static org.junit.Assert.assertFalse;
3127
import static org.junit.Assert.assertTrue;
3228
import static org.spine3.test.Tests.hasPrivateUtilityConstructor;
3329

3430
@SuppressWarnings("InstanceMethodNamingConvention")
3531
public class ResponsesShould {
3632

37-
private static final Response RESPONSE_UNSUPPORTED_EVENT = Response.newBuilder()
38-
.setError(Error.newBuilder()
39-
.setCode(EventValidationError.UNSUPPORTED_EVENT.getNumber()))
40-
.build();
41-
42-
private static final Response RESPONSE_INVALID_MESSAGE = newInvalidMessageResponse();
43-
4433
@Test
4534
public void have_private_constructor() {
4635
assertTrue(hasPrivateUtilityConstructor(Responses.class));
@@ -58,40 +47,9 @@ public void recognize_OK_response() {
5847

5948
@Test
6049
public void return_false_if_not_OK_response() {
61-
assertFalse(Responses.isOk(RESPONSE_INVALID_MESSAGE));
62-
}
63-
64-
@Test
65-
public void recognize_UNSUPPORTED_EVENT_response() {
66-
assertTrue(Responses.isUnsupportedEvent(RESPONSE_UNSUPPORTED_EVENT));
67-
}
68-
69-
@Test
70-
public void return_false_if_not_UNSUPPORTED_EVENT_response() {
71-
assertFalse(Responses.isUnsupportedEvent(Responses.ok()));
72-
}
73-
74-
@Test
75-
public void recognize_INVALID_MESSAGE_response() {
76-
assertTrue(Responses.isInvalidMessage(RESPONSE_INVALID_MESSAGE));
77-
}
78-
79-
@Test
80-
public void return_false_if_not_INVALID_MESSAGES_response() {
81-
assertFalse(Responses.isInvalidMessage(Responses.ok()));
82-
}
83-
84-
private static Response newInvalidMessageResponse() {
85-
final List<ConstraintViolation> violations = newArrayList(ConstraintViolation.getDefaultInstance());
86-
final ValidationError validationError = ValidationError.newBuilder()
87-
.addAllConstraintViolation(violations)
88-
.build();
89-
final Error error = Error.newBuilder()
90-
.setValidationError(validationError)
91-
.build();
92-
final Response response = Response.newBuilder()
93-
.setError(error)
94-
.build();
95-
return response;
50+
final Response error = Response.newBuilder()
51+
.setError(Error.getDefaultInstance())
52+
.build();
53+
assertFalse(Responses.isOk(error));
9654
}
9755
}

server/src/main/java/org/spine3/server/BoundedContext.java

Lines changed: 6 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,8 @@
2626
import io.grpc.stub.StreamObserver;
2727
import org.slf4j.Logger;
2828
import org.slf4j.LoggerFactory;
29-
import org.spine3.base.Command;
3029
import org.spine3.base.Event;
3130
import org.spine3.base.EventContext;
32-
import org.spine3.base.Failure;
3331
import org.spine3.base.Response;
3432
import org.spine3.server.command.CommandBus;
3533
import org.spine3.server.command.CommandDispatcher;
@@ -52,7 +50,6 @@
5250

5351
import static com.google.common.base.Preconditions.checkNotNull;
5452
import static com.google.common.base.Preconditions.checkState;
55-
import static org.spine3.base.Responses.isOk;
5653
import static org.spine3.protobuf.Messages.fromAny;
5754
import static org.spine3.protobuf.Messages.toAny;
5855
import static org.spine3.protobuf.Values.newStringValue;
@@ -195,36 +192,14 @@ private void checkStorageAssigned(Repository repository) {
195192

196193
@Override
197194
public void notify(IntegrationEvent integrationEvent, StreamObserver<Response> responseObserver) {
198-
/**
199-
* TODO:2016-05-11:alexander.litus: use {@link StreamObserver#onError}
200-
* instead of returning responses, see {@link CommandBus#post(Command, StreamObserver)}.
201-
*/
202-
try {
203-
final Message message = fromAny(integrationEvent.getMessage());
204-
final Response response = validateIntegrationEventMessage(message);
205-
responseObserver.onNext(response);
206-
responseObserver.onCompleted();
207-
if (isOk(response)) {
208-
final Event event = toEvent(integrationEvent);
209-
eventBus.post(event);
210-
}
211-
} catch (RuntimeException e) {
212-
responseObserver.onError(e);
195+
final Message eventMsg = fromAny(integrationEvent.getMessage());
196+
final boolean isValid = eventBus.validate(eventMsg, responseObserver);
197+
if (isValid) {
198+
final Event event = toEvent(integrationEvent);
199+
eventBus.post(event);
213200
}
214201
}
215202

216-
/**
217-
* Validates an incoming integration event message.
218-
*
219-
* @param eventMessage a message to validate
220-
* @return a response with {@code OK} value if the command is valid, or
221-
* with {@link org.spine3.base.Error}/{@link Failure} value otherwise
222-
*/
223-
protected Response validateIntegrationEventMessage(Message eventMessage) {
224-
final Response response = eventBus.validate(eventMessage);
225-
return response;
226-
}
227-
228203
private static Event toEvent(IntegrationEvent integrationEvent) {
229204
final IntegrationEventContext sourceContext = integrationEvent.getContext();
230205
final StringValue producerId = newStringValue(sourceContext.getBoundedContextName());
@@ -346,6 +321,7 @@ public CommandBus getCommandBus() {
346321
*
347322
* @see #setEventStoreStreamExecutor(Executor)
348323
*/
324+
@SuppressWarnings("unused")
349325
public Builder setEventStore(EventStore eventStore) {
350326
checkState(eventStoreStreamExecutor == null, "eventStoreStreamExecutor already set.");
351327
this.eventStore = checkNotNull(eventStore);

0 commit comments

Comments
 (0)