Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move register callback to non builder inference #94

Merged
merged 3 commits into from
Mar 26, 2024

Conversation

kishansairam9
Copy link

No description provided.

@kishansairam9 kishansairam9 requested a review from a team as a code owner March 26, 2024 05:39
Copy link

codecov bot commented Mar 26, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 75.25%. Comparing base (df4f796) to head (17f50bf).

Additional details and impacted files
@@             Coverage Diff              @@
##               main      #94      +/-   ##
============================================
- Coverage     75.40%   75.25%   -0.15%     
- Complexity      238      240       +2     
============================================
  Files            44       44              
  Lines          1061     1063       +2     
  Branches         88       87       -1     
============================================
  Hits            800      800              
- Misses          226      228       +2     
  Partials         35       35              
Flag Coverage Δ
unit 75.25% <100.00%> (-0.15%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link

github-actions bot commented Mar 26, 2024

Test Results

15 files  ±0  15 suites  ±0   30s ⏱️ -1s
66 tests ±0  66 ✅ ±0  0 💤 ±0  0 ❌ ±0 
84 runs  ±0  84 ✅ ±0  0 💤 ±0  0 ❌ ±0 

Results for commit 17f50bf. ± Comparison against base commit df4f796.

This pull request removes 5 and adds 5 tests. Note that renamed tests count towards both.
org.hypertrace.core.kafkastreams.framework.rocksdb.BoundedMemoryConfigSetterTest ‑ [1] {rocksdb.periodic.compaction.seconds=60, rocksdb.compaction.style=UNIVERSAL, rocksdb.compaction.universal.compression.size.percent=40, rocksdb.compaction.universal.max.size.amplification.percent=50, application.id=app-2}
org.hypertrace.core.kafkastreams.framework.rocksdb.BoundedMemoryConfigSetterTest ‑ [1] {rocksdb.write.buffer.size=8388608, rocksdb.direct.reads.enabled=true, rocksdb.max.write.buffers=2, rocksdb.compaction.style=LEVEL, application.id=app-1, rocksdb.log.level=INFO_LEVEL, rocksdb.compression.type=SNAPPY_COMPRESSION, rocksdb.block.size=8388608}
org.hypertrace.core.kafkastreams.framework.rocksdb.BoundedMemoryConfigSetterTest ‑ [2] {rocksdb.write.buffer.size=8388607, rocksdb.direct.reads.enabled=true, rocksdb.max.write.buffers=3, rocksdb.compaction.style=UNIVERSAL, application.id=app-2, rocksdb.log.level=DEBUG_LEVEL, rocksdb.compression.type=SNAPPY_COMPRESSION, rocksdb.block.size=8388609}
org.hypertrace.core.kafkastreams.framework.rocksdb.BoundedMemoryConfigSetterTest ‑ [3] {rocksdb.write.buffer.size=8388609, rocksdb.direct.reads.enabled=false, rocksdb.max.write.buffers=4, rocksdb.compaction.style=FIFO, application.id=app-3, rocksdb.log.level=ERROR_LEVEL, rocksdb.compression.type=SNAPPY_COMPRESSION, rocksdb.block.size=8388607}
org.hypertrace.core.kafkastreams.framework.rocksdb.BoundedMemoryConfigSetterTest ‑ [5] {rocksdb.cache.high.priority.pool.ratio=0.2, rocksdb.cache.write.buffers.ratio=0.9, application.id=app-5}
org.hypertrace.core.kafkastreams.framework.rocksdb.BoundedMemoryConfigSetterTest ‑ [1] {rocksdb.compaction.style=UNIVERSAL, rocksdb.periodic.compaction.seconds=60, application.id=app-2, rocksdb.compaction.universal.max.size.amplification.percent=50, rocksdb.compaction.universal.compression.size.percent=40}
org.hypertrace.core.kafkastreams.framework.rocksdb.BoundedMemoryConfigSetterTest ‑ [1] {rocksdb.log.level=INFO_LEVEL, application.id=app-1, rocksdb.compaction.style=LEVEL, rocksdb.max.write.buffers=2, rocksdb.direct.reads.enabled=true, rocksdb.write.buffer.size=8388608, rocksdb.block.size=8388608, rocksdb.compression.type=SNAPPY_COMPRESSION}
org.hypertrace.core.kafkastreams.framework.rocksdb.BoundedMemoryConfigSetterTest ‑ [2] {rocksdb.log.level=DEBUG_LEVEL, application.id=app-2, rocksdb.compaction.style=UNIVERSAL, rocksdb.max.write.buffers=3, rocksdb.direct.reads.enabled=true, rocksdb.write.buffer.size=8388607, rocksdb.block.size=8388609, rocksdb.compression.type=SNAPPY_COMPRESSION}
org.hypertrace.core.kafkastreams.framework.rocksdb.BoundedMemoryConfigSetterTest ‑ [3] {rocksdb.log.level=ERROR_LEVEL, application.id=app-3, rocksdb.compaction.style=FIFO, rocksdb.max.write.buffers=4, rocksdb.direct.reads.enabled=false, rocksdb.write.buffer.size=8388609, rocksdb.block.size=8388607, rocksdb.compression.type=SNAPPY_COMPRESSION}
org.hypertrace.core.kafkastreams.framework.rocksdb.BoundedMemoryConfigSetterTest ‑ [5] {rocksdb.cache.high.priority.pool.ratio=0.2, application.id=app-5, rocksdb.cache.write.buffers.ratio=0.9}

♻️ This comment has been updated with latest results.

@@ -30,20 +30,29 @@
* for sample usage and test. Note that testing requires Thread.sleep > poll timeout in between
*/
public class KafkaLiveEventListener<K, V> implements AutoCloseable {
Queue<BiConsumer<? super K, ? super V>> callbacks;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be private final? Also suggest making it a Collection - the queue is an impl detail that could just as easily be satisfied with a different collection implementation.

@@ -54,13 +63,15 @@ public void close() throws Exception {
}

public static final class Builder<K, V> {
List<BiConsumer<? super K, ? super V>> callbacks = new ArrayList<>();
Queue<BiConsumer<? super K, ? super V>> deprecatedCallbacksFlow = new ConcurrentLinkedQueue<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit - looks like these were these exposed to package before? Since we're technically breaking it here anyway, does it need to be?

naming - this isn't really a flow, it's used by the flow. I'd just leave it as callbacks and mark it deprecated

@@ -73,17 +84,17 @@ public Builder<K, V> withExecutorService(

public KafkaLiveEventListener<K, V> build(
String consumerName, Config kafkaConfig, Consumer<K, V> kafkaConsumer) {
assertCallbacksPresent();
Queue<BiConsumer<? super K, ? super V>> callbacks;
if (deprecatedCallbacksFlow.size() > 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need these size check here? if nothing was added to it, then it's already an empty queue.

ExecutorService executorService,
boolean cleanupExecutor) {
this.callbacks = callbacks;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we had discussed the other day either here or in the builder (ideally here), we should copy off the array.

The usage of a concurrent queue is an impl detail of this class only, so shouldn't rely on a concurrent impl being passed in - the builder didn't even need to change since it doesn't have its own requirement for concurrency.

ExecutorService executorService = Executors.newSingleThreadExecutor();
boolean cleanupExecutor =
private final Collection<BiConsumer<? super K, ? super V>> callbacks =
new ConcurrentLinkedQueue<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit - no reason to use a concurrent queue here. No concurrency requirements in the builder and this will be copied into a concurrent queue when the listener is built.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I realised it, felt no harm to have a concurrent safe data structure because in deprecated method we suggested to have singleton instance of builder

@laxmanchekka
Copy link
Contributor

Approved. I would like to revisit this listener framework and usage once.
Will go through the wiki available once again.

@kishansairam9 kishansairam9 merged commit f0f565e into main Mar 26, 2024
7 checks passed
@kishansairam9 kishansairam9 deleted the interface-change-2 branch March 26, 2024 07:29
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants