@artur-ciocanu
Contributor

Description

This PR is an attempt to create a new version of subscribeToEvents() that aligns better with Project Reactor APIs. It is awkward to have Project Reactor APIs mixed with callbacks like SubscriptionListener.

Issue reference

We strive to have every PR opened against an issue where the problem or feature has been discussed prior to implementation.

Please reference the issue this PR will close: N/A

Checklist

Please make sure you've completed the relevant tasks for this PR, out of the following list:

  • Code compiles correctly
  • Created/updated tests
  • Extended the documentation

@artur-ciocanu artur-ciocanu requested review from a team as code owners November 28, 2025 20:13
@artur-ciocanu
Contributor Author

@dapr/approvers-java-sdk and @dapr/maintainers-java-sdk could you please take a look at this PR? It tries to add a more ergonomic API for subscribeToEvents() that aligns with the other DaprClient methods and is more natural for our Project Reactor based APIs, which rely heavily on Flux and Mono.

Please take a look and let me know your thoughts.

@javier-aliaga as usual, I would be interested to hear your thoughts.

@salaboy I am wondering if we can pause the promotion of subscribeToEvents() that you mentioned in PR #1597? I think this PR has a simpler approach to subscribing to events.

Signed-off-by: Artur Ciocanu <[email protected]>
@codecov

codecov bot commented Dec 1, 2025

Codecov Report

❌ Patch coverage is 90.14085% with 7 lines in your changes missing coverage. Please review.
✅ Project coverage is 78.63%. Comparing base (d759c53) to head (23c822d).
⚠️ Report is 239 commits behind head on master.

Files with missing lines | Patch % | Lines
...al/subscription/EventSubscriberStreamObserver.java | 88.88% | 2 Missing and 4 partials ⚠️
...k/src/main/java/io/dapr/client/DaprClientImpl.java | 94.11% | 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #1598      +/-   ##
============================================
+ Coverage     76.91%   78.63%   +1.71%     
- Complexity     1592     1950     +358     
============================================
  Files           145      217      +72     
  Lines          4843     5943    +1100     
  Branches        562      664     +102     
============================================
+ Hits           3725     4673     +948     
- Misses          821      929     +108     
- Partials        297      341      +44     

* @return An active subscription.
* @deprecated Use {@link #subscribeToEvents(String, String, TypeRef)} instead for a more reactive approach.
*/
@Deprecated
Collaborator

@artur-ciocanu do we need to deprecate the old one? I haven't had time to try this out, but it feels like this will not be efficient without Flux. Does it work as expected?

@salaboy
Copy link
Collaborator

salaboy commented Dec 1, 2025

@artur-ciocanu yeah, totally, we need the Flux approach to make this consistent. The promotion was needed because these APIs are already stable in the Dapr Sidecar, so I would vote for having the Flux approach in the main Dapr Client.

@salaboy salaboy self-requested a review December 1, 2025 07:45
Collaborator

@salaboy salaboy left a comment

LGTM

@@ -0,0 +1,446 @@
/*
* Copyright 2024 The Dapr Authors
Contributor

2025

@@ -0,0 +1,196 @@
/*
* Copyright 2024 The Dapr Authors
Contributor

2025

try {
requestStream.onCompleted();
} catch (Exception e) {
// Ignore cleanup errors
Contributor

Can we log at least in debug mode?
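
A minimal sketch of what the requested debug logging could look like. `RequestStream` and `completeQuietly` are hypothetical stand-ins, not names from this PR, and `java.util.logging` is used only to keep the snippet self-contained; the SDK's own logger would take its place.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical stand-in for the request stream; only the call used in the diff is modeled. */
interface RequestStream {
  void onCompleted();
}

final class CleanupLogging {
  private static final Logger LOGGER = Logger.getLogger(CleanupLogging.class.getName());

  /**
   * Completes the stream during cleanup.
   * Returns true on clean completion, false if a failure was swallowed and logged.
   */
  static boolean completeQuietly(RequestStream requestStream) {
    try {
      requestStream.onCompleted();
      return true;
    } catch (Exception e) {
      // Cleanup errors are still not propagated, but they are now visible at debug (FINE) level.
      LOGGER.log(Level.FINE, "Ignoring error while completing the request stream", e);
      return false;
    }
  }
}
```

The behavior for callers is unchanged (the exception is still swallowed); the only difference is that the failure leaves a trace when debug logging is enabled.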

var ack = buildSuccessAck(id);

requestStream.onNext(ack);
} catch (Exception e) {
Contributor

I think this catch is too aggressive. Should we treat deserialization errors differently from sink errors?
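
One way to read this suggestion, as a sketch only: give each failure source its own try block so the response can differ. The `Ack` values, the `handle` method, and the drop-versus-retry policy below are assumptions made for illustration, not what the PR implements.

```java
import java.util.function.Consumer;
import java.util.function.Function;

final class EventHandlingSketch {
  /** Hypothetical acknowledgement outcomes for one received event. */
  enum Ack { SUCCESS, RETRY, DROP }

  /**
   * Deserializes a payload and hands it to the sink, treating the two
   * failure sources separately instead of using one broad catch.
   */
  static <T> Ack handle(String payload, Function<String, T> deserializer, Consumer<T> sink) {
    final T event;
    try {
      event = deserializer.apply(payload);
    } catch (RuntimeException e) {
      // Assumed policy: a malformed payload will not improve on redelivery, so drop it.
      return Ack.DROP;
    }
    try {
      sink.accept(event);
    } catch (RuntimeException e) {
      // Assumed policy: the payload was valid but the consumer failed, so ask for redelivery.
      return Ack.RETRY;
    }
    return Ack.SUCCESS;
  }
}
```

With this shape, a call such as `EventHandlingSketch.<Integer>handle("oops", Integer::parseInt, v -> { })` ends in the deserialization branch, while a sink that throws ends in the retry branch.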

}
} catch (Exception ex) {
// If we can't send ack, propagate the error
sink.error(DaprException.propagate(ex));
Contributor

If this fails, the original exception from line 112 is lost. Maybe these chained catches can be rewritten to be sequential?

ex = internalNext()
if ex != null {
  retry()
}

Or something similar
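
The sequential idea could be sketched in Java roughly as follows. `Step`, `attempt`, and `processThenAck` are hypothetical names, and using `Throwable.addSuppressed` to keep the first failure is one option among several, not something the PR prescribes.

```java
final class SequentialErrorHandling {
  /** Hypothetical unit of work that may fail, such as "emit the event" or "send the ack". */
  interface Step {
    void run() throws Exception;
  }

  /** Runs a step and returns its failure instead of throwing; null means success. */
  static Exception attempt(Step step) {
    try {
      step.run();
      return null;
    } catch (Exception e) {
      return e;
    }
  }

  /**
   * Processes an event and, only if that failed, sends a retry ack.
   * Returns null when nothing needs to be propagated; otherwise returns the
   * ack failure with the original processing failure attached as suppressed.
   */
  static Exception processThenAck(Step process, Step sendRetryAck) {
    Exception processingError = attempt(process);
    if (processingError == null) {
      return null;
    }
    Exception ackError = attempt(sendRetryAck);
    if (ackError == null) {
      return null;
    }
    // Keep the first failure reachable from the one that gets propagated.
    ackError.addSuppressed(processingError);
    return ackError;
  }
}
```

Because each step runs in its own `attempt`, there is no catch nested inside another catch, and the propagated exception still carries the processing failure in its suppressed list.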
