From 4bacee3468c446f0e8985f76b9a2b77e053d0372 Mon Sep 17 00:00:00 2001 From: Victor Babenko Date: Wed, 29 Oct 2025 13:51:15 -0700 Subject: [PATCH 1/2] Add partition name to the error message when failing to create a consumer --- .../pulsar/source/reader/PulsarPartitionSplitReader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java index 6e76e98b..8eebb328 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java @@ -238,7 +238,7 @@ public void handleSplitsChanges(SplitsChange splitsChanges try { this.pulsarConsumer = createPulsarConsumer(registeredSplit.getPartition()); } catch (PulsarClientException e) { - throw new FlinkRuntimeException(e); + throw new FlinkRuntimeException(String.format("Failed to create consumer on partition %s", registeredSplit.getPartition()), e); } LOG.info("Register split {} consumer for current reader.", registeredSplit); From aee6fba037dc29b104183d3798e230d6e10d7f34 Mon Sep 17 00:00:00 2001 From: Victor Babenko Date: Wed, 29 Oct 2025 19:51:50 -0700 Subject: [PATCH 2/2] Fix a race condition in the split reader by retrying the subscription after a 2-second delay --- .../reader/PulsarPartitionSplitReader.java | 25 +++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java index 8eebb328..c700f888 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java @@ -234,11 +234,32 @@ public void handleSplitsChanges(SplitsChange splitsChanges } } - // Create pulsar consumer. + // Create pulsar consumer with retry. try { this.pulsarConsumer = createPulsarConsumer(registeredSplit.getPartition()); } catch (PulsarClientException e) { - throw new FlinkRuntimeException(String.format("Failed to create consumer on partition %s", registeredSplit.getPartition()), e); + LOG.warn("Failed to create consumer on partition {} on first attempt, will retry after 2 seconds", + registeredSplit.getPartition(), e); + + try { + Thread.sleep(2000); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new FlinkRuntimeException( + String.format("Interrupted while waiting to retry consumer creation on partition %s", + registeredSplit.getPartition()), ie); + } + + // Retry consumer creation + try { + this.pulsarConsumer = createPulsarConsumer(registeredSplit.getPartition()); + LOG.info("Successfully created consumer on partition {} on second attempt", + registeredSplit.getPartition()); + } catch (PulsarClientException retryException) { + throw new FlinkRuntimeException( + String.format("Failed to create consumer on partition %s after retry", + registeredSplit.getPartition()), retryException); + } } LOG.info("Register split {} consumer for current reader.", registeredSplit);