-
Notifications
You must be signed in to change notification settings - Fork 164
/
Copy pathWikimediaChangesProducer.java
32 lines (27 loc) · 1.14 KB
/
WikimediaChangesProducer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package net.javaguides.springboot;
import com.launchdarkly.eventsource.EventHandler;
import com.launchdarkly.eventsource.EventSource;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.net.URI;
import java.util.concurrent.TimeUnit;
/**
 * Opens an event stream and hands its events to a {@code WikimediaChangesHandler}
 * for a bounded amount of time.
 *
 * <p>The stream URL comes from the {@code source.stream.url} property. The handler is
 * built from the injected {@link KafkaTemplate} and that template's default topic;
 * what the handler does with each event is defined in {@code WikimediaChangesHandler}.
 */
@Service
@Slf4j
@RequiredArgsConstructor
public class WikimediaChangesProducer {

    /** Length of the streaming window used by {@link #sendMessage()}. */
    private static final long DEFAULT_STREAM_DURATION = 10;

    /** Unit of {@link #DEFAULT_STREAM_DURATION}. */
    private static final TimeUnit DEFAULT_STREAM_DURATION_UNIT = TimeUnit.MINUTES;

    /** URL of the event stream, injected from the {@code source.stream.url} property. */
    @Value("${source.stream.url}")
    private String sourceStreamUrl;

    /** Template passed to the event handler; its default topic is used as the target topic. */
    private final KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Streams events for the default window of 10 minutes.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void sendMessage() throws InterruptedException {
        sendMessage(DEFAULT_STREAM_DURATION, DEFAULT_STREAM_DURATION_UNIT);
    }

    /**
     * Streams events for the given amount of time, then closes the event source.
     *
     * @param duration how long to keep the stream open; a value of zero or less makes
     *                 {@link TimeUnit#sleep(long)} return immediately
     * @param unit     unit of {@code duration}; must not be {@code null}
     * @throws IllegalArgumentException if {@code unit} is {@code null}
     * @throws InterruptedException     if the calling thread is interrupted while waiting
     */
    public void sendMessage(long duration, TimeUnit unit) throws InterruptedException {
        // Fail before any connection is opened rather than with an NPE mid-stream.
        if (unit == null) {
            throw new IllegalArgumentException("unit must not be null");
        }

        // To read real-time stream data from Wikimedia, we use an event source.
        log.info("Sending data from {} stream", sourceStreamUrl);

        // NOTE(review): getDefaultTopic() is passed through unchecked — confirm a default
        // topic is configured on the template.
        EventHandler eventHandler =
                new WikimediaChangesHandler(kafkaTemplate, kafkaTemplate.getDefaultTopic());

        // try-with-resources closes the event source when the window elapses or the
        // wait is interrupted.
        try (EventSource eventSource =
                     new EventSource.Builder(eventHandler, URI.create(sourceStreamUrl)).build()) {
            eventSource.start();
            // Hold the calling thread so the source stays open for the requested window;
            // presumably start() delivers events on its own thread — verify against the
            // EventSource documentation.
            unit.sleep(duration);
        }
    }
}