-
Notifications
You must be signed in to change notification settings - Fork 5
Introduce origin event tracking mechanism #424
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
7643036
635abbd
bb70e30
0bb5c84
45758e3
d6d0578
36b89ea
e60e991
9fcb71b
08fdf12
0cea25e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| +20 −0 | .github/workflows/fossa.yml | |
| +7 −3 | docs/specification.md |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,55 @@ | ||
| /* | ||
| * Copyright 2024 Responsive Computing, Inc. | ||
agavra marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| * | ||
| * This source code is licensed under the Responsive Business Source License Agreement v1.0 | ||
| * available at: | ||
| * | ||
| * https://www.responsive.dev/legal/responsive-bsl-10 | ||
| * | ||
| * This software requires a valid Commercial License Key for production use. Trial and commercial | ||
| * licenses can be obtained at https://www.responsive.dev | ||
| */ | ||
|
|
||
| package dev.responsive.kafka.internal.clients; | ||
|
|
||
| import java.util.Collection; | ||
| import java.util.Map; | ||
| import org.apache.kafka.clients.consumer.ConsumerRecords; | ||
| import org.apache.kafka.clients.consumer.OffsetAndMetadata; | ||
| import org.apache.kafka.clients.producer.ProducerRecord; | ||
| import org.apache.kafka.common.TopicPartition; | ||
|
|
||
| public class NoopOriginEventRecorder implements OriginEventRecorder { | ||
| @Override | ||
| public <K, V> ConsumerRecords<K, V> onPoll(final ConsumerRecords<K, V> records) { | ||
| return records; | ||
| } | ||
|
|
||
| @Override | ||
| public <K, V> ProducerRecord<K, V> onSend(final ProducerRecord<K, V> record) { | ||
| return record; | ||
| } | ||
|
|
||
| @Override | ||
| public void onConsumerCommit(final Map<TopicPartition, OffsetAndMetadata> offsets) { | ||
| } | ||
|
|
||
| @Override | ||
| public void onProducerCommit() { | ||
| } | ||
|
|
||
| @Override | ||
| public void onSendOffsetsToTransaction( | ||
| final Map<TopicPartition, OffsetAndMetadata> offsets, | ||
| final String consumerGroupId | ||
| ) { | ||
| } | ||
|
|
||
| @Override | ||
| public void onPartitionsLost(final Collection<TopicPartition> partitions) { | ||
| } | ||
|
|
||
| @Override | ||
| public void onUnsubscribe() { | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,81 @@ | ||
| /* | ||
| * Copyright 2025 Responsive Computing, Inc. | ||
| * | ||
| * This source code is licensed under the Responsive Business Source License Agreement v1.0 | ||
| * available at: | ||
| * | ||
| * https://www.responsive.dev/legal/responsive-bsl-10 | ||
| * | ||
| * This software requires a valid Commercial License Key for production use. Trial and commercial | ||
| * licenses can be obtained at https://www.responsive.dev | ||
| */ | ||
|
|
||
| package dev.responsive.kafka.internal.clients; | ||
|
|
||
| import java.util.BitSet; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| /** | ||
| * This class allows us to efficiently count the number of events | ||
| * between two offsets that match a certain condition. This is somewhat | ||
| * memory efficient in that we can track 100K offsets with ~1.5K longs | ||
| * (64 bits per long), or roughly 12KB. | ||
|
Comment on lines
+19
to
+23
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm a little confused by the point of this class. This is just counting the number of origin events since the last commit, right? It doesn't seem like we ever expose what the actual origin event offsets are, so why do we need to save them in this Lmk if you'd rather sync online because I'm sure I'm missing something here 🙂
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This class should definitely have javadoc explaining it, originally I had implemented what you're saying but Rohan pointed out these two concerns: #424 (comment) and #424 (comment) both fixed by this bitset tracker.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah, I missed those discussions, but yeah he's right. thanks for filling in the javadocs |
||
| * <p> | ||
| * This is necessary for origin event tracking: since we mark records | ||
| * on poll but report aligned to commit it is possible that there are | ||
| * records that are polled but not included in the committed offsets. | ||
| * This class remedies that by allowing us to count the marked events | ||
| * up to the committed offset. | ||
| * <p> | ||
| * Additionally, there are some situations that would cause us to poll | ||
| * a record multiple times (e.g. a task is reassigned to the same thread | ||
| * after an EOS error). This class is idempotent to marking the same | ||
| * offsets multiple times. | ||
| */ | ||
| public class OffsetTracker { | ||
|
|
||
| private static final Logger LOG = LoggerFactory.getLogger(OffsetTracker.class); | ||
|
|
||
| private BitSet offsets = new BitSet(); | ||
| private long baseOffset; | ||
|
|
||
| public OffsetTracker(long baseOffset) { | ||
| this.baseOffset = baseOffset; | ||
| } | ||
|
|
||
| public void mark(final long offset) { | ||
| if (offset < baseOffset) { | ||
| LOG.error("Invalid offset {} lower than baseOffset {} marked", offset, baseOffset); | ||
| throw new IllegalArgumentException( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: can we log an error before throwing that also includes the value of |
||
| "Offset " + offset + " cannot be less than baseOffset " + baseOffset); | ||
| } | ||
|
|
||
| // assume that we won't be committing more than MAX_INT offsets | ||
| // in a single commit... | ||
| final int idx = Math.toIntExact(offset - baseOffset); | ||
| offsets.set(idx); | ||
| } | ||
|
|
||
| public int countAndShift(final long commitOffset) { | ||
| if (commitOffset < baseOffset) { | ||
| LOG.error("Invalid offset {} lower than baseOffset {} committed", commitOffset, baseOffset); | ||
| throw new IllegalArgumentException( | ||
| "Commit offset " + commitOffset + " cannot be less than baseOffset " + baseOffset); | ||
| } | ||
|
|
||
| final int shift = Math.toIntExact(commitOffset - baseOffset); | ||
| final int count = offsets.get(0, shift).cardinality(); | ||
|
|
||
| // shift the bitset so that commitOffset is the new base offset | ||
| // for which we track marked entries | ||
| final var old = this.offsets; | ||
agavra marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| this.offsets = new BitSet(); | ||
| for (int i = old.nextSetBit(shift); i != -1; i = old.nextSetBit(i + 1)) { | ||
| this.offsets.set(i - shift); | ||
| } | ||
|
|
||
| baseOffset = commitOffset; | ||
| return count; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,48 @@ | ||
| /* | ||
| * Copyright 2024 Responsive Computing, Inc. | ||
| * | ||
| * This source code is licensed under the Responsive Business Source License Agreement v1.0 | ||
| * available at: | ||
| * | ||
| * https://www.responsive.dev/legal/responsive-bsl-10 | ||
| * | ||
| * This software requires a valid Commercial License Key for production use. Trial and commercial | ||
| * licenses can be obtained at https://www.responsive.dev | ||
| */ | ||
|
|
||
| package dev.responsive.kafka.internal.clients; | ||
|
|
||
| import java.util.Collection; | ||
| import java.util.Map; | ||
| import org.apache.kafka.clients.consumer.ConsumerRecords; | ||
| import org.apache.kafka.clients.consumer.OffsetAndMetadata; | ||
| import org.apache.kafka.clients.producer.ProducerRecord; | ||
| import org.apache.kafka.common.TopicPartition; | ||
|
|
||
| public interface OriginEventRecorder | ||
| extends ResponsiveConsumer.Listener, ResponsiveProducer.Listener { | ||
|
|
||
| @Override | ||
| <K, V> ConsumerRecords<K, V> onPoll(ConsumerRecords<K, V> records); | ||
|
|
||
| @Override | ||
| <K, V> ProducerRecord<K, V> onSend(ProducerRecord<K, V> record); | ||
|
|
||
| @Override | ||
| void onConsumerCommit(Map<TopicPartition, OffsetAndMetadata> offsets); | ||
|
|
||
| @Override | ||
| void onProducerCommit(); | ||
|
|
||
| @Override | ||
| void onSendOffsetsToTransaction( | ||
| final Map<TopicPartition, OffsetAndMetadata> offsets, | ||
| final String consumerGroupId | ||
| ); | ||
|
|
||
| @Override | ||
| void onPartitionsLost(Collection<TopicPartition> partitions); | ||
|
|
||
| @Override | ||
| void onUnsubscribe(); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All of this License stuff was moved to
LicenseUtils