Azure Event Hubs Checkpoint Store can be used for storing checkpoints while processing events from Azure Event Hubs.
This package uses Storage Blobs as a persistent store for maintaining checkpoints and partition ownership information.
The BlobPartitionManager provided in this package can be plugged in to EventProcessor.
Source code | API reference documentation | Product documentation | Samples
- Java Development Kit (JDK) with version 8 or above
- Maven
- Microsoft Azure subscription
  - You can create a free account at: https://azure.microsoft.com
- Azure Event Hubs instance
  - Step-by-step guide for creating an Event Hub using the Azure Portal
- Azure Storage account
  - Step-by-step guide for creating a Storage account using the Azure Portal
<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
    <version>1.0.0-preview.3</version>
</dependency>
To create an instance of BlobPartitionManager, first create a BlobContainerAsyncClient using the Storage account connection string and a SAS token with write access. To make this possible, you'll need the Account SAS (shared access signature) string of the Storage account. Learn more at SAS Token.
Checkpointing is a process by which readers mark or commit their position within a partition event sequence. Checkpointing is the responsibility of the consumer and occurs on a per-partition basis within a consumer group. This responsibility means that for each consumer group, each partition reader must keep track of its current position in the event stream, and can inform the service when it considers the data stream complete. If a reader disconnects from a partition, when it reconnects it begins reading at the checkpoint that was previously submitted by the last reader of that partition in that consumer group. When the reader connects, it passes the offset to the event hub to specify the location at which to start reading. In this way, you can use checkpointing to both mark events as "complete" by downstream applications, and to provide resiliency if a failover between readers running on different machines occurs. It is possible to return to older data by specifying a lower offset from this checkpointing process. Through this mechanism, checkpointing enables both failover resiliency and event stream replay.
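The resume-from-checkpoint behaviour described above can be illustrated with a minimal sketch. It assumes a hypothetical in-memory store, `InMemoryCheckpointStore`, keyed by consumer group and partition ID; this class is not part of the Azure SDK and only stands in for a persistent store such as Storage Blobs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical in-memory checkpoint store for illustration; not an Azure SDK type. */
class InMemoryCheckpointStore {
    // Key is "<consumerGroup>/<partitionId>"; value is the last committed offset.
    private final Map<String, Long> checkpoints = new HashMap<>();

    /** Records the reader's position for one partition within one consumer group. */
    void updateCheckpoint(String consumerGroup, String partitionId, long offset) {
        checkpoints.put(consumerGroup + "/" + partitionId, offset);
    }

    /** Returns the last committed offset, or empty if nothing was checkpointed yet. */
    OptionalLong lastCheckpoint(String consumerGroup, String partitionId) {
        Long offset = checkpoints.get(consumerGroup + "/" + partitionId);
        return offset == null ? OptionalLong.empty() : OptionalLong.of(offset);
    }
}

class CheckpointDemo {
    public static void main(String[] args) {
        InMemoryCheckpointStore store = new InMemoryCheckpointStore();

        // Reader A processes partition "0" and commits its position.
        store.updateCheckpoint("$Default", "0", 1024L);

        // Reader A disconnects. Reader B takes over the partition in the same
        // consumer group and resumes from the checkpoint that reader A submitted.
        OptionalLong resumeFrom = store.lastCheckpoint("$Default", "0");
        System.out.println("Resume partition 0 from offset: " + resumeFrom.getAsLong());

        // A partition without a checkpoint has no stored position to resume from.
        System.out.println("Partition 1 has checkpoint: "
            + store.lastCheckpoint("$Default", "1").isPresent());
    }
}
```

Because checkpoints are kept per partition within a consumer group, a different consumer group reading the same partition would track its own position independently.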
Both the offset and the sequence number refer to the position of an event within a partition. You can think of them as a client-side cursor. The offset is a byte numbering of the event. The offset or sequence number enables an event consumer (reader) to specify a point in the event stream from which to begin reading events. You can also specify a timestamp so that you receive only events that were enqueued after the given timestamp. Consumers are responsible for storing their own offset values outside of the Event Hubs service. Within a partition, each event includes an offset, a sequence number, and the timestamp of when it was enqueued.
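As a rough illustration of how an offset or a timestamp selects a starting point, the sketch below models a partition as a plain list. `StoredEvent` and `StartPositionDemo` are hypothetical names introduced for this example only; they are not SDK types, and the offsets and times are made-up values.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical stand-in for an event stored in one partition; not an SDK type. */
class StoredEvent {
    final long offset;          // byte numbering of the event within the partition
    final long sequenceNumber;  // position of the event in the partition's sequence
    final Instant enqueuedTime; // when the event was enqueued

    StoredEvent(long offset, long sequenceNumber, Instant enqueuedTime) {
        this.offset = offset;
        this.sequenceNumber = sequenceNumber;
        this.enqueuedTime = enqueuedTime;
    }
}

class StartPositionDemo {
    /** Events strictly after the given offset, e.g. one restored from a checkpoint. */
    static List<StoredEvent> afterOffset(List<StoredEvent> partition, long offset) {
        return partition.stream()
            .filter(e -> e.offset > offset)
            .collect(Collectors.toList());
    }

    /** Events enqueued strictly after the given timestamp. */
    static List<StoredEvent> afterEnqueuedTime(List<StoredEvent> partition, Instant time) {
        return partition.stream()
            .filter(e -> e.enqueuedTime.isAfter(time))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2019-09-01T00:00:00Z");
        List<StoredEvent> partition = Arrays.asList(
            new StoredEvent(0L, 0L, t0),
            new StoredEvent(120L, 1L, t0.plusSeconds(10)),
            new StoredEvent(260L, 2L, t0.plusSeconds(20)));

        // Resuming after offset 0 skips the first event.
        System.out.println(afterOffset(partition, 0L).size()); // prints 2

        // Only the last event was enqueued after t0 + 10 seconds.
        System.out.println(afterEnqueuedTime(partition, t0.plusSeconds(10)).size()); // prints 1
    }
}
```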
BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
    .connectionString("<STORAGE_ACCOUNT_CONNECTION_STRING>")
    .containerName("<CONTAINER_NAME>")
    .sasToken("<SAS_TOKEN>")
    .buildAsyncClient();
To consume events for all partitions of an Event Hub, you'll create an EventProcessor for a specific consumer group. When an Event Hub is created, it provides a default consumer group that can be used to get started.
The EventProcessor will delegate processing of events to a PartitionProcessor implementation that you provide, allowing your application to focus on the business logic needed to provide value while the processor holds responsibility for managing the underlying consumer operations.
In our example, we will build the EventProcessor using the BlobPartitionManager and a simple PartitionProcessor implementation that logs events received from Event Hubs to the console.
class Program {
    public static void main(String[] args) {
        // blobContainerAsyncClient is the client created in the earlier snippet.
        EventProcessor eventProcessor = new EventHubClientBuilder()
            .connectionString("<< CONNECTION STRING FOR THE EVENT HUB INSTANCE >>")
            .consumerGroupName("<< CONSUMER GROUP NAME>>")
            .partitionProcessorFactory(SimplePartitionProcessor::new)
            .partitionManager(new BlobPartitionManager(blobContainerAsyncClient))
            .buildEventProcessor();

        // This will start the processor. It will start processing events from all partitions.
        eventProcessor.start();

        // When the user wishes to stop processing events, they can call `stop()`.
        eventProcessor.stop();
    }
}
class SimplePartitionProcessor extends PartitionProcessor {
    // Logs each received event, then checkpoints it so that processing can resume from here.
    @Override
    public Mono<Void> processEvent(PartitionContext partitionContext, EventData eventData) {
        System.out.printf("Event received. Sequence number: %s.%n", eventData.sequenceNumber());
        return partitionContext.updateCheckpoint(eventData);
    }
}
You can set the AZURE_LOG_LEVEL environment variable to view logging statements made in the client library. For example, setting AZURE_LOG_LEVEL=2 would show all informational, warning, and error log messages. The available values are listed in the log levels documentation.
Get started by exploring the following samples:
If you would like to become an active contributor to this project, please refer to our Contribution Guidelines for more information.