Skip to content

Commit 4d46c53

Browse files
author
Alexander Preuß
authored
Add spring-pulsar example (#98)
1 parent 4cfb362 commit 4d46c53

File tree

13 files changed

+419
-1
lines changed

13 files changed

+419
-1
lines changed

.DS_Store

-6 KB
Binary file not shown.

.gitignore

+3
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,6 @@ target/
3131
# Intellij
3232
.idea
3333
*.iml
34+
35+
# macOS
36+
.DS_Store

README.md

+5-1
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,8 @@ This is a curated list of demos that showcase Apache Pulsar® messaging and even
1616

1717
# Flink
1818

19-
- [Pulsar Flink Connector](pulsar-flink/README.md)
19+
- [Pulsar Flink Connector](pulsar-flink/README.md)
20+
21+
# Spring for Apache Pulsar
22+
23+
- [Spring for Apache Pulsar Example](spring-pulsar/README.md)

spring-pulsar/.gitignore

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
*.iml
2+
*.ipr
3+
*.iws
4+
.classpath
5+
.gradle
6+
.idea
7+
.project
8+
.settings
9+
.sts4-cache
10+
.checkstyle
11+
bin
12+
build
13+
!/**/src/**/bin
14+
!/**/src/**/build
15+
build.log
16+
out
17+
target
18+
.DS_Store

spring-pulsar/README.md

+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# Spring Pulsar Integration Example
2+
3+
This is an example application demonstrating how to use [Spring for Apache Pulsar](https://github.com/spring-projects-experimental/spring-pulsar).
4+
5+
## Prerequisites
6+
7+
* [Java 17 or higher](https://jdk.java.net/17/) to run
8+
* [Maven](https://maven.apache.org/) to compile
9+
* [StreamNative Cloud](https://streamnative.io) instance or your own Pulsar cluster
10+
11+
12+
If you are using StreamNative Cloud you need to configure [your Pulsar service URLs and authentication](https://docs.streamnative.io/cloud/stable/connect/overview) in the [application.yaml file](src/main/resources/application.yml) as detailed below:
13+
```yaml
14+
spring:
15+
pulsar:
16+
client:
17+
service-url: pulsar+ssl://free.o-j8r1u.snio.cloud:6651
18+
auth-plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
19+
authentication:
20+
private-key: file:///Users/user/Downloads/o-j8r1u-free.json
21+
audience: urn:sn:pulsar:o-j8r1u:free
22+
issuer-url: https://auth.streamnative.cloud/
23+
```
24+
25+
## Build the example
26+
27+
You can build the example application with the following command:
28+
29+
```bash
30+
mvn clean install
31+
```
32+
33+
## Run the example
34+
35+
You can run the example application with the following command:
36+
37+
```bash
38+
mvn spring-boot:run
39+
```

spring-pulsar/pom.xml

+82
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
<parent>
7+
<groupId>org.springframework.boot</groupId>
8+
<artifactId>spring-boot-starter-parent</artifactId>
9+
<version>3.0.0-M4</version>
10+
<relativePath/> <!-- lookup parent from repository -->
11+
</parent>
12+
<groupId>io.streamnative.example</groupId>
13+
<artifactId>spring-pulsar-example</artifactId>
14+
<version>1.0.0</version>
15+
<name>Spring Pulsar Example</name>
16+
<description>Demo project for Spring integration with Apache Pulsar</description>
17+
<properties>
18+
<java.version>17</java.version>
19+
<jfairy.version>0.6.5</jfairy.version>
20+
<spring-pulsar.version>0.1.0-M1</spring-pulsar.version>
21+
</properties>
22+
<dependencies>
23+
<dependency>
24+
<groupId>org.springframework.pulsar</groupId>
25+
<artifactId>spring-pulsar-spring-boot-starter</artifactId>
26+
<version>${spring-pulsar.version}</version>
27+
</dependency>
28+
<dependency>
29+
<groupId>org.springframework.boot</groupId>
30+
<artifactId>spring-boot-configuration-processor</artifactId>
31+
</dependency>
32+
<dependency>
33+
<groupId>com.devskiller</groupId>
34+
<artifactId>jfairy</artifactId>
35+
<version>${jfairy.version}</version>
36+
</dependency>
37+
</dependencies>
38+
<build>
39+
<plugins>
40+
<plugin>
41+
<groupId>org.springframework.boot</groupId>
42+
<artifactId>spring-boot-maven-plugin</artifactId>
43+
</plugin>
44+
</plugins>
45+
</build>
46+
<repositories>
47+
<repository>
48+
<id>spring-milestones</id>
49+
<name>Spring Milestones</name>
50+
<url>https://repo.spring.io/milestone</url>
51+
<snapshots>
52+
<enabled>false</enabled>
53+
</snapshots>
54+
</repository>
55+
<repository>
56+
<id>spring-snapshots</id>
57+
<name>Spring Snapshots</name>
58+
<url>https://repo.spring.io/snapshot</url>
59+
<releases>
60+
<enabled>false</enabled>
61+
</releases>
62+
</repository>
63+
</repositories>
64+
<pluginRepositories>
65+
<pluginRepository>
66+
<id>spring-milestones</id>
67+
<name>Spring Milestones</name>
68+
<url>https://repo.spring.io/milestone</url>
69+
<snapshots>
70+
<enabled>false</enabled>
71+
</snapshots>
72+
</pluginRepository>
73+
<pluginRepository>
74+
<id>spring-snapshots</id>
75+
<name>Spring Snapshots</name>
76+
<url>https://repo.spring.io/snapshot</url>
77+
<releases>
78+
<enabled>false</enabled>
79+
</releases>
80+
</pluginRepository>
81+
</pluginRepositories>
82+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/**
2+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
3+
* in compliance with the License. You may obtain a copy of the License at
4+
*
5+
* <p>http://www.apache.org/licenses/LICENSE-2.0
6+
*
7+
* <p>Unless required by applicable law or agreed to in writing, software distributed under the
8+
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
9+
* express or implied. See the License for the specific language governing permissions and
10+
* limitations under the License.
11+
*/
12+
package io.streamnative.example;
13+
14+
import io.streamnative.example.datagen.SignupGenerator;
15+
import io.streamnative.example.model.Customer;
16+
import io.streamnative.example.model.Signup;
17+
import io.streamnative.example.model.SignupTier;
18+
import org.apache.pulsar.client.api.PulsarClientException;
19+
import org.apache.pulsar.client.impl.schema.JSONSchema;
20+
import org.apache.pulsar.common.schema.SchemaType;
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
23+
import org.springframework.beans.factory.annotation.Autowired;
24+
import org.springframework.boot.SpringApplication;
25+
import org.springframework.boot.autoconfigure.SpringBootApplication;
26+
import org.springframework.pulsar.annotation.PulsarListener;
27+
import org.springframework.pulsar.core.PulsarTemplate;
28+
import org.springframework.scheduling.annotation.EnableScheduling;
29+
import org.springframework.scheduling.annotation.Scheduled;
30+
31+
@EnableScheduling
32+
@SpringBootApplication
33+
public class SignupApplication {
34+
35+
private static final Logger log = LoggerFactory.getLogger(SignupApplication.class);
36+
37+
@Autowired private PulsarTemplate<Signup> signupTemplate;
38+
39+
@Autowired private PulsarTemplate<Customer> customerTemplate;
40+
41+
@Autowired private SignupGenerator signupGenerator;
42+
43+
public static void main(String[] args) {
44+
SpringApplication.run(SignupApplication.class, args);
45+
}
46+
47+
@Scheduled(initialDelay = 5000, fixedRate = 5000)
48+
void publishSignupData() throws PulsarClientException {
49+
Signup signup = signupGenerator.generate();
50+
signupTemplate.setSchema(JSONSchema.of(Signup.class));
51+
signupTemplate.send(signup);
52+
}
53+
54+
@PulsarListener(
55+
subscriptionName = "signup-consumer",
56+
topics = "signups-topic",
57+
schemaType = SchemaType.JSON)
58+
void filterSignups(Signup signup) throws PulsarClientException {
59+
log.info(
60+
"{} {} ({}) just signed up for {} tier",
61+
signup.firstName(),
62+
signup.lastName(),
63+
signup.companyEmail(),
64+
signup.signupTier());
65+
66+
if (signup.signupTier() == SignupTier.ENTERPRISE) {
67+
Customer customer = Customer.from(signup);
68+
customerTemplate.setSchema(JSONSchema.of(Customer.class));
69+
customerTemplate.send("customer-success", customer);
70+
}
71+
}
72+
73+
@PulsarListener(
74+
subscriptionName = "customer-consumer",
75+
topics = "customer-success",
76+
schemaType = SchemaType.JSON)
77+
void alertCustomerSuccess(Customer customer) {
78+
log.info(
79+
"## Start the onboarding for {} - {} {} ({}) - {} ##",
80+
customer.companyName(),
81+
customer.firstName(),
82+
customer.lastName(),
83+
customer.phoneNumber(),
84+
customer.companyEmail());
85+
}
86+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/**
2+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
3+
* in compliance with the License. You may obtain a copy of the License at
4+
*
5+
* <p>http://www.apache.org/licenses/LICENSE-2.0
6+
*
7+
* <p>Unless required by applicable law or agreed to in writing, software distributed under the
8+
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
9+
* express or implied. See the License for the specific language governing permissions and
10+
* limitations under the License.
11+
*/
12+
package io.streamnative.example;
13+
14+
import io.streamnative.example.datagen.SignupGenerator;
15+
import org.apache.pulsar.client.api.Message;
16+
import org.apache.pulsar.client.api.MessageId;
17+
import org.apache.pulsar.client.api.Producer;
18+
import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
19+
import org.slf4j.Logger;
20+
import org.slf4j.LoggerFactory;
21+
import org.springframework.context.annotation.Bean;
22+
import org.springframework.context.annotation.Configuration;
23+
24+
@Configuration(proxyBeanMethods = false)
25+
class SignupConfiguration {
26+
27+
@Bean
28+
SignupGenerator signupGenerator() {
29+
return new SignupGenerator();
30+
}
31+
32+
@Bean
33+
ProducerInterceptor loggingInterceptor() {
34+
return new LoggingInterceptor();
35+
}
36+
37+
static class LoggingInterceptor implements ProducerInterceptor {
38+
private static final Logger log = LoggerFactory.getLogger(LoggingInterceptor.class);
39+
40+
@Override
41+
public void close() {
42+
// no-op
43+
}
44+
45+
@Override
46+
public boolean eligible(Message message) {
47+
return true;
48+
}
49+
50+
@Override
51+
public Message beforeSend(Producer producer, Message message) {
52+
return message;
53+
}
54+
55+
@Override
56+
public void onSendAcknowledgement(
57+
Producer producer, Message message, MessageId msgId, Throwable exception) {
58+
log.debug("MessageId: {}, Value: {}", message.getMessageId(), message.getValue());
59+
}
60+
}
61+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/**
2+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
3+
* in compliance with the License. You may obtain a copy of the License at
4+
*
5+
* <p>http://www.apache.org/licenses/LICENSE-2.0
6+
*
7+
* <p>Unless required by applicable law or agreed to in writing, software distributed under the
8+
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
9+
* express or implied. See the License for the specific language governing permissions and
10+
* limitations under the License.
11+
*/
12+
package io.streamnative.example.datagen;
13+
14+
import com.devskiller.jfairy.Fairy;
15+
import com.devskiller.jfairy.producer.person.Person;
16+
import io.streamnative.example.model.Signup;
17+
import io.streamnative.example.model.SignupTier;
18+
import java.util.Random;
19+
import org.springframework.stereotype.Component;
20+
21+
public class SignupGenerator {
22+
23+
private static final Random random = new Random();
24+
25+
private static final Fairy fairy = Fairy.create();
26+
27+
public Signup generate() {
28+
Person person = fairy.person();
29+
return new Signup(
30+
generateSignupTier(),
31+
person.getFirstName(),
32+
person.getLastName(),
33+
person.getTelephoneNumber(),
34+
person.getCompany().getName(),
35+
person.getCompanyEmail(),
36+
System.currentTimeMillis());
37+
}
38+
39+
private static SignupTier generateSignupTier() {
40+
return SignupTier.values()[random.nextInt(SignupTier.values().length)];
41+
}
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/**
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.streamnative.example.model;
15+
16+
public record Customer(String firstName, String lastName, String phoneNumber, String companyName, String companyEmail, long signupTimestamp) {
17+
18+
public static Customer from(Signup signup) {
19+
return new Customer(
20+
signup.firstName(),
21+
signup.lastName(),
22+
signup.phoneNumber(),
23+
signup.companyName(),
24+
signup.companyEmail(),
25+
signup.signupTimestamp()
26+
);
27+
}
28+
}

0 commit comments

Comments
 (0)