Skip to content

Commit 50c4123

Browse files
committed
未知提交
1 parent 524fbb1 commit 50c4123

File tree

5 files changed

+88
-84
lines changed

5 files changed

+88
-84
lines changed

kafka/kafka-demo/pom.xml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,11 @@
4646
<scope>compile</scope>
4747
</dependency>
4848

49-
<dependency>
50-
<groupId>org.springframework.kafka</groupId>
51-
<artifactId>spring-kafka</artifactId>
52-
<version>2.6.4</version>
53-
</dependency>
49+
<!-- <dependency>-->
50+
<!-- <groupId>org.springframework.kafka</groupId>-->
51+
<!-- <artifactId>spring-kafka</artifactId>-->
52+
<!-- <version>2.6.4</version>-->
53+
<!-- </dependency>-->
5454
</dependencies>
5555

5656
<build>

kafka/kafka-demo/src/main/java/cn/jaminye/kafkademo/kafka/MsgConsumer.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import java.time.Duration;
1111
import java.util.Arrays;
12+
import java.util.Collections;
1213
import java.util.Properties;
1314

1415
/**
@@ -22,34 +23,34 @@ public class MsgConsumer {
2223

2324
public static void main(String[] args) {
2425
Properties properties = new Properties();
25-
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.211.55.4:9092");
26+
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.211.55.22:9092,10.211.55.22:9093,10.211.55.22:9094");
2627
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
27-
//自动提交offset时间间隔
28+
/* //自动提交offset时间间隔
2829
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
2930
//关闭自动提交
3031
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
31-
/*消费者会一直向协调者broker发送心跳数据,当协调者发现消费者不发送心跳了,并且时间超过了设置的时间,就认为该消费者挂了,通知同组下
32-
其他消费者进行rebalance操作*/
32+
*//*消费者会一直向协调者broker发送心跳数据,当协调者发现消费者不发送心跳了,并且时间超过了设置的时间,就认为该消费者挂了,通知同组下
33+
其他消费者进行rebalance操作*//*
3334
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
3435
//broker多久感知不到消费者认为其挂了
3536
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000 * 10);
3637
//两次poll时间都超过此时间,会认为其挂了
37-
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
38+
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);*/
3839
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
3940
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
4041
KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(properties);
41-
String topicName = "order-topic";
42+
String topicName = "test-topic";
4243
//消费主题
4344
// consumer.subscribe(Arrays.asList(topicName));
4445
//指定分区消费
45-
consumer.assign(Arrays.asList(new TopicPartition(topicName, 0)));
46+
consumer.assign(Collections.singletonList(new TopicPartition(topicName, 0)));
4647
//从头开始
47-
consumer.seekToBeginning(Arrays.asList(new TopicPartition(topicName, 0)));
48-
//指定偏移量消费
48+
// consumer.seekToBeginning(Collections.singletonList(new TopicPartition(topicName, 0)));
49+
//指定偏移量消费 指定后不管当前offset从这开始消费
4950
consumer.seek(new TopicPartition(topicName, 0), 10);
5051
while (true) {
5152
//长轮询
52-
ConsumerRecords<Object, Object> consumerRecords = consumer.poll(Duration.ofSeconds(Integer.MAX_VALUE));
53+
ConsumerRecords<Object, Object> consumerRecords = consumer.poll(Duration.ofSeconds(1));
5354
consumerRecords.forEach(objectObjectConsumerRecord -> {
5455
System.out.println(objectObjectConsumerRecord.offset() + "======================"
5556
+ objectObjectConsumerRecord.key() + "---------------" + objectObjectConsumerRecord.value());

kafka/kafka-demo/src/main/java/cn/jaminye/kafkademo/kafka/MsgProducerTest.java

Lines changed: 50 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import org.junit.Test;
77

88
import java.util.Properties;
9+
import java.util.concurrent.CountDownLatch;
910
import java.util.concurrent.ExecutionException;
1011

1112
/**
@@ -14,64 +15,66 @@
1415
* 生产者
1516
*/
1617
public class MsgProducerTest {
17-
KafkaProducer<String, String> producer;
18-
19-
@Before
20-
public void init() {
21-
Properties properties = new Properties();
22-
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.211.55.4:9092");
23-
//0 不需要回复,1等待leader写入log 不需要follower -1 等待集群全部写入log
24-
properties.put(ProducerConfig.ACKS_CONFIG, "1");
25-
//失败重试时间
26-
properties.put(ProducerConfig.RETRIES_CONFIG, "3");
27-
//本地缓存区时间
28-
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
29-
//批量发送大小,批量满16k就发送
30-
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
31-
//100毫秒批量未填充完成也发送
32-
properties.put(ProducerConfig.LINGER_MS_CONFIG, "100");
33-
//发送key从字符串转字节数组
34-
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
35-
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
36-
producer = new KafkaProducer<>(properties);
37-
}
38-
39-
@Test
40-
public void sendMsg() {
41-
for (int i = 0; i < 10; i++) {
42-
producer.send(new ProducerRecord<String, String>("test-topic", "test-key" + i, "key-value" + i));
43-
}
44-
producer.close();
45-
}
18+
KafkaProducer<String, String> producer;
4619

20+
@Before
21+
public void init() {
22+
Properties properties = new Properties();
23+
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.211.55.22:9092,10.211.55.22:9093,10.211.55.22:9094");
24+
/**
25+
* 0 不需要回复
26+
* 1等待leader写入log不需要follower
27+
* -1 等待集群全部写入log
28+
*/
29+
properties.put(ProducerConfig.ACKS_CONFIG, "1");
30+
// 失败重试时间
31+
properties.put(ProducerConfig.RETRIES_CONFIG, "3");
32+
// 本地缓存区时间
33+
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
34+
// 批量发送大小,批量满16k就发送
35+
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
36+
//100毫秒批量未填充完成也发送
37+
properties.put(ProducerConfig.LINGER_MS_CONFIG, "10");
38+
//发送key从字符串转字节数组
39+
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
40+
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
41+
producer = new KafkaProducer<>(properties);
42+
}
4743

48-
@Test
49-
public void sendMsg1() throws ExecutionException, InterruptedException {
50-
for (int i = 0; i < 100; i++) {
51-
ProducerRecord<String, String> stringStringProducerRecord = new ProducerRecord<>("order-topic", 0, "test" + i, "test" + i);
52-
RecordMetadata recordMetadata = producer.send(stringStringProducerRecord).get();
53-
System.out.println("发送成功:" + "topic:" + recordMetadata.topic() + "partition:" + recordMetadata.partition() + "offset:" + recordMetadata.offset());
54-
}
44+
@Test
45+
public void sendMsg() {
46+
for (int i = 0; i < 10; i++) {
47+
producer.send(new ProducerRecord<String, String>("test-topic", "test-key" + i, "key-value" + i));
48+
}
49+
producer.close();
50+
}
5551

56-
}
5752

53+
@Test
54+
public void sendMsg1() throws ExecutionException, InterruptedException {
55+
for (int i = 0; i < 100; i++) {
56+
ProducerRecord<String, String> stringStringProducerRecord = new ProducerRecord<>("order-topic", 0, "test" + i, "test" + i);
57+
RecordMetadata recordMetadata = producer.send(stringStringProducerRecord).get();
58+
System.out.println("发送成功:" + "topic:" + recordMetadata.topic() + "partition:" + recordMetadata.partition() + "offset:" + recordMetadata.offset());
59+
}
5860

61+
}
5962

6063

61-
@Test
62-
public void sendMsg2() throws InterruptedException {
64+
@Test
65+
public void sendMsg2() throws InterruptedException {
66+
final CountDownLatch countDownLatch = new CountDownLatch(1);
6367
ProducerRecord<String, String> stringStringProducerRecord = new ProducerRecord<>("yibu-topic", 0, "test", "test");
64-
producer.send(stringStringProducerRecord, new Callback() {
65-
@Override
66-
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
67-
if (e!=null){
68+
producer.send(stringStringProducerRecord, (recordMetadata, e) -> {
69+
if (e != null) {
6870
System.err.println("发送信息失败");
6971
}
70-
if (recordMetadata!=null){
71-
System.out.println("topic:"+recordMetadata.topic()+"partition"+recordMetadata.partition()+"offset:"+recordMetadata.offset());
72-
}
72+
if (recordMetadata != null) {
73+
System.out.println("topic:" + recordMetadata.topic() + "partition" + recordMetadata.partition() + "offset:" + recordMetadata.offset());
7374
}
75+
countDownLatch.countDown();
7476
});
75-
Thread.sleep(Integer.MAX_VALUE);
77+
countDownLatch.await();
78+
producer.close();
7679
}
7780
}
Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,21 @@
1-
package cn.jaminye.kafkademo.springbootkafka;
2-
3-
import org.apache.kafka.clients.consumer.ConsumerRecord;
4-
import org.springframework.kafka.annotation.KafkaListener;
5-
import org.springframework.stereotype.Component;
6-
7-
/**
8-
* @author jaminye
9-
* @date 2020/12/13 下午8:50
10-
*/
11-
@Component
12-
public class MyConsumer {
13-
14-
@KafkaListener(topics = "mytopic", groupId = "testGroup")
15-
public void listen(ConsumerRecord<String, String> record) {
16-
String value = record.value();
17-
String key = record.key();
18-
System.out.println("key=======" + key + "value========" + value);
19-
}
20-
21-
}
1+
// package cn.jaminye.kafkademo.springbootkafka;
2+
//
3+
// import org.apache.kafka.clients.consumer.ConsumerRecord;
4+
// import org.springframework.kafka.annotation.KafkaListener;
5+
// import org.springframework.stereotype.Component;
6+
//
7+
// /**
8+
// * @author jaminye
9+
// * @date 2020/12/13 下午8:50
10+
// */
11+
// @Component
12+
// public class MyConsumer {
13+
//
14+
// @KafkaListener(topics = "mytopic", groupId = "testGroup")
15+
// public void listen(ConsumerRecord<String, String> record) {
16+
// String value = record.value();
17+
// String key = record.key();
18+
// System.out.println("key=======" + key + "value========" + value);
19+
// }
20+
//
21+
// }

kafka/kafka-demo/src/main/resources/application.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
spring:
22
kafka:
3-
bootstrap-servers: 10.211.55.4:9092
3+
bootstrap-servers: 10.211.55.22:9092,10.211.55.22:9093,10.211.55.22:9094
44
producer:
55
retries: 3
66
batch-size: 16384

0 commit comments

Comments
 (0)