diff --git a/springboot-kafka-real-world-project/kafka-consumer-database/pom.xml b/springboot-kafka-real-world-project/kafka-consumer-database/pom.xml index 774c6bb..6a0de83 100644 --- a/springboot-kafka-real-world-project/kafka-consumer-database/pom.xml +++ b/springboot-kafka-real-world-project/kafka-consumer-database/pom.xml @@ -10,6 +10,7 @@ 4.0.0 jar kafka-consumer-database + org.springframework.boot @@ -22,9 +23,5 @@ runtime - - 17 - 17 - \ No newline at end of file diff --git a/springboot-kafka-real-world-project/kafka-consumer-database/src/main/java/net/javaguides/springboot/KafkaDatabaseConsumer.java b/springboot-kafka-real-world-project/kafka-consumer-database/src/main/java/net/javaguides/springboot/KafkaDatabaseConsumer.java index 51afd8b..8be08b9 100644 --- a/springboot-kafka-real-world-project/kafka-consumer-database/src/main/java/net/javaguides/springboot/KafkaDatabaseConsumer.java +++ b/springboot-kafka-real-world-project/kafka-consumer-database/src/main/java/net/javaguides/springboot/KafkaDatabaseConsumer.java @@ -1,30 +1,24 @@ package net.javaguides.springboot; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import net.javaguides.springboot.entity.WikimediaData; import net.javaguides.springboot.repository.WikimediaDataRepository; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; @Service +@Slf4j +@RequiredArgsConstructor public class KafkaDatabaseConsumer { - - private static final Logger LOGGER = LoggerFactory.getLogger(KafkaDatabaseConsumer.class); - - private WikimediaDataRepository dataRepository; - - public KafkaDatabaseConsumer(WikimediaDataRepository dataRepository) { - this.dataRepository = dataRepository; - } + private final WikimediaDataRepository dataRepository; @KafkaListener( topics = "${spring.kafka.topic.name}", groupId = "${spring.kafka.consumer.group-id}" ) - public void consume(String eventMessage){ - - LOGGER.info(String.format("Event message received -> %s", eventMessage)); + public void consume(String eventMessage) { + log.info("Event message received -> {}", eventMessage); WikimediaData wikimediaData = new WikimediaData(); wikimediaData.setWikiEventData(eventMessage); diff --git a/springboot-kafka-real-world-project/kafka-consumer-database/src/main/resources/application.properties b/springboot-kafka-real-world-project/kafka-consumer-database/src/main/resources/application.properties index fc2bb97..2103dc8 100644 --- a/springboot-kafka-real-world-project/kafka-consumer-database/src/main/resources/application.properties +++ b/springboot-kafka-real-world-project/kafka-consumer-database/src/main/resources/application.properties @@ -1,8 +1,8 @@ -spring.kafka.consumer.boostrap-servers: localhost:9092 -spring.kafka.consumer.group-id: myGroup -spring.kafka.consumer.auto-offset-reset: earliest -spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer -spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer +spring.kafka.consumer.boostrap-servers=localhost:9092 +spring.kafka.consumer.group-id=myGroup +spring.kafka.consumer.auto-offset-reset=earliest +spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer +spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.datasource.url=jdbc:mysql://localhost:3306/wikimedia spring.datasource.username=root diff --git a/springboot-kafka-real-world-project/kafka-producer-wikimedia/pom.xml b/springboot-kafka-real-world-project/kafka-producer-wikimedia/pom.xml index a93f386..d62fa1d 100644 --- a/springboot-kafka-real-world-project/kafka-producer-wikimedia/pom.xml +++ b/springboot-kafka-real-world-project/kafka-producer-wikimedia/pom.xml @@ -10,6 +10,7 @@ 4.0.0 jar kafka-producer-wikimedia + @@ -25,23 +26,10 @@ 4.9.3 - - - com.fasterxml.jackson.core - jackson-core - 2.13.2 - - com.fasterxml.jackson.core jackson-databind - 2.13.2.2 - - 17 - 17 - - \ No newline at end of file diff --git a/springboot-kafka-real-world-project/kafka-producer-wikimedia/src/main/java/net/javaguides/springboot/KafkaTopicConfig.java b/springboot-kafka-real-world-project/kafka-producer-wikimedia/src/main/java/net/javaguides/springboot/KafkaTopicConfig.java deleted file mode 100644 index 5361824..0000000 --- a/springboot-kafka-real-world-project/kafka-producer-wikimedia/src/main/java/net/javaguides/springboot/KafkaTopicConfig.java +++ /dev/null @@ -1,20 +0,0 @@ -package net.javaguides.springboot; - -import org.apache.kafka.clients.admin.NewTopic; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.config.TopicBuilder; - -@Configuration -public class KafkaTopicConfig { - - @Value("${spring.kafka.topic.name}") - private String topicName; - - @Bean - public NewTopic topic(){ - return TopicBuilder.name(topicName) - .build(); - } -} diff --git a/springboot-kafka-real-world-project/kafka-producer-wikimedia/src/main/java/net/javaguides/springboot/SpringBootProducerApplication.java b/springboot-kafka-real-world-project/kafka-producer-wikimedia/src/main/java/net/javaguides/springboot/SpringBootProducerApplication.java index ee02380..73aed01 100644 --- a/springboot-kafka-real-world-project/kafka-producer-wikimedia/src/main/java/net/javaguides/springboot/SpringBootProducerApplication.java +++ b/springboot-kafka-real-world-project/kafka-producer-wikimedia/src/main/java/net/javaguides/springboot/SpringBootProducerApplication.java @@ -1,22 +1,21 @@ package net.javaguides.springboot; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; @SpringBootApplication -public class SpringBootProducerApplication implements CommandLineRunner { +public class SpringBootProducerApplication { public static void main(String[] args) { SpringApplication.run(SpringBootProducerApplication.class); } - @Autowired - private WikimediaChangesProducer wikimediaChangesProducer; - - @Override - public void run(String... args) throws Exception { - wikimediaChangesProducer.sendMessage(); + @Bean + ApplicationRunner applicationRunner(WikimediaChangesProducer wikimediaChangesProducer) { + return args -> { + wikimediaChangesProducer.sendMessage(); + }; } } diff --git a/springboot-kafka-real-world-project/kafka-producer-wikimedia/src/main/java/net/javaguides/springboot/WikimediaChangesHandler.java b/springboot-kafka-real-world-project/kafka-producer-wikimedia/src/main/java/net/javaguides/springboot/WikimediaChangesHandler.java index 14d11b9..39151d9 100644 --- a/springboot-kafka-real-world-project/kafka-producer-wikimedia/src/main/java/net/javaguides/springboot/WikimediaChangesHandler.java +++ b/springboot-kafka-real-world-project/kafka-producer-wikimedia/src/main/java/net/javaguides/springboot/WikimediaChangesHandler.java @@ -2,21 +2,15 @@ import com.launchdarkly.eventsource.EventHandler; import com.launchdarkly.eventsource.MessageEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.kafka.core.KafkaTemplate; +@Slf4j +@RequiredArgsConstructor public class WikimediaChangesHandler implements EventHandler { - - private static final Logger LOGGER = LoggerFactory.getLogger(WikimediaChangesHandler.class); - - private KafkaTemplate kafkaTemplate; - private String topic; - - public WikimediaChangesHandler(KafkaTemplate kafkaTemplate, String topic) { - this.kafkaTemplate = kafkaTemplate; - this.topic = topic; - } + private final KafkaTemplate kafkaTemplate; + private final String topic; @Override public void onOpen() throws Exception { @@ -29,14 +23,13 @@ public void onClosed() throws Exception { } @Override - public void onMessage(String s, MessageEvent messageEvent) throws Exception { - LOGGER.info(String.format("event data -> %s", messageEvent.getData())); - + public void onMessage(String event, MessageEvent messageEvent) throws Exception { + log.info("event data -> {}", messageEvent.getData()); kafkaTemplate.send(topic, messageEvent.getData()); } @Override - public void onComment(String s) throws Exception { + public void onComment(String event) throws Exception { } diff --git a/springboot-kafka-real-world-project/kafka-producer-wikimedia/src/main/java/net/javaguides/springboot/WikimediaChangesProducer.java b/springboot-kafka-real-world-project/kafka-producer-wikimedia/src/main/java/net/javaguides/springboot/WikimediaChangesProducer.java index 5f8e45e..4e7ff87 100644 --- a/springboot-kafka-real-world-project/kafka-producer-wikimedia/src/main/java/net/javaguides/springboot/WikimediaChangesProducer.java +++ b/springboot-kafka-real-world-project/kafka-producer-wikimedia/src/main/java/net/javaguides/springboot/WikimediaChangesProducer.java @@ -2,8 +2,8 @@ import com.launchdarkly.eventsource.EventHandler; import com.launchdarkly.eventsource.EventSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; @@ -12,27 +12,21 @@ import java.util.concurrent.TimeUnit; @Service +@Slf4j +@RequiredArgsConstructor public class WikimediaChangesProducer { - - @Value("${spring.kafka.topic.name}") - private String topicName; - - private static final Logger LOGGER = LoggerFactory.getLogger(WikimediaChangesProducer.class); - - private KafkaTemplate kafkaTemplate; - - public WikimediaChangesProducer(KafkaTemplate kafkaTemplate) { - this.kafkaTemplate = kafkaTemplate; - } + @Value("${source.stream.url}") + private String sourceStreamUrl; + private final KafkaTemplate kafkaTemplate; public void sendMessage() throws InterruptedException { // to read real time stream data from wikimedia, we use event source - EventHandler eventHandler = new WikimediaChangesHandler(kafkaTemplate, topicName); - String url = "https://stream.wikimedia.org/v2/stream/recentchange"; - EventSource.Builder builder = new EventSource.Builder(eventHandler, URI.create(url)); - EventSource eventSource = builder.build(); - eventSource.start(); + log.info("Sending data from {} stream", sourceStreamUrl); + EventHandler eventHandler = new WikimediaChangesHandler(kafkaTemplate, kafkaTemplate.getDefaultTopic()); - TimeUnit.MINUTES.sleep(10); + try(EventSource eventSource = new EventSource.Builder(eventHandler, URI.create(sourceStreamUrl)).build()) { + eventSource.start(); + TimeUnit.MINUTES.sleep(10); + } } } diff --git a/springboot-kafka-real-world-project/kafka-producer-wikimedia/src/main/resources/application.properties b/springboot-kafka-real-world-project/kafka-producer-wikimedia/src/main/resources/application.properties index 1c03856..edae92f 100644 --- a/springboot-kafka-real-world-project/kafka-producer-wikimedia/src/main/resources/application.properties +++ b/springboot-kafka-real-world-project/kafka-producer-wikimedia/src/main/resources/application.properties @@ -1,5 +1,6 @@ -spring.kafka.producer.bootstrap-servers: localhost:9092 -spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer -spring.kafka.producer.value-serializer: org.apache.kafka.common.serialization.StringSerializer +spring.kafka.producer.bootstrap-servers=localhost:9092 +spring.kafka.template.default-topic=wikimedia_recentchange +spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer +spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer -spring.kafka.topic.name=wikimedia_recentchange \ No newline at end of file +source.stream.url=https://stream.wikimedia.org/v2/stream/recentchange \ No newline at end of file diff --git a/springboot-kafka-real-world-project/pom.xml b/springboot-kafka-real-world-project/pom.xml index a20e2b3..78c5bdd 100644 --- a/springboot-kafka-real-world-project/pom.xml +++ b/springboot-kafka-real-world-project/pom.xml @@ -18,9 +18,13 @@ springboot-kafka-real-world-project Demo project for Spring Boot and Kafka pom + - 11 + 17 + 17 + 17 + org.springframework.boot @@ -30,6 +34,10 @@ org.springframework.kafka spring-kafka + + com.fasterxml.jackson.core + jackson-databind + org.projectlombok diff --git a/springboot-kafka-real-world-project/src/main/java/net/javaguides/springboot/SpringbootKafkaRealWorldProjectApplication.java b/springboot-kafka-real-world-project/src/main/java/net/javaguides/springboot/SpringbootKafkaRealWorldProjectApplication.java deleted file mode 100644 index 83a0c45..0000000 --- a/springboot-kafka-real-world-project/src/main/java/net/javaguides/springboot/SpringbootKafkaRealWorldProjectApplication.java +++ /dev/null @@ -1,13 +0,0 @@ -package net.javaguides.springboot; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; - -@SpringBootApplication -public class SpringbootKafkaRealWorldProjectApplication { - - public static void main(String[] args) { - SpringApplication.run(SpringbootKafkaRealWorldProjectApplication.class, args); - } - -} diff --git a/springboot-kafka-real-world-project/src/main/resources/application.properties b/springboot-kafka-real-world-project/src/main/resources/application.properties deleted file mode 100644 index 8b13789..0000000 --- a/springboot-kafka-real-world-project/src/main/resources/application.properties +++ /dev/null @@ -1 +0,0 @@ - diff --git a/springboot-kafka-real-world-project/src/test/java/net/javaguides/springboot/SpringbootKafkaRealWorldProjectApplicationTests.java b/springboot-kafka-real-world-project/src/test/java/net/javaguides/springboot/SpringbootKafkaRealWorldProjectApplicationTests.java deleted file mode 100644 index 6a5a839..0000000 --- a/springboot-kafka-real-world-project/src/test/java/net/javaguides/springboot/SpringbootKafkaRealWorldProjectApplicationTests.java +++ /dev/null @@ -1,13 +0,0 @@ -package net.javaguides.springboot; - -import org.junit.jupiter.api.Test; -import org.springframework.boot.test.context.SpringBootTest; - -@SpringBootTest -class SpringbootKafkaRealWorldProjectApplicationTests { - - @Test - void contextLoads() { - } - -}