diff --git a/README.md b/README.md index 8f478799..2af0b54f 100644 --- a/README.md +++ b/README.md @@ -54,6 +54,7 @@ The related documentation and instructions are [here](hadoop-connector-examples) | [`turbineheatprocessor`](scenarios/turbine-heat-processor) | A Flink streaming application for processing temperature data from a Pravega stream produced by the `turbineheatsensor` app. The application computes a daily summary of the temperature range observed on that day by each sensor. | [Java](scenarios/turbine-heat-processor/src/main/java/io/pravega/turbineheatprocessor), [Scala](scenarios/turbine-heat-processor/src/main/scala/io/pravega/turbineheatprocessor) | [`anomaly-detection`](scenarios/anomaly-detection) | A Flink streaming application for detecting anomalous input patterns using a finite-state machine. | [Java](scenarios/anomaly-detection/src/main/java/io/pravega/anomalydetection) | [`pravega-flink-connector-sql-samples`](scenarios/pravega-flink-connector-sql-samples) | Flink connector table api/sql samples. | [Java](scenarios/pravega-flink-connector-sql-samples/src/main/java/io/pravega/connectors.nytaxi) +| [`mqtt-pravega-bridge`](scenarios/mqtt-pravega-bridge) | A sample application reads events from MQTT and writes them to a Pravega stream. | [Java](scenarios/mqtt-pravega-bridge/src/main/java/com/dell/mqtt/pravega) # Build Instructions diff --git a/scenarios/mqtt-pravega-bridge/README.md b/scenarios/mqtt-pravega-bridge/README.md new file mode 100644 index 00000000..feb83921 --- /dev/null +++ b/scenarios/mqtt-pravega-bridge/README.md @@ -0,0 +1,86 @@ +# MQTT to Pravega Bridge + +This sample application reads events from MQTT and writes them to a Pravega stream. + +## Components + +- Pravega: Pravega provides a new storage abstraction - a stream - for continuous and unbounded data. + A Pravega stream is a durable, elastic, append-only, unbounded sequence of bytes that has good performance and strong consistency. + + Pravega provides dynamic scaling that can increase and decrease parallelism to automatically respond + to changes in the event rate. + + See for more information. + +- Pravega Video Demo: This is a simple command-line application that demonstrates how to write video files to Pravega. + It also demonstrates how to read the video files from Pravega and decode them. + +- Docker: This demo uses Docker and Docker Compose to greatly simplify the deployment of the various + components on Linux and/or Windows servers, desktops, or even laptops. + For more information, see . + +## Building and Running the Demo + +### Install Java 8 + +``` +apt-get install openjdk-8-jdk +``` + +### Install IntelliJ + +Install from . +Enable the Lombok plugin. +Enable Annotations (settings -> build, execution, deployment, -> compiler -> annotation processors). + +### Install Docker and Docker Compose + +See +and . + +### Run Pravega + +This will run a development instance of Pravega locally. +In the command below, replace x.x.x.x with the IP address of a local network interface such as eth0. + +``` +cd +git clone https://github.com/pravega/pravega +cd pravega +export HOST_IP=x.x.x.x +docker-compose up -d +``` + +You can view the Pravega logs with `docker-compose logs --follow`. +You can view the stream files stored on HDFS with `docker-compose exec hdfs hdfs dfs -ls -h -R /`. + +### Usage + +- Install Mosquitto MQTT broker and clients. + ``` + sudo apt-get install mosquitto mosquitto-clients + ``` + +- If not automatically started, start Mosquitto broker. + ``` + mosquitto + ``` + +- Edit the file scenarios/mqtt-pravega-bridge/src/main/dist/conf + to specify your Pravega controller URI (controllerUri) as + `tcp://HOST_IP:9090`. + +- In IntelliJ, run the class com.dell.mqtt.pravega.ApplicationMain with the following parameters: + ``` + scenarios/mqtt-pravega-bridge/src/main/dist/conf + ``` + +- Publish a sample MQTT message. + ``` + mosquitto_pub -t center/0001 -m "12,34,56.78" + ``` + +- You should see the following application output: + ``` + [MQTT Call: CanDataReader] com.dell.mqtt.pravega.MqttListener: Writing Data Packet: CarID: 0001 Timestamp: 1551671403118 Payload: [B@2813d92f annotation: null + ``` diff --git a/scenarios/mqtt-pravega-bridge/build.gradle b/scenarios/mqtt-pravega-bridge/build.gradle new file mode 100644 index 00000000..8bf3c637 --- /dev/null +++ b/scenarios/mqtt-pravega-bridge/build.gradle @@ -0,0 +1,58 @@ +plugins { + id 'com.github.johnrengelman.shadow' version '1.2.4' +} + +group 'com.dell.mqtt' +version = samplesVersion + +task wrapper(type: Wrapper) { + gradleVersion = '3.1' + distributionUrl = "https://services.gradle.org/distributions/gradle-$gradleVersion-all.zip" +} + +apply plugin: 'java' +apply plugin: "distribution" +apply plugin: 'application' + +sourceCompatibility = 1.8 +targetCompatibility = 1.8 +mainClassName = 'com.dell.mqtt.pravega.ApplicationMain' +applicationDefaultJvmArgs = ["-Dlog4j.configuration=file:conf/log4j.properties"] +archivesBaseName = 'pravega-mqtt-bridge' + +repositories { + mavenCentral() + maven { + url "https://repository.apache.org/snapshots" + } + maven { + url "https://oss.sonatype.org/content/repositories/snapshots" + } +} + +dependencies { + testCompile group: 'junit', name: 'junit', version: '4.11' + compile "org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0" + compile "org.slf4j:slf4j-api:1.6.6" + compile "ch.qos.logback:logback-classic:1.0.9" + compile "com.google.guava:guava:20.0" + compile "io.pravega:pravega-client:${pravegaVersion}" +} + +shadowJar { + dependencies { + include dependency("org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0") + } +} + +distributions { + main { + baseName = archivesBaseName + contents { + into('lib') { + from shadowJar + from(project.configurations.shadow) + } + } + } +} diff --git a/scenarios/mqtt-pravega-bridge/src/main/dist/conf/bridge.properties b/scenarios/mqtt-pravega-bridge/src/main/dist/conf/bridge.properties new file mode 100644 index 00000000..107162e2 --- /dev/null +++ b/scenarios/mqtt-pravega-bridge/src/main/dist/conf/bridge.properties @@ -0,0 +1,14 @@ +# Pravega Properties +controllerUri=tcp://127.0.0.1:9090 +scope=examples +stream=mqtt-example +scaling.targetRate=100 +scaling.scaleFactor=3 +scaling.minNumSegments=3 + +# MQTT Properties +brokerUri=tcp://127.0.0.1:1883 +topic=center/# +clientId=CanDataReader +userName=admin +password=password diff --git a/scenarios/mqtt-pravega-bridge/src/main/dist/conf/log4j.properties b/scenarios/mqtt-pravega-bridge/src/main/dist/conf/log4j.properties new file mode 100644 index 00000000..2614ad13 --- /dev/null +++ b/scenarios/mqtt-pravega-bridge/src/main/dist/conf/log4j.properties @@ -0,0 +1,13 @@ +log4j.rootLogger=INFO, stdout, file + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n + +log4j.appender.file=org.apache.log4j.RollingFileAppender +log4j.appender.file.File=workspace.log +log4j.appender.file.MaxFileSize=10MB +log4j.appender.file.MaxBackupIndex=10 +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n \ No newline at end of file diff --git a/scenarios/mqtt-pravega-bridge/src/main/dist/conf/logback.xml b/scenarios/mqtt-pravega-bridge/src/main/dist/conf/logback.xml new file mode 100644 index 00000000..34f9c7c0 --- /dev/null +++ b/scenarios/mqtt-pravega-bridge/src/main/dist/conf/logback.xml @@ -0,0 +1,24 @@ + + + + + System.out + + %-5level [%d{yyyy-MM-dd HH:mm:ss.SSS}] [%thread] %logger{36}: %msg%n + + + + + + + diff --git a/scenarios/mqtt-pravega-bridge/src/main/dist/conf/logger.xml b/scenarios/mqtt-pravega-bridge/src/main/dist/conf/logger.xml new file mode 100644 index 00000000..896de023 --- /dev/null +++ b/scenarios/mqtt-pravega-bridge/src/main/dist/conf/logger.xml @@ -0,0 +1,32 @@ + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + logFile.log + + + workspace.%d{yyyy-MM-dd}.log + + + 5 + + + + %d{HH:mm:ss.SSS} %-4relative [%thread] %-5level %logger{35} - %msg%n + + + + + + + + + + \ No newline at end of file diff --git a/scenarios/mqtt-pravega-bridge/src/main/java/com/dell/mqtt/pravega/ApplicationArguments.java b/scenarios/mqtt-pravega-bridge/src/main/java/com/dell/mqtt/pravega/ApplicationArguments.java new file mode 100644 index 00000000..e62fc782 --- /dev/null +++ b/scenarios/mqtt-pravega-bridge/src/main/java/com/dell/mqtt/pravega/ApplicationArguments.java @@ -0,0 +1,76 @@ +package com.dell.mqtt.pravega; + +import com.google.common.base.Preconditions; + +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.Properties; + +public class ApplicationArguments { + + private final PravegaArgs pravegaArgs = new PravegaArgs(); + private final MqttArgs mqttArgs = new MqttArgs(); + + public ApplicationArguments(String confDir) throws Exception { + loadProperties(confDir); + } + + private void loadProperties(String confDir) throws Exception{ + Properties prop = new Properties(); + try ( + InputStream inputStream = new FileInputStream(confDir + File.separator + "bridge.properties"); + ) + { + prop.load(inputStream); + + pravegaArgs.controllerUri = prop.getProperty("controllerUri"); + pravegaArgs.scope = prop.getProperty("scope"); + pravegaArgs.stream = prop.getProperty("stream"); + pravegaArgs.targetRate = Integer.parseInt(prop.getProperty("scaling.targetRate")); + pravegaArgs.scaleFactor = Integer.parseInt(prop.getProperty("scaling.scaleFactor")); + pravegaArgs.minNumSegments = Integer.parseInt(prop.getProperty("scaling.minNumSegments")); + + mqttArgs.brokerUri = prop.getProperty("brokerUri"); + mqttArgs.topic = prop.getProperty("topic"); + mqttArgs.clientId = prop.getProperty("clientId"); + mqttArgs.userName = prop.getProperty("userName"); + mqttArgs.password = prop.getProperty("password"); + + Preconditions.checkNotNull(pravegaArgs.controllerUri, "Pravega Controller URI is missing"); + Preconditions.checkNotNull(pravegaArgs.scope, "Pravega scope is missing"); + Preconditions.checkNotNull(pravegaArgs.stream, "Pravega stream is missing"); + + Preconditions.checkNotNull(mqttArgs.brokerUri, "MQTT Broker URI is missing"); + Preconditions.checkNotNull(mqttArgs.topic, "MQTT topic is missing"); + Preconditions.checkNotNull(mqttArgs.clientId, "MQTT clientId is missing"); + Preconditions.checkNotNull(mqttArgs.userName, "MQTT userName is missing"); + Preconditions.checkNotNull(mqttArgs.password, "MQTT password is missing"); + } + } + + public PravegaArgs getPravegaArgs() { + return pravegaArgs; + } + + public MqttArgs getMqttArgs() { + return mqttArgs; + } + + public static class PravegaArgs { + protected String controllerUri; + protected String scope; + protected String stream; + protected int targetRate; + protected int scaleFactor; + protected int minNumSegments; + } + + public static class MqttArgs { + protected String brokerUri; + protected String topic; + protected String clientId; + protected String userName; + protected String password; + } +} diff --git a/scenarios/mqtt-pravega-bridge/src/main/java/com/dell/mqtt/pravega/ApplicationMain.java b/scenarios/mqtt-pravega-bridge/src/main/java/com/dell/mqtt/pravega/ApplicationMain.java new file mode 100644 index 00000000..35935d08 --- /dev/null +++ b/scenarios/mqtt-pravega-bridge/src/main/java/com/dell/mqtt/pravega/ApplicationMain.java @@ -0,0 +1,56 @@ +package com.dell.mqtt.pravega; + +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CountDownLatch; + +public class ApplicationMain { + + private static Logger log = LoggerFactory.getLogger( ApplicationMain.class ); + + public static void main(String ... args) { + + if (args.length != 1) { + log.error("Missing required arguments. Usage: java com.dell.mqtt.pravega.ApplicationMain "); + return; + } + + String confDir = args[0]; + log.info("loading configurations from {}", confDir); + + final CountDownLatch latch = new CountDownLatch(1); + + try { + ApplicationArguments applicationArguments = new ApplicationArguments(confDir); + MqttListener listener = new MqttListener(applicationArguments.getPravegaArgs()); + + MqttConnectionBuilder builder = new MqttConnectionBuilder(); + builder.brokerUri(applicationArguments.getMqttArgs().brokerUri); + builder.topic(applicationArguments.getMqttArgs().topic); + builder.clientId(applicationArguments.getMqttArgs().clientId); + builder.userName(applicationArguments.getMqttArgs().userName); + builder.password(applicationArguments.getMqttArgs().password); + builder.bridge(listener); + + MqttClient mqttClient = builder.connect(); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + log.info("Going to close the application"); + if (mqttClient != null) { + try { + mqttClient.close(); + } catch (MqttException e) { + log.error("Exception Occurred while closing MQTT client", e); + } + } + latch.countDown(); + })); + } catch (Exception e) { + log.error("Exception Occurred", e); + } + + } +} diff --git a/scenarios/mqtt-pravega-bridge/src/main/java/com/dell/mqtt/pravega/DataPacket.java b/scenarios/mqtt-pravega-bridge/src/main/java/com/dell/mqtt/pravega/DataPacket.java new file mode 100644 index 00000000..b798918a --- /dev/null +++ b/scenarios/mqtt-pravega-bridge/src/main/java/com/dell/mqtt/pravega/DataPacket.java @@ -0,0 +1,59 @@ +package com.dell.mqtt.pravega; + +import java.io.Serializable; + +/** + * Wrapper class that holds raw data and its corresponding annotation info + */ +public class DataPacket implements Serializable { + + private String carId; + + private long timestamp; + + private byte[] payload; + + private byte[] annotation; + + public String getCarId() { + return carId; + } + + public void setCarId(String carId) { + this.carId = carId; + } + + public byte[] getPayload() { + return payload; + } + + public void setPayload(byte[] payload) { + this.payload = payload; + } + + public byte[] getAnnotation() { + return annotation; + } + + public void setAnnotation(byte[] annotation) { + this.annotation = annotation; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + public String toString() { + + StringBuffer stringBuffer = new StringBuffer(); + stringBuffer.append("CarID: ").append(carId).append(" "); + stringBuffer.append("Timestamp: ").append(timestamp).append(" "); + stringBuffer.append("Payload: ").append(payload).append(" "); + stringBuffer.append("annotation: ").append(annotation).append(" "); + return stringBuffer.toString(); + } +} diff --git a/scenarios/mqtt-pravega-bridge/src/main/java/com/dell/mqtt/pravega/MqttConnectionBuilder.java b/scenarios/mqtt-pravega-bridge/src/main/java/com/dell/mqtt/pravega/MqttConnectionBuilder.java new file mode 100644 index 00000000..31c02e18 --- /dev/null +++ b/scenarios/mqtt-pravega-bridge/src/main/java/com/dell/mqtt/pravega/MqttConnectionBuilder.java @@ -0,0 +1,82 @@ +package com.dell.mqtt.pravega; + +import com.google.common.base.Preconditions; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttClientPersistence; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; + +public class MqttConnectionBuilder { + + private String brokerUri; + private String topic; + private String clientId; + private String userName; + private String password; + private MqttCallback mqttCallback; + private MqttClientPersistence persistence; + + public MqttConnectionBuilder brokerUri(String brokerUri) { + this.brokerUri = brokerUri; + return this; + } + + public MqttConnectionBuilder topic(String topic) { + this.topic = topic; + return this; + } + + public MqttConnectionBuilder clientId(String clientId) { + this.clientId = clientId; + return this; + } + + public MqttConnectionBuilder userName(String userName) { + this.userName = userName; + return this; + } + + public MqttConnectionBuilder password(String password) { + this.password = password; + return this; + } + + public MqttConnectionBuilder bridge(MqttCallback mqttCallback) { + this.mqttCallback = mqttCallback; + return this; + } + + public MqttConnectionBuilder persistence(MqttClientPersistence persistence) { + this.persistence = persistence; + return this; + } + + public MqttClient connect() throws MqttException { + + Preconditions.checkNotNull(brokerUri, "Missing MQTT broker information"); + Preconditions.checkNotNull(topic, "Missing MQTT topic information"); + Preconditions.checkNotNull(clientId, "Missing MQTT clientId"); + Preconditions.checkNotNull(userName, "Missing MQTT userName"); + Preconditions.checkNotNull(password, "Missing MQTT password"); + Preconditions.checkNotNull(mqttCallback, "Missing MQTT callback handler"); + + + MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); + mqttConnectOptions.setUserName(userName); + mqttConnectOptions.setPassword(password.toCharArray()); + + MqttClient mqttClient; + if (persistence != null) { + mqttClient = new MqttClient(brokerUri, clientId, persistence); + } else { + mqttClient = new MqttClient(brokerUri, clientId); + } + + mqttClient.setCallback(mqttCallback); + mqttClient.connect(mqttConnectOptions); + mqttClient.subscribe(topic); + + return mqttClient; + } +} diff --git a/scenarios/mqtt-pravega-bridge/src/main/java/com/dell/mqtt/pravega/MqttListener.java b/scenarios/mqtt-pravega-bridge/src/main/java/com/dell/mqtt/pravega/MqttListener.java new file mode 100644 index 00000000..62f34b20 --- /dev/null +++ b/scenarios/mqtt-pravega-bridge/src/main/java/com/dell/mqtt/pravega/MqttListener.java @@ -0,0 +1,48 @@ +package com.dell.mqtt.pravega; + +import io.pravega.client.stream.EventStreamWriter; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MQTT Listener call back handler that listens to specified MQTT topic and fetches the posted data + */ + +public class MqttListener implements MqttCallback { + + private static Logger log = LoggerFactory.getLogger( MqttListener.class ); + + private final EventStreamWriter writer; + + public MqttListener(ApplicationArguments.PravegaArgs pravegaArgs) { + writer = PravegaHelper.getStreamWriter(pravegaArgs); + } + + @Override + public void connectionLost(Throwable cause) { + log.debug("Received connection lost message. Reason: {}", cause); + writer.close(); + } + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + log.debug("Received new message from the topic: {}", topic); + + String carId = topic.split("/")[1]; + DataPacket packet = new DataPacket(); + packet.setTimestamp(System.currentTimeMillis()); + packet.setCarId(carId); + packet.setPayload(message.getPayload()); + + log.info("Writing Data Packet: {}", packet); + + writer.writeEvent(carId, packet); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) {} + +} diff --git a/scenarios/mqtt-pravega-bridge/src/main/java/com/dell/mqtt/pravega/PravegaHelper.java b/scenarios/mqtt-pravega-bridge/src/main/java/com/dell/mqtt/pravega/PravegaHelper.java new file mode 100644 index 00000000..d34d41cf --- /dev/null +++ b/scenarios/mqtt-pravega-bridge/src/main/java/com/dell/mqtt/pravega/PravegaHelper.java @@ -0,0 +1,48 @@ +package com.dell.mqtt.pravega; + +import io.pravega.client.ClientFactory; +import io.pravega.client.admin.StreamManager; +import io.pravega.client.stream.EventStreamWriter; +import io.pravega.client.stream.EventWriterConfig; +import io.pravega.client.stream.ScalingPolicy; +import io.pravega.client.stream.StreamConfiguration; +import io.pravega.client.stream.impl.JavaSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URI; + +public class PravegaHelper { + + private static Logger log = LoggerFactory.getLogger( PravegaHelper.class ); + + public static EventStreamWriter getStreamWriter(ApplicationArguments.PravegaArgs pravegaArgs) { + + log.info("Connecting to Pravega URI: {}, Scope: {}, Stream: {}", + pravegaArgs.controllerUri, pravegaArgs.scope, pravegaArgs.stream); + + StreamManager streamManager = StreamManager.create(URI.create(pravegaArgs.controllerUri)); + + final boolean scopeIsNew = streamManager.createScope(pravegaArgs.scope); + if (!scopeIsNew) { + log.info("Scope: {} is already available", pravegaArgs.scope); + } + + StreamConfiguration streamConfig = StreamConfiguration.builder() + .scalingPolicy(ScalingPolicy.byEventRate(pravegaArgs.targetRate, pravegaArgs.scaleFactor, pravegaArgs.minNumSegments)) + .build(); + + final boolean streamIsNew = streamManager.createStream(pravegaArgs.scope, pravegaArgs.stream, streamConfig); + if (!streamIsNew) { + log.info("Stream: {} is already available", pravegaArgs.stream); + } + + ClientFactory clientFactory = ClientFactory.withScope(pravegaArgs.scope, URI.create(pravegaArgs.controllerUri)); + EventStreamWriter writer = clientFactory.createEventWriter(pravegaArgs.stream, + new JavaSerializer(), + EventWriterConfig.builder().build()); + + return writer; + + } +} diff --git a/scenarios/mqtt-pravega-bridge/src/main/resources/logback.xml b/scenarios/mqtt-pravega-bridge/src/main/resources/logback.xml new file mode 100644 index 00000000..34f9c7c0 --- /dev/null +++ b/scenarios/mqtt-pravega-bridge/src/main/resources/logback.xml @@ -0,0 +1,24 @@ + + + + + System.out + + %-5level [%d{yyyy-MM-dd HH:mm:ss.SSS}] [%thread] %logger{36}: %msg%n + + + + + + + diff --git a/settings.gradle b/settings.gradle index 34daecb2..b4672502 100644 --- a/settings.gradle +++ b/settings.gradle @@ -18,3 +18,4 @@ include 'scenarios/turbine-heat-sensor' include 'scenarios/turbine-heat-processor' include 'scenarios/anomaly-detection' include 'scenarios/pravega-flink-connector-sql-samples' +include 'scenarios/mqtt-pravega-bridge'