Framework for integrating Java applications with Apache Pulsar.
While this is still an experimental project, it's a fully functional, high-throughput, robust messaging framework. Users are encouraged to try it out and provide feedback.
This project is not currently published to Maven Central; for now, the code can be checked out and built locally.
Besides adding more consumers to a topic via the concurrency setting, this framework supports key-based parallel processing within a single consumer or across multiple consumers. In this mode, messages with the same key are processed sequentially, while messages with different keys are processed in parallel.
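To make the ordering guarantee concrete, here is a minimal sketch of key-based parallel processing using only the JDK. It is not the framework's implementation: `KeyOrderedProcessor`, `submit` and `demo` are hypothetical names introduced for illustration. Each key gets its own chain of `CompletableFuture` stages, so tasks for the same key run one after another while different keys share the thread pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper for illustration only; it is not a class of this framework.
class KeyOrderedProcessor {

    // Tail of the task chain for each key.
    private final Map<String, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();
    private final ExecutorService executor;

    KeyOrderedProcessor(ExecutorService executor) {
        this.executor = executor;
    }

    // Appends the task to its key's chain: it starts only after the previous
    // task for the same key has finished, whether that task succeeded or failed.
    CompletableFuture<Void> submit(String key, Runnable task) {
        return tails.compute(key, (k, tail) -> {
            CompletableFuture<Void> previous =
                    tail == null ? CompletableFuture.<Void>completedFuture(null) : tail;
            return previous.<Void>handleAsync((ignored, error) -> {
                task.run();
                return null;
            }, executor);
        });
    }

    // Submits nine payloads spread over three keys and records the order seen per key.
    static Map<String, List<Integer>> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        KeyOrderedProcessor processor = new KeyOrderedProcessor(pool);
        Map<String, List<Integer>> seen = new ConcurrentHashMap<>();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (int i = 0; i < 9; i++) {
            String key = "key-" + (i % 3);
            int payload = i;
            pending.add(processor.submit(key, () ->
                    seen.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(payload)));
        }
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        pool.shutdown();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the per-key map grows without bound; a real implementation would need to evict idle keys.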
This framework also supports full fan-out of messages.
Leaning on Apache Pulsar's CompletableFuture API, this framework is fully non-blocking and designed for high throughput.
It supports both regular blocking and non-blocking components such as MessageListener, ErrorHandler, MessageInterceptor and AcknowledgementResultCallback.
All polling and acknowledgement actions are non-blocking, and blocking components are seamlessly adapted so no async complexity is required from the user (though it's encouraged at least for simple components).
This means the application's resources are fully available for user logic, resulting in lower costs and less environmental impact.
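As a rough illustration of how a blocking component can be adapted to a non-blocking pipeline, the sketch below wraps a plain `Consumer` so that callers receive a `CompletableFuture`. It uses only the JDK; `BlockingListenerAdapter`, `adapt` and `demo` are hypothetical names, and the framework's own adapters may work differently.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical adapter for illustration; it is not a class of this framework.
class BlockingListenerAdapter {

    // Wraps a blocking listener so that callers get a CompletableFuture back
    // and the calling (polling) thread is never blocked by user code.
    static <T> Function<T, CompletableFuture<Void>> adapt(Consumer<T> blockingListener, Executor executor) {
        return message -> CompletableFuture.runAsync(() -> blockingListener.accept(message), executor);
    }

    // Runs one successful and one failing listener and records what happened.
    static List<String> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<String> events = new CopyOnWriteArrayList<>();

        Function<String, CompletableFuture<Void>> ok =
                adapt((String m) -> events.add("processed " + m), pool);
        Function<String, CompletableFuture<Void>> failing = adapt((String m) -> {
            throw new IllegalStateException("cannot process " + m);
        }, pool);

        ok.apply("a").join();
        // A listener exception surfaces as an exceptionally completed future.
        failing.apply("b")
                .exceptionally(error -> {
                    events.add("failed b");
                    return null;
                })
                .join();
        pool.shutdown();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The design point is that errors from blocking code travel through the future rather than being thrown on the polling thread, which is what lets an error handler react without blocking.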
- @PulsarListener annotation with SpEL and property placeholder resolution
- Automatic Schema detection from listener methods
- Supports blocking and async components
- Batch message processing
- ErrorHandler with message recovery support
- MessageInterceptor with callbacks before and after processing
- Accepts flexible listener arguments such as Acknowledgement, BatchAcknowledgement, Consumer, MessageId, @Header, Spring's Message, Pulsar's original Message, and any user-provided resolvers
- Header Mapping
- Manual Factory and Container creation and lifecycle management to use without annotations
- Acknowledgement result callbacks
- MANUAL, ON_SUCCESS and ALWAYS acknowledgement modes
- @EnablePulsar for quick setup (autoconfiguration coming later)
- ... and much more!
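The three acknowledgement modes in the list can be pictured with the small sketch below. The semantics are an assumption inferred from the mode names: ON_SUCCESS acknowledges only when the listener completes without error, ALWAYS acknowledges regardless of the outcome, and MANUAL leaves acknowledgement to the listener. `AckModeSketch` and its members are hypothetical and use only the JDK, not the framework's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of the acknowledgement modes; semantics are assumed from the names.
class AckModeSketch {

    enum AckMode { MANUAL, ON_SUCCESS, ALWAYS }

    // Decides whether the container itself should acknowledge after the listener ran.
    static boolean shouldAutoAck(AckMode mode, boolean succeeded) {
        switch (mode) {
            case ALWAYS:
                return true;
            case ON_SUCCESS:
                return succeeded;
            default:
                return false; // MANUAL: the listener is expected to acknowledge itself
        }
    }

    // Runs the listener for each message and returns the messages that were auto-acknowledged.
    static List<String> process(AckMode mode, List<String> messages, Consumer<String> listener) {
        List<String> acked = new ArrayList<>();
        for (String message : messages) {
            boolean succeeded = true;
            try {
                listener.accept(message);
            } catch (RuntimeException e) {
                succeeded = false;
            }
            if (shouldAutoAck(mode, succeeded)) {
                acked.add(message);
            }
        }
        return acked;
    }

    // Sample listener that rejects the payload "bad".
    static void failOnBad(String message) {
        if (message.equals("bad")) {
            throw new IllegalArgumentException("cannot process " + message);
        }
    }

    public static void main(String[] args) {
        List<String> messages = List.of("a", "bad", "c");
        for (AckMode mode : AckMode.values()) {
            System.out.println(mode + " -> " + process(mode, messages, AckModeSketch::failOnBad));
        }
    }
}
```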
@SpringBootApplication
public class ApachePulsarSpringApplication {

    private static final Logger logger = LoggerFactory.getLogger(ApachePulsarSpringApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(ApachePulsarSpringApplication.class, args);
    }

    // Single-message blocking listener using the default container factory.
    @PulsarListener(topics = "${my.topic}", id = "my-container",
            subscription = "my-subscription", concurrency = "5")
    void listen(MyPojo message) {
        logger.info("Received message {}", message);
    }

    // Asynchronous batch listener using the key-parallel container factory below.
    @PulsarListener(topics = "${my.batch.topic}", id = "my-batch-container", factory = "parallelPulsarContainerFactory")
    CompletableFuture<Void> listen(List<Message<String>> messages) {
        return CompletableFuture
                .completedFuture(messages)
                .thenAccept(msgs -> logger.info("Received {} messages", msgs.size()));
    }

    @EnablePulsar
    @Configuration
    static class PulsarConfiguration {

        @Bean
        MessageListenerContainerFactory<Object> defaultPulsarContainerFactory(ConsumerFactory<Object> consumerFactory) {
            return PulsarMessageListenerContainerFactory
                    .createWith(consumerFactory);
        }

        // Processes messages in parallel by key with manual acknowledgement.
        // getManualAckResultCallback() is a user-defined helper that is not shown here.
        @Bean
        PulsarMessageListenerContainerFactory<Object> parallelPulsarContainerFactory(ConsumerFactory<Object> consumerFactory) {
            return PulsarMessageListenerContainerFactory
                    .createWith(consumerFactory)
                    .configure(options -> options
                            .processingOrdering(ProcessingOrdering.PARALLEL_BY_KEY)
                            .subscriptionType(SubscriptionType.Key_Shared)
                            .concurrency(4)
                            .acknowledgementMode(AcknowledgementMode.MANUAL)
                            .acknowledgementResultCallback(getManualAckResultCallback()));
        }

        @Bean
        ConsumerFactory<Object> consumerFactory(PulsarClientFactory clientFactory) {
            return DefaultPulsarConsumerFactory
                    .createWith(clientFactory)
                    .configure(options -> options
                            .batchReceivePolicy(BatchReceivePolicy
                                    .builder()
                                    .maxNumMessages(10)
                                    .timeout(1, TimeUnit.SECONDS)
                                    .build())
                            .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS));
        }

        @Bean
        PulsarClientFactory clientFactory() {
            return DefaultPulsarClientFactory
                    .create()
                    .configure(options -> options
                            // Port 6650 is Pulsar's binary protocol port, which uses the pulsar:// scheme.
                            .serviceUrl("pulsar://localhost:6650"));
        }
    }
}

Improvements coming soon include:
- Template class for sending messages
- @SendTo
- @ReplyTo
- Project Reactor support