-
Notifications
You must be signed in to change notification settings - Fork 244
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[connector] Fix partition state update and filter non-existent partitions on restore #359
base: main
Are you sure you want to change the base?
Conversation
...k/src/test/java/com/alibaba/fluss/connector/flink/source/FlinkTableSourceFailOverITCase.java
Outdated
Show resolved
Hide resolved
- Mark active bounded splits as finished on partition removal. - Remove pending snapshot splits and unsubscribe dropped log buckets. - Catch PartitionNotExistException during subscribeLog to log a warning.
3814713
to
06d5104
Compare
// Traverse the exception chain to check for PartitionNotExistException. | ||
Throwable cause = e; | ||
boolean partitionNotExist = false; | ||
while (cause != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we can replace the while loop using
boolean partitionNotExist = ExceptionUtils.findThrowable(e, PartitionNotExistException.class).isPresent();
} catch (IOException e) { | ||
LOG.warn( | ||
"Failed to close current bounded split for removed partition {}.", | ||
currentBucket.getPartitionId(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we can also use removedPartitions.get(currentBucket.getPartitionId())
to print the partition name rather than id.
LOG.warn( | ||
"Partition {} does not exist when subscribing to log for split {}. Skipping subscription.", | ||
partitionId, | ||
split.splitId()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should return
to avoid unnecessary subscribe the partition.
Thanks @codope for the contribution! The fixing looks good to me. I pushed a commit to improve the IT case so that:
Could you help to review the changes @luoyuxia ? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -262,17 +287,42 @@ public Set<TableBucket> removePartitions(Map<Long, String> removedPartitions) { | |||
// todo, may consider to close the current snapshot reader if |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The todo comment can be removed now since in this pr, we wil close currentBoundedSplit
@@ -145,6 +149,12 @@ public RecordsWithSplitIds<RecordAndPos> fetch() throws IOException { | |||
new HashSet<>(emptyLogSplits), flinkSourceReaderMetrics); | |||
emptyLogSplits.clear(); | |||
return records; | |||
} else if (!removedSplits.isEmpty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can check !removedSplits.isEmpty()
at the beginning so that the flink enumerate can remove the state of removed splits as soon as possible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we can move the code block at the beginning of fetch()
method, because the currentBoundedSplitReader
may still work in progress.
try { | ||
if (partitionId != null) { | ||
// Try to subscribe using the partition id. | ||
logScanner.subscribe(partitionId, bucket, startingOffset); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
I think we can limit the try catch scope to only this line since only subscribe a partition, the partitionNotExist will be thrown.
I'd like suggest to be:
if (partitionId != null) {
// Try to subscribe using the partition id.
try {
logScanner.subscribe(partitionId, bucket, startingOffset);
} catch (Exception e) {
// the PartitionNotExistException may still happens when partition is removed
// but Flink source reader failover before aware of it
// Traverse the exception chain to check for PartitionNotExistException.
boolean partitionNotExist =
ExceptionUtils.findThrowable(e, PartitionNotExistException.class)
.isPresent();
if (partitionNotExist) {
// mark the not exist partition to be removed
removedSplits.add(split.splitId());
LOG.warn(
"Partition {} does not exist when subscribing to log for split {}. Skipping subscription.",
partitionId,
split.splitId());
return;
}
}
} else {
// If no partition id, subscribe by bucket only.
logScanner.subscribe(bucket, startingOffset);
}
ExceptionUtils.findThrowable(e, PartitionNotExistException.class) | ||
.isPresent(); | ||
if (partitionNotExist) { | ||
LOG.warn( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We also should mark split as removed since the partition is removed and so that flink enumerator can remove this split state.
Purpose
Linked issue: close #348
This PR fixes an issue where removed partitions were still retained in the source reader’s state. Upon recovery, the source attempted to re-subscribe to non-existent partitions, leading to a
PartitionNotExistException
.Changes
Mark Finished Splits on Partition Removal:
Active bounded splits are now marked as finished when a partition removal event is received. This prevents removed partitions from being restored.
Remove Pending Snapshot Splits and Unsubscribe Buckets:
Pending snapshot splits and subscribed log buckets associated with removed partitions are removed and unsubscribed respectively.
Filter Non-existent Partitions During Restore:
The
subscribeLog
method now catchesPartitionNotExistException
and logs a warning instead of failing, filtering out any non-existent partitions on restore.Tests
Added
FlinkTableSourceFailOverITCase
integration test.API and Format
None
Documentation
NA