Added Project Files
Akshit Jain committed May 3, 2020
1 parent dadb3bb commit 6cfe6e2
Showing 5 changed files with 565 additions and 0 deletions.
4 changes: 4 additions & 0 deletions input/oAuth-tokens.txt
@@ -0,0 +1,4 @@
<INSERT CONSUMER KEY>
<INSERT CONSUMER SECRET>
<INSERT ACCESS TOKEN>
<INSERT ACCESS TOKEN SECRET>
151 changes: 151 additions & 0 deletions pom.xml
@@ -0,0 +1,151 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>Real-Time-Twitter-Trend-Sentiment</artifactId>
    <version>1.0</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.4.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>2.4.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.twitter4j/twitter4j-core -->
        <dependency>
            <groupId>org.twitter4j</groupId>
            <artifactId>twitter4j-core</artifactId>
            <version>4.0.4</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.twitter4j/twitter4j-stream -->
        <dependency>
            <groupId>org.twitter4j</groupId>
            <artifactId>twitter4j-stream</artifactId>
            <version>4.0.4</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.twitter4j/twitter4j-async -->
        <dependency>
            <groupId>org.twitter4j</groupId>
            <artifactId>twitter4j-async</artifactId>
            <version>4.0.4</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.12</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.4.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.4.1</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-log4j-appender</artifactId>
            <version>2.4.0</version>
        </dependency>

        <dependency>
            <groupId>edu.stanford.nlp</groupId>
            <artifactId>stanford-corenlp</artifactId>
            <version>3.4.1</version>
        </dependency>

        <!-- Same artifact with the "models" classifier pulls in the trained model files -->
        <dependency>
            <groupId>edu.stanford.nlp</groupId>
            <artifactId>stanford-corenlp</artifactId>
            <version>3.4.1</version>
            <classifier>models</classifier>
        </dependency>

        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.15</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
            <!-- See http://davidb.github.com/scala-maven-plugin -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.3.1</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>2.11.12</scalaVersion>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
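
With the scala-maven-plugin bound to compile and the shade plugin bound to the package phase, a plain "mvn package" should produce a single runnable jar under target/. A usage sketch (the jar name follows Maven's artifactId-version default; the topic name and keywords are placeholders, not values from this commit):

    mvn package
    java -cp target/Real-Time-Twitter-Trend-Sentiment-1.0.jar org.streaming.KafkaTwitterProducer input/oAuth-tokens.txt <topic-name> <keyword1,keyword2>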
130 changes: 130 additions & 0 deletions src/main/java/org/streaming/KafkaTwitterProducer.java
@@ -0,0 +1,130 @@
package org.streaming;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import twitter4j.*;
import twitter4j.conf.ConfigurationBuilder;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;

public class KafkaTwitterProducer {

    public static void main(String[] args) throws Exception {

        // Statuses arriving from the Twitter stream are buffered here and
        // drained by the Kafka producer loop below.
        final LinkedBlockingQueue<Status> queue = new LinkedBlockingQueue<Status>(1000);

        if (args.length < 3) {
            System.out.println("Usage: KafkaTwitterProducer <token-file-path> <topic-name> <twitter-search-keywords>");
            return;
        }
        List<String> tokens = getAuthTokens(args[0]);
        String consumerKey = tokens.get(0);
        String consumerSecret = tokens.get(1);
        String accessToken = tokens.get(2);
        String accessTokenSecret = tokens.get(3);

        String topicName = args[1];
        String[] searchWords = args[2].split(",");

        // Set Twitter OAuth tokens in the configuration
        ConfigurationBuilder cb = new ConfigurationBuilder();
        cb.setDebugEnabled(true)
          .setOAuthConsumerKey(consumerKey)
          .setOAuthConsumerSecret(consumerSecret)
          .setOAuthAccessToken(accessToken)
          .setOAuthAccessTokenSecret(accessTokenSecret);

        // Create a Twitter stream using the configuration
        TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
        StatusListener listener = new StatusListener() {
            public void onStatus(Status status) {
                queue.offer(status);
            }

            public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
                System.out.println("Got a status deletion notice id: " + statusDeletionNotice.getStatusId());
            }

            public void onTrackLimitationNotice(int i) {
                System.out.println("Got track limitation notice: " + i);
            }

            public void onScrubGeo(long userId, long upToStatusId) {
                System.out.println("Got scrub_geo event userId: " + userId + " upToStatusId: " + upToStatusId);
            }

            public void onStallWarning(StallWarning stallWarning) {
                System.out.println("Got stall warning: " + stallWarning);
            }

            public void onException(Exception e) {
                e.printStackTrace();
            }
        };
        twitterStream.addListener(listener);

        // Track only English-language tweets matching the comma-separated keywords
        FilterQuery query = new FilterQuery(searchWords);
        query.language("en");
        twitterStream.filter(query);

        configKafkaProducer(queue, topicName);
    }

    private static void configKafkaProducer(LinkedBlockingQueue<Status> q, String topic) throws Exception {

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);

        int j = 0;
        while (true) {
            Status ret = q.poll();
            if (ret == null) {
                // Nothing buffered yet; back off briefly
                Thread.sleep(100);
                continue;
            }
            // Forward only statuses that carry a user location, so the
            // downstream job can tie sentiment to a place; skipping the send
            // otherwise avoids re-publishing a stale (or null) message
            if (ret.getUser().getLocation() != null && ret.getHashtagEntities() != null) {
                String tweet = ret.getText();
                String location = ret.getUser().getLocation();
                System.out.println("Tweet: " + tweet);
                System.out.println("Location: " + location);
                // Frame location and tweet into one message, separated by a marker
                String msg = location + " /TLOC/ " + tweet;
                producer.send(new ProducerRecord<String, String>(topic, Integer.toString(j++), msg));
            }
        }
    }

    private static List<String> getAuthTokens(String input) {
        // The token file holds one value per line, in the order:
        // consumer key, consumer secret, access token, access token secret
        final List<String> tokens = new ArrayList<String>();
        try (BufferedReader rdr = new BufferedReader(new FileReader(input))) {
            String line;
            while ((line = rdr.readLine()) != null) {
                tokens.add(line);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return tokens;
    }
}
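
Each record this producer emits is a plain string of the form "<user location> /TLOC/ <tweet text>", keyed by a running counter. For reference, a minimal consumer sketch that splits that marker back out; this is an illustration, not part of this commit, and the topic name "tweets" and group id are assumptions:

    package org.streaming;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    public class TweetLocationConsumer {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "tweet-location-demo"); // hypothetical group id
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
            consumer.subscribe(Collections.singletonList("tweets")); // topic name is an assumption

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // Split on the same marker the producer inserts between location and tweet
                    String[] parts = record.value().split(" /TLOC/ ", 2);
                    if (parts.length == 2) {
                        System.out.println("Location: " + parts[0] + " | Tweet: " + parts[1]);
                    }
                }
            }
        }
    }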
16 changes: 16 additions & 0 deletions src/main/resources/log4j.xml
@@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration debug="true" xmlns:log4j='http://jakarta.apache.org/log4j/'>

    <appender name="Console" class="org.apache.log4j.ConsoleAppender">
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} [%t] %-5p %c{1} - %m%n"/>
        </layout>
    </appender>

    <root>
        <level value="INFO" />
        <appender-ref ref="Console" />
    </root>

</log4j:configuration>
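
With this layout a record renders along the lines of "2020-05-03 14:21:07 [main] INFO  KafkaTwitterProducer - ..." (illustrative values): the timestamp from %d, the thread name in brackets from %t, the level padded to five characters by %-5p, the unqualified logger name from %c{1}, and the message from %m.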