Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,32 @@ public void handleSplitsChanges(SplitsChange<PulsarPartitionSplit> splitsChanges
}
}

// Create pulsar consumer.
// Create pulsar consumer with retry.
try {
this.pulsarConsumer = createPulsarConsumer(registeredSplit.getPartition());
} catch (PulsarClientException e) {
throw new FlinkRuntimeException(e);
LOG.warn("Failed to create consumer on partition {} on first attempt, will retry after 2 seconds",
registeredSplit.getPartition(), e);

try {
Thread.sleep(2000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new FlinkRuntimeException(
String.format("Interrupted while waiting to retry consumer creation on partition %s",
registeredSplit.getPartition()), ie);
}

// Retry consumer creation
try {
this.pulsarConsumer = createPulsarConsumer(registeredSplit.getPartition());
LOG.info("Successfully created consumer on partition {} on second attempt",
registeredSplit.getPartition());
} catch (PulsarClientException retryException) {
throw new FlinkRuntimeException(
String.format("Failed to create consumer on partition %s after retry",
registeredSplit.getPartition()), retryException);
}
}

LOG.info("Register split {} consumer for current reader.", registeredSplit);
Expand Down
Loading