Skip to content

Latest commit

 

History

History
75 lines (58 loc) · 2.63 KB

README.md

File metadata and controls

75 lines (58 loc) · 2.63 KB

Maven Central build codecov

Pulsar Reactive Client

This is a wrapper around asyncronous facilities provided by the official Apache Pulsar Java Client using Reactor Core interfaces.

How to use

Maven

<dependency>
  <groupId>com.rpuch.pulsar-reactive-client</groupId>
  <artifactId>pulsar-client-reactor</artifactId>
  <version>1.1.0</version>
</dependency>

Gradle

implementation 'com.rpuch.pulsar-reactive-client:pulsar-client-reactor:1.1.0'

Create a reactive client

PulsarClient coreClient = PulsarClient.builder().serviceUrl(pulsarBrokerUrl).build();
ReactivePulsarClient client = ReactivePulsarClient.from(coreClient);

Produce a message

MessageId messageId = client.newProducer()
        .topic("my-topic")
        .forOne(producer -> producer.send("Hello!".bytes()))
        .block();

Consume an infinite stream of messaging acknowledging each after processing it starting at the very beginning of a topic

client.newConsumer()
        .topic("my-topic")
        .subscriptionName("my-subscription")
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .forMany(consumer -> consumer.messages().concatMap(message -> {
            String str = new String(msg.getData());
            System.out.println(str);
            return consumer.acknowledge(message);
        }))
        .subscribe();

Receive an infinite stream of messages starting at the very beginning of a topic

Flux<Message<byte[]>> messages = client.newReader()
        .topic("my-topic")
        .startMessageId(MessageId.earliest)
        .messages();
messages.map(msg -> new String(msg.getData())).subscribe(System.out::println);

Missing features, coming soon

  • Support for fast message publishing, batches and chunked messages
  • Support for transactions
  • Addition of support for RxJava and alternatives Reactive Streams implementations (like Mutiny) is under consideration