
Conversation


@hachikuji hachikuji commented May 16, 2025

We use two paths to find the committed offset sent through BatchFlusher.flushWriteBatch: one is the offset committed through the consumer, and the other is the offset from the producer's send callback. In the first case, the offset we forward is exclusive (one more than the last consumed record). In the second, it is inclusive (it matches the offset of the last written record).

Later, when restoring in ResponsiveRestoreConsumer, we seek to the last committed offset recorded in the remote store. That is correct when the offset came from the exclusive consumer commit, but when it came from the inclusive produced offset, we replay the last written record. This is probably not a big deal for most stores, but RS3 has strict offset validation: if we replay just a single record, we send an expected written offset that exactly matches the (inclusive) offset of that record, which leads to an IllegalWalSegment error from RS3.
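
To make the two conventions concrete, here is a minimal sketch using the plain Kafka client APIs; the class and method names below are illustrative rather than the project's actual code.

import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

final class OffsetConventions {

  // Consumer path: the committed offset is the next offset to read, i.e. exclusive.
  static void commitAfter(Consumer<byte[], byte[]> consumer,
                          TopicPartition partition,
                          ConsumerRecord<byte[], byte[]> lastConsumed) {
    consumer.commitSync(Map.of(
        partition, new OffsetAndMetadata(lastConsumed.offset() + 1)));
  }

  // Producer path: the send callback reports the offset of the record it wrote, i.e. inclusive.
  static void sendAndReport(Producer<byte[], byte[]> producer,
                            ProducerRecord<byte[], byte[]> record) {
    producer.send(record, (metadata, exception) -> {
      if (exception == null) {
        long inclusiveOffset = metadata.offset(); // offset of the last written record
      }
    });
  }
}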

You can see this chain of events from the following logs captured from the window soak:

// Recover last committed offset 8663
[2025-05-16 20:09:00,115] INFO Restore RS3 table from offset 8663 for LssId{id=4}. recorded written offsets: 4 -> 8663 (dev.responsive.kafka.internal.db.rs3.client.RS3ClientUtil:65)

// Find range of offsets to recover (8663 to 8665)
[2025-05-16 20:09:00,377] INFO Beginning restoration from offset 8663 to 8665 at 1747426140377ms (epoch time) for partition rs3-window-aggregator-dev-agg-window-store-changelog-4 of state store agg-window-store (dev.responsive.kafka.internal.metrics.ResponsiveRestoreListener:110)

// Seek to last committed offset 8663
[2025-05-16 20:09:00,377] INFO [Consumer clientId=rs3-window-aggregator-dev-cbf2b455-b7db-4e42-bbfa-0cee71b34d56-StreamThread-1-restore-consumer, groupId=null] Seeking to offset 8663 for partition rs3-window-aggregator-dev-agg-window-store-changelog-4 (org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer:784)


// Flush exactly one record at offset 8663
[2025-05-16 20:09:00,589] INFO commit-buffer [agg_window_store-4] Flushing 1 records with batchSize=1 to remote (offset=8663, writer=dev.responsive.kafka.internal.db.BatchFlusher@4c92229f) (dev.responsive.kafka.internal.stores.CommitBuffer:399)

// RS3 returns `IllegalWalSegment`
[2025-05-16 20:09:00,702] ERROR Hit StreamsException on task Optional.empty (dev.responsive.soak.Main:362)
org.apache.kafka.streams.errors.ProcessorStateException: stream-thread [rs3-window-aggregator-dev-cbf2b455-b7db-4e42-bbfa-0cee71b34d56-StreamThread-1] stream-task [0_4] Exception caught while trying to restore state from rs3-window-aggregator-dev-agg-window-store-changelog-4
...
Caused by: dev.responsive.kafka.internal.db.rs3.client.RS3Exception: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: illegal wal segment
        at dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRs3Util.wrapThrowable(GrpcRs3Util.java:51)
        at dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcMessageReceiver.onError(GrpcMessageReceiver.java:35)
        at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:481)
        at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:564)

The patch fixes the problem by incrementing the offsets written by the producer before they are passed through. This ensures that the notifyCommit callback always receives an exclusive offset.
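
A minimal sketch of the shape of the fix, assuming a notifyCommit hook that should always see exclusive offsets (the class and method names here are illustrative, not the project's exact API):

abstract class CommitOffsetNormalizer {

  // Consumer path: the committed offset is already exclusive, so forward it unchanged.
  void onConsumerCommit(long exclusiveCommittedOffset) {
    notifyCommit(exclusiveCommittedOffset);
  }

  // Producer path: the acked offset is inclusive, so bump it by one before forwarding.
  // A later restore then seeks just past the last written record instead of replaying it.
  void onProducerAck(long inclusiveWrittenOffset) {
    notifyCommit(inclusiveWrittenOffset + 1);
  }

  abstract void notifyCommit(long exclusiveOffset);
}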

@hachikuji force-pushed the fix-commit-off-by-one branch from 61a3216 to dcc34f2 on May 21, 2025 18:43
changelog.topic()
);
table.init(0);
table = cassandraClient.kvFactory()
@hachikuji commented May 21, 2025

@ableegoldman I had to partially revert #383 here. One of the tests, shouldRestoreUnflushedChangelog, used a Cassandra fault injector which did nothing when Mongo was used as the backend.

}

@ParameterizedTest
@EnumSource(KVSchema.class)
@hachikuji commented:

The fault injector doesn't actually work with KVSchema.FACT because the FACT table uses a different statement type. I think the off-by-one error was allowing the assertions to pass because we always reconsumed one extra record.

@hachikuji commented:

I haven't found a good way to target the flush update when using the FACT table. The flush uses a DefaultBoundStatement, which several other queries also use before the flush, so without a precisely targeted fault the task crashes before it reads any input.
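
As a rough illustration of why class-based targeting breaks down here, consider a hypothetical fault injector keyed on a predicate over the outgoing statement (this is not the real test helper):

import java.util.function.Predicate;

// Hypothetical fault injector. With the default KV schema the flush uses a
// distinct statement type, so a class-based predicate fires exactly once at
// flush time. With FACT the flush goes through DefaultBoundStatement, which
// earlier queries also use, so the same predicate fires before the task has
// read any input.
final class StatementFaultInjector<S> {
  private final Predicate<S> target;
  private volatile boolean armed = true;

  StatementFaultInjector(Predicate<S> target) {
    this.target = target;
  }

  // Throws once for the first statement matching the target predicate.
  void maybeFail(S statement) {
    if (armed && target.test(statement)) {
      armed = false;
      throw new RuntimeException(
          "injected fault on " + statement.getClass().getSimpleName());
    }
  }
}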
