@@ -187,7 +187,6 @@ public void activate_subscription() {
187187 final Topic topic = Topic .newBuilder ()
188188 .setTarget (target )
189189 .build ();
190-
191190 // Subscribe on the topic
192191 final MemoizeStreamObserver <Subscription > subscriptionObserver = new MemoizeStreamObserver <>();
193192 subscriptionService .subscribe (topic , subscriptionObserver );
@@ -267,6 +266,36 @@ public void cancel_subscription_on_topic() {
267266 verify (activateSubscription , never ()).onCompleted ();
268267 }
269268
269+ @ Test
270+ public void handle_cancellation_process_exceptions_and_call_observer_error_callback () {
271+ final BoundedContext boundedContext = setupBoundedContextForAggregateRepo ();
272+
273+ final SubscriptionService subscriptionService = SubscriptionService .newBuilder ()
274+ .addBoundedContext (boundedContext )
275+ .build ();
276+ final Target target = getProjectQueryTarget ();
277+ final Topic topic = Topic .newBuilder ()
278+ .setTarget (target )
279+ .build ();
280+ final MemoizeStreamObserver <Subscription > subscriptionObserver = new MemoizeStreamObserver <>();
281+ subscriptionService .subscribe (topic , subscriptionObserver );
282+
283+ final String failureMessage = "Execution breaking exception" ;
284+ final MemoizeStreamObserver <Response > observer = new MemoizeStreamObserver <Response >() {
285+ @ Override
286+ public void onNext (Response value ) {
287+ super .onNext (value );
288+ throw new RuntimeException (failureMessage );
289+ }
290+ };
291+ subscriptionService .cancel (subscriptionObserver .streamFlowValue , observer );
292+ assertNotNull (observer .streamFlowValue );
293+ assertFalse (observer .isCompleted );
294+ assertNotNull (observer .throwable );
295+ assertInstanceOf (RuntimeException .class , observer .throwable );
296+ assertEquals (observer .throwable .getMessage (), failureMessage );
297+ }
298+
270299 private static BoundedContext setupBoundedContextForAggregateRepo () {
271300 final Stand stand = Stand .newBuilder ()
272301 .build ();
0 commit comments