Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize and refactor project #2

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<artifactId>kafka-consumer-database</artifactId>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
Expand All @@ -22,9 +23,5 @@
<scope>runtime</scope>
</dependency>
</dependencies>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>

</project>
Original file line number Diff line number Diff line change
@@ -1,30 +1,24 @@
package net.javaguides.springboot;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.javaguides.springboot.entity.WikimediaData;
import net.javaguides.springboot.repository.WikimediaDataRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaDatabaseConsumer {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaDatabaseConsumer.class);

private WikimediaDataRepository dataRepository;

public KafkaDatabaseConsumer(WikimediaDataRepository dataRepository) {
this.dataRepository = dataRepository;
}
private final WikimediaDataRepository dataRepository;

@KafkaListener(
topics = "${spring.kafka.topic.name}",
groupId = "${spring.kafka.consumer.group-id}"
)
public void consume(String eventMessage){

LOGGER.info(String.format("Event message received -> %s", eventMessage));
public void consume(String eventMessage) {
log.info("Event message received -> {}", eventMessage);

WikimediaData wikimediaData = new WikimediaData();
wikimediaData.setWikiEventData(eventMessage);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
spring.kafka.consumer.boostrap-servers: localhost:9092
spring.kafka.consumer.group-id: myGroup
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.boostrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.datasource.url=jdbc:mysql://localhost:3306/wikimedia
spring.datasource.username=root
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<artifactId>kafka-producer-wikimedia</artifactId>

<dependencies>
<!-- https://mvnrepository.com/artifact/com.launchdarkly/okhttp-eventsource -->
<dependency>
Expand All @@ -25,23 +26,10 @@
<version>4.9.3</version>
</dependency>

<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.13.2</version>
</dependency>

<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.2.2</version>
</dependency>
</dependencies>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>

</project>

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
package net.javaguides.springboot;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class SpringBootProducerApplication implements CommandLineRunner {
public class SpringBootProducerApplication {

public static void main(String[] args) {
SpringApplication.run(SpringBootProducerApplication.class);
}

@Autowired
private WikimediaChangesProducer wikimediaChangesProducer;

@Override
public void run(String... args) throws Exception {
wikimediaChangesProducer.sendMessage();
@Bean
ApplicationRunner applicationRunner(WikimediaChangesProducer wikimediaChangesProducer) {
return args -> {
wikimediaChangesProducer.sendMessage();
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,15 @@

import com.launchdarkly.eventsource.EventHandler;
import com.launchdarkly.eventsource.MessageEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;

@Slf4j
@RequiredArgsConstructor
public class WikimediaChangesHandler implements EventHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(WikimediaChangesHandler.class);

private KafkaTemplate<String, String> kafkaTemplate;
private String topic;

public WikimediaChangesHandler(KafkaTemplate<String, String> kafkaTemplate, String topic) {
this.kafkaTemplate = kafkaTemplate;
this.topic = topic;
}
private final KafkaTemplate<String, String> kafkaTemplate;
private final String topic;

@Override
public void onOpen() throws Exception {
Expand All @@ -29,14 +23,13 @@ public void onClosed() throws Exception {
}

@Override
public void onMessage(String s, MessageEvent messageEvent) throws Exception {
LOGGER.info(String.format("event data -> %s", messageEvent.getData()));

public void onMessage(String event, MessageEvent messageEvent) throws Exception {
log.info("event data -> {}", messageEvent.getData());
kafkaTemplate.send(topic, messageEvent.getData());
}

@Override
public void onComment(String s) throws Exception {
public void onComment(String event) throws Exception {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import com.launchdarkly.eventsource.EventHandler;
import com.launchdarkly.eventsource.EventSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
Expand All @@ -12,27 +12,21 @@
import java.util.concurrent.TimeUnit;

@Service
@Slf4j
@RequiredArgsConstructor
public class WikimediaChangesProducer {

@Value("${spring.kafka.topic.name}")
private String topicName;

private static final Logger LOGGER = LoggerFactory.getLogger(WikimediaChangesProducer.class);

private KafkaTemplate<String, String> kafkaTemplate;

public WikimediaChangesProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Value("${source.stream.url}")
private String sourceStreamUrl;
private final KafkaTemplate<String, String> kafkaTemplate;

public void sendMessage() throws InterruptedException {
// to read real time stream data from wikimedia, we use event source
EventHandler eventHandler = new WikimediaChangesHandler(kafkaTemplate, topicName);
String url = "https://stream.wikimedia.org/v2/stream/recentchange";
EventSource.Builder builder = new EventSource.Builder(eventHandler, URI.create(url));
EventSource eventSource = builder.build();
eventSource.start();
log.info("Sending data from {} stream", sourceStreamUrl);
EventHandler eventHandler = new WikimediaChangesHandler(kafkaTemplate, kafkaTemplate.getDefaultTopic());

TimeUnit.MINUTES.sleep(10);
try(EventSource eventSource = new EventSource.Builder(eventHandler, URI.create(sourceStreamUrl)).build()) {
eventSource.start();
TimeUnit.MINUTES.sleep(10);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
spring.kafka.producer.bootstrap-servers: localhost:9092
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.template.default-topic=wikimedia_recentchange
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

spring.kafka.topic.name=wikimedia_recentchange
source.stream.url=https://stream.wikimedia.org/v2/stream/recentchange
10 changes: 9 additions & 1 deletion springboot-kafka-real-world-project/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
<name>springboot-kafka-real-world-project</name>
<description>Demo project for Spring Boot and Kafka</description>
<packaging>pom</packaging>

<properties>
<java.version>11</java.version>
<java.version>17</java.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
Expand All @@ -30,6 +34,10 @@
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.