Skip to content

Commit 3d1e967

Browse files
Add Kafka Java client examples for KoP with token authentication (#71)
NOTE: The token authentication in KoP is just the PLAIN mechanism.
1 parent 7fff3c6 commit 3d1e967

File tree

8 files changed

+292
-0
lines changed

8 files changed

+292
-0
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,4 @@ target/
3030

3131
# Intellij
3232
.idea
33+
*.iml

kop/README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# Overview
2+
3+
This directory includes examples of how Kafka clients connect to a KoP cluster through the Token authentication plugin.
4+
5+
- Supported Kafka clients
6+
- [Java client](https://github.com/streamnative/pulsar-examples/tree/master/kop/java)
7+
- TODO: examples of other clients should be added

kop/java/README.md

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
# Overview
2+
3+
This document describes how to produce messages to and consume messages from a KoP cluster using the Kafka client.
4+
5+
# Prerequisites
6+
7+
- Java 1.8 or higher version
8+
- Maven
9+
10+
> **NOTE**
11+
>
12+
> This example uses Kafka client 2.0.0. If you want to use another version of Kafka client, you can change the `kafka.version` property in `pom.xml` file. Kafka client version 1.0.0 - 2.6.0 are supported.
13+
14+
# Example
15+
16+
See [KoP Security](https://github.com/streamnative/kop/blob/master/docs/security.md) for how to configure KoP with token authentication. This example takes a topic named `my-topic` under `public/default` namespace as reference.
17+
18+
1. Grant produce and consume permissions to the specific role.
19+
20+
```bash
21+
bin/pulsar-admin namespaces grant-permission public/default \
22+
--role <role> \
23+
--actions produce,consume
24+
```
25+
26+
> **NOTE**
27+
>
28+
> The `conf/client.conf` should be configured. For details, see [Configure CLI Tools](http://pulsar.apache.org/docs/en/security-jwt/#cli-tools).
29+
30+
2. Configure the token in [token.properties](src/main/resources/token.properties).
31+
32+
```properties
33+
topic=persistent://public/default/my-topic
34+
namespace=public/default
35+
token=token:<token-of-the-role>
36+
```
37+
38+
3. Compile the project.
39+
40+
```
41+
mvn clean compile
42+
```
43+
44+
4. Run a Kafka producer to produce a `hello` message.
45+
46+
```bash
47+
mvn exec:java -Dexec.mainClass=io.streamnative.examples.kafka.TokenProducer
48+
```
49+
50+
**Output:**
51+
52+
```
53+
Send hello to persistent://public/default/my-topic-0@0
54+
```
55+
56+
5. Run a Kafka consumer to consume some messages.
57+
58+
```bash
59+
mvn exec:java -Dexec.mainClass=io.streamnative.examples.kafka.TokenConsumer
60+
```
61+
62+
**Output:**
63+
64+
```
65+
Receive record: hello from persistent://public/default/my-topic-0@0
66+
```

kop/java/pom.xml

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
16+
-->
17+
<project xmlns="http://maven.apache.org/POM/4.0.0"
18+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
19+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
20+
<modelVersion>4.0.0</modelVersion>
21+
22+
<groupId>org.example</groupId>
23+
<artifactId>java</artifactId>
24+
<version>1.0-SNAPSHOT</version>
25+
26+
<properties>
27+
<kafka.version>2.0.0</kafka.version>
28+
<slf4j.simple.version>1.7.30</slf4j.simple.version>
29+
</properties>
30+
31+
<dependencies>
32+
<dependency>
33+
<groupId>org.apache.kafka</groupId>
34+
<artifactId>kafka-clients</artifactId>
35+
<version>${kafka.version}</version>
36+
</dependency>
37+
38+
<dependency>
39+
<groupId>org.slf4j</groupId>
40+
<artifactId>slf4j-simple</artifactId>
41+
<version>${slf4j.simple.version}</version>
42+
<scope>runtime</scope>
43+
</dependency>
44+
</dependencies>
45+
46+
<build>
47+
<plugins>
48+
<plugin>
49+
<groupId>org.apache.maven.plugins</groupId>
50+
<artifactId>maven-compiler-plugin</artifactId>
51+
<configuration>
52+
<source>8</source>
53+
<target>8</target>
54+
</configuration>
55+
</plugin>
56+
</plugins>
57+
</build>
58+
</project>
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/**
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.streamnative.examples.kafka;
15+
16+
import java.io.IOException;
17+
import java.time.Duration;
18+
import java.util.Collections;
19+
import java.util.Properties;
20+
import org.apache.kafka.clients.consumer.ConsumerConfig;
21+
import org.apache.kafka.clients.consumer.ConsumerRecords;
22+
import org.apache.kafka.clients.consumer.KafkaConsumer;
23+
import org.apache.kafka.common.serialization.StringDeserializer;
24+
25+
/**
26+
* A token authentication example of Kafka consumer.
27+
*/
28+
public class TokenConsumer {
29+
public static void main(String[] args) throws IOException {
30+
// 1. Get the configured parameters from token.properties
31+
final Properties tokenProps = new Properties();
32+
tokenProps.load(TokenProducer.class.getClassLoader().getResourceAsStream("token.properties"));
33+
final String bootstrapServers = tokenProps.getProperty("bootstrap.servers");
34+
final String topic = tokenProps.getProperty("topic");
35+
final String group = tokenProps.getProperty("group");
36+
final String namespace = tokenProps.getProperty("namespace");
37+
final String token = tokenProps.getProperty("token");
38+
39+
// 2. Create a consumer with token authentication, which is equivalent to SASL/PLAIN mechanism in Kafka
40+
final Properties props = new Properties();
41+
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
42+
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
43+
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
44+
props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
45+
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
46+
props.put("security.protocol", "SASL_PLAINTEXT");
47+
props.put("sasl.mechanism", "PLAIN");
48+
props.put("sasl.jaas.config", String.format(
49+
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
50+
namespace, token));
51+
final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
52+
consumer.subscribe(Collections.singleton(topic));
53+
54+
// 3. Consume some messages and quit immediately
55+
boolean running = true;
56+
while (running) {
57+
final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
58+
if (!records.isEmpty()) {
59+
records.forEach(record -> System.out.println("Receive record: " + record.value() + " from "
60+
+ record.topic() + "-" + record.partition() + "@" + record.offset()));
61+
running = false;
62+
}
63+
}
64+
consumer.close();
65+
}
66+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/**
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.streamnative.examples.kafka;
15+
16+
import java.io.IOException;
17+
import java.util.Properties;
18+
import java.util.concurrent.ExecutionException;
19+
import java.util.concurrent.Future;
20+
import org.apache.kafka.clients.producer.KafkaProducer;
21+
import org.apache.kafka.clients.producer.ProducerConfig;
22+
import org.apache.kafka.clients.producer.ProducerRecord;
23+
import org.apache.kafka.clients.producer.RecordMetadata;
24+
import org.apache.kafka.common.serialization.StringSerializer;
25+
26+
/**
27+
* A token authentication example of Kafka producer.
28+
*/
29+
public class TokenProducer {
30+
public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
31+
// 1. Get the configured parameters from token.properties
32+
final Properties tokenProps = new Properties();
33+
tokenProps.load(TokenProducer.class.getClassLoader().getResourceAsStream("token.properties"));
34+
final String bootstrapServers = tokenProps.getProperty("bootstrap.servers");
35+
final String topic = tokenProps.getProperty("topic");
36+
final String namespace = tokenProps.getProperty("namespace");
37+
final String token = tokenProps.getProperty("token");
38+
39+
// 2. Create a producer with token authentication, which is equivalent to SASL/PLAIN mechanism in Kafka
40+
final Properties props = new Properties();
41+
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
42+
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
43+
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
44+
props.put("security.protocol", "SASL_PLAINTEXT");
45+
props.put("sasl.mechanism", "PLAIN");
46+
props.put("sasl.jaas.config", String.format(
47+
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
48+
namespace, token));
49+
final KafkaProducer<String, String> producer = new KafkaProducer<>(props);
50+
51+
// 3. Produce one message
52+
final Future<RecordMetadata> recordMetadataFuture = producer.send(new ProducerRecord<>(topic, "hello"));
53+
final RecordMetadata recordMetadata = recordMetadataFuture.get();
54+
System.out.println("Send hello to " + recordMetadata);
55+
producer.close();
56+
}
57+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#
2+
# Licensed under the Apache License, Version 2.0 (the "License");
3+
# you may not use this file except in compliance with the License.
4+
# You may obtain a copy of the License at
5+
#
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
#
8+
# Unless required by applicable law or agreed to in writing, software
9+
# distributed under the License is distributed on an "AS IS" BASIS,
10+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
# See the License for the specific language governing permissions and
12+
# limitations under the License.
13+
#
14+
15+
# http://www.slf4j.org/api/org/slf4j/impl/SimpleLogger.html
16+
org.slf4j.simpleLogger.defaultLogLevel=info
17+
org.slf4j.simpleLogger.showDateTime=true
18+
org.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd HH:mm:ss:SSS
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#
2+
# Licensed under the Apache License, Version 2.0 (the "License");
3+
# you may not use this file except in compliance with the License.
4+
# You may obtain a copy of the License at
5+
#
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
#
8+
# Unless required by applicable law or agreed to in writing, software
9+
# distributed under the License is distributed on an "AS IS" BASIS,
10+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
# See the License for the specific language governing permissions and
12+
# limitations under the License.
13+
#
14+
15+
bootstrap.servers=localhost:9092
16+
topic=persistent://public/default/my-topic
17+
group=my-group
18+
namespace=public/default
19+
token=token:<token>

0 commit comments

Comments
 (0)