diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java index 6e76e98b..c700f888 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java @@ -234,11 +234,32 @@ public void handleSplitsChanges(SplitsChange splitsChanges } } - // Create pulsar consumer. + // Create pulsar consumer with retry. try { this.pulsarConsumer = createPulsarConsumer(registeredSplit.getPartition()); } catch (PulsarClientException e) { - throw new FlinkRuntimeException(e); + LOG.warn("Failed to create consumer on partition {} on first attempt, will retry after 2 seconds", + registeredSplit.getPartition(), e); + + try { + Thread.sleep(2000); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new FlinkRuntimeException( + String.format("Interrupted while waiting to retry consumer creation on partition %s", + registeredSplit.getPartition()), ie); + } + + // Retry consumer creation + try { + this.pulsarConsumer = createPulsarConsumer(registeredSplit.getPartition()); + LOG.info("Successfully created consumer on partition {} on second attempt", + registeredSplit.getPartition()); + } catch (PulsarClientException retryException) { + throw new FlinkRuntimeException( + String.format("Failed to create consumer on partition %s after retry", + registeredSplit.getPartition()), retryException); + } } LOG.info("Register split {} consumer for current reader.", registeredSplit);