4545import static org .mockito .Mockito .never ;
4646import static org .mockito .Mockito .spy ;
4747import static org .mockito .Mockito .verify ;
48+ import static org .spine3 .test .Verify .assertInstanceOf ;
4849import static org .spine3 .test .Verify .assertSize ;
4950import static org .spine3 .testdata .TestBoundedContextFactory .newBoundedContext ;
5051
@@ -158,6 +159,22 @@ public void subscribe_to_topic() {
158159 assertTrue (observer .isCompleted );
159160 }
160161
162+ @ Test
163+ public void handle_subscription_process_exceptions_and_call_observer_error_callback () {
164+ final BoundedContext boundedContext = setupBoundedContextForAggregateRepo ();
165+
166+ final SubscriptionService subscriptionService = SubscriptionService .newBuilder ()
167+ .addBoundedContext (boundedContext )
168+ .build ();
169+ final MemoizeStreamObserver <Subscription > observer = new MemoizeStreamObserver <>();
170+ // Causes NPE
171+ subscriptionService .subscribe (null , observer );
172+ assertNull (observer .streamFlowValue );
173+ assertFalse (observer .isCompleted );
174+ assertNotNull (observer .throwable );
175+ assertInstanceOf (NullPointerException .class , observer .throwable );
176+ }
177+
161178 @ Test
162179 public void activate_subscription () {
163180 final BoundedContext boundedContext = setupBoundedContextForAggregateRepo ();
@@ -170,7 +187,6 @@ public void activate_subscription() {
170187 final Topic topic = Topic .newBuilder ()
171188 .setTarget (target )
172189 .build ();
173-
174190 // Subscribe on the topic
175191 final MemoizeStreamObserver <Subscription > subscriptionObserver = new MemoizeStreamObserver <>();
176192 subscriptionService .subscribe (topic , subscriptionObserver );
@@ -194,6 +210,22 @@ public void activate_subscription() {
194210 activationObserver .verifyState (false );
195211 }
196212
213+ @ Test
214+ public void handle_activation_process_exceptions_and_call_observer_error_callback () {
215+ final BoundedContext boundedContext = setupBoundedContextForAggregateRepo ();
216+
217+ final SubscriptionService subscriptionService = SubscriptionService .newBuilder ()
218+ .addBoundedContext (boundedContext )
219+ .build ();
220+ final MemoizeStreamObserver <SubscriptionUpdate > observer = new MemoizeStreamObserver <>();
221+ // Causes NPE
222+ subscriptionService .activate (null , observer );
223+ assertNull (observer .streamFlowValue );
224+ assertFalse (observer .isCompleted );
225+ assertNotNull (observer .throwable );
226+ assertInstanceOf (NullPointerException .class , observer .throwable );
227+ }
228+
197229 @ Test
198230 public void cancel_subscription_on_topic () {
199231 final BoundedContext boundedContext = setupBoundedContextForAggregateRepo ();
@@ -234,6 +266,36 @@ public void cancel_subscription_on_topic() {
234266 verify (activateSubscription , never ()).onCompleted ();
235267 }
236268
269+ @ Test
270+ public void handle_cancellation_process_exceptions_and_call_observer_error_callback () {
271+ final BoundedContext boundedContext = setupBoundedContextForAggregateRepo ();
272+
273+ final SubscriptionService subscriptionService = SubscriptionService .newBuilder ()
274+ .addBoundedContext (boundedContext )
275+ .build ();
276+ final Target target = getProjectQueryTarget ();
277+ final Topic topic = Topic .newBuilder ()
278+ .setTarget (target )
279+ .build ();
280+ final MemoizeStreamObserver <Subscription > subscriptionObserver = new MemoizeStreamObserver <>();
281+ subscriptionService .subscribe (topic , subscriptionObserver );
282+
283+ final String failureMessage = "Execution breaking exception" ;
284+ final MemoizeStreamObserver <Response > observer = new MemoizeStreamObserver <Response >() {
285+ @ Override
286+ public void onNext (Response value ) {
287+ super .onNext (value );
288+ throw new RuntimeException (failureMessage );
289+ }
290+ };
291+ subscriptionService .cancel (subscriptionObserver .streamFlowValue , observer );
292+ assertNotNull (observer .streamFlowValue );
293+ assertFalse (observer .isCompleted );
294+ assertNotNull (observer .throwable );
295+ assertInstanceOf (RuntimeException .class , observer .throwable );
296+ assertEquals (observer .throwable .getMessage (), failureMessage );
297+ }
298+
237299 private static BoundedContext setupBoundedContextForAggregateRepo () {
238300 final Stand stand = Stand .newBuilder ()
239301 .build ();
0 commit comments