Skip to content

Commit

Permalink
Issue 187: Add MQTT to Pravega bridge sample
Browse files Browse the repository at this point in the history
Signed-off-by: Claudio Fahey <[email protected]>

# Conflicts:
#	README.md
#	settings.gradle
  • Loading branch information
Claudio Fahey committed Apr 18, 2019
1 parent e0d152c commit c736fd4
Show file tree
Hide file tree
Showing 15 changed files with 622 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ The related documentation and instructions are [here](hadoop-connector-examples)
| [`turbineheatprocessor`](scenarios/turbine-heat-processor) | A Flink streaming application for processing temperature data from a Pravega stream produced by the `turbineheatsensor` app. The application computes a daily summary of the temperature range observed on that day by each sensor. | [Java](scenarios/turbine-heat-processor/src/main/java/io/pravega/turbineheatprocessor), [Scala](scenarios/turbine-heat-processor/src/main/scala/io/pravega/turbineheatprocessor)
| [`anomaly-detection`](scenarios/anomaly-detection) | A Flink streaming application for detecting anomalous input patterns using a finite-state machine. | [Java](scenarios/anomaly-detection/src/main/java/io/pravega/anomalydetection)
| [`pravega-flink-connector-sql-samples`](scenarios/pravega-flink-connector-sql-samples) | Flink connector table api/sql samples. | [Java](scenarios/pravega-flink-connector-sql-samples/src/main/java/io/pravega/connectors.nytaxi)
| [`mqtt-pravega-bridge`](scenarios/mqtt-pravega-bridge) | A sample application reads events from MQTT and writes them to a Pravega stream. | [Java](scenarios/mqtt-pravega-bridge/src/main/java/com/dell/mqtt/pravega)


# Build Instructions
Expand Down
86 changes: 86 additions & 0 deletions scenarios/mqtt-pravega-bridge/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# MQTT to Pravega Bridge

This sample application reads events from MQTT and writes them to a Pravega stream.

## Components

- Pravega: Pravega provides a new storage abstraction - a stream - for continuous and unbounded data.
A Pravega stream is a durable, elastic, append-only, unbounded sequence of bytes that has good performance and strong consistency.

Pravega provides dynamic scaling that can increase and decrease parallelism to automatically respond
to changes in the event rate.

See <http://pravega.io> for more information.

- Pravega Video Demo: This is a simple command-line application that demonstrates how to write video files to Pravega.
It also demonstrates how to read the video files from Pravega and decode them.

- Docker: This demo uses Docker and Docker Compose to greatly simplify the deployment of the various
components on Linux and/or Windows servers, desktops, or even laptops.
For more information, see <https://en.wikipedia.org/wiki/Docker_(software)>.

## Building and Running the Demo

### Install Java 8

```
apt-get install openjdk-8-jdk
```

### Install IntelliJ

Install from <https://www.jetbrains.com/idea>.
Enable the Lombok plugin.
Enable Annotations (settings -> build, execution, deployment, -> compiler -> annotation processors).

### Install Docker and Docker Compose

See <https://docs.docker.com/install/linux/docker-ce/ubuntu/>
and <https://docs.docker.com/compose/install/>.

### Run Pravega

This will run a development instance of Pravega locally.
In the command below, replace x.x.x.x with the IP address of a local network interface such as eth0.

```
cd
git clone https://github.com/pravega/pravega
cd pravega
export HOST_IP=x.x.x.x
docker-compose up -d
```

You can view the Pravega logs with `docker-compose logs --follow`.
You can view the stream files stored on HDFS with `docker-compose exec hdfs hdfs dfs -ls -h -R /`.

### Usage

- Install Mosquitto MQTT broker and clients.
```
sudo apt-get install mosquitto mosquitto-clients
```

- If not automatically started, start Mosquitto broker.
```
mosquitto
```

- Edit the file scenarios/mqtt-pravega-bridge/src/main/dist/conf
to specify your Pravega controller URI (controllerUri) as
`tcp://HOST_IP:9090`.

- In IntelliJ, run the class com.dell.mqtt.pravega.ApplicationMain with the following parameters:
```
scenarios/mqtt-pravega-bridge/src/main/dist/conf
```

- Publish a sample MQTT message.
```
mosquitto_pub -t center/0001 -m "12,34,56.78"
```

- You should see the following application output:
```
[MQTT Call: CanDataReader] com.dell.mqtt.pravega.MqttListener: Writing Data Packet: CarID: 0001 Timestamp: 1551671403118 Payload: [B@2813d92f annotation: null
```
58 changes: 58 additions & 0 deletions scenarios/mqtt-pravega-bridge/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
plugins {
id 'com.github.johnrengelman.shadow' version '1.2.4'
}

group 'com.dell.mqtt'
version = samplesVersion

task wrapper(type: Wrapper) {
gradleVersion = '3.1'
distributionUrl = "https://services.gradle.org/distributions/gradle-$gradleVersion-all.zip"
}

apply plugin: 'java'
apply plugin: "distribution"
apply plugin: 'application'

sourceCompatibility = 1.8
targetCompatibility = 1.8
mainClassName = 'com.dell.mqtt.pravega.ApplicationMain'
applicationDefaultJvmArgs = ["-Dlog4j.configuration=file:conf/log4j.properties"]
archivesBaseName = 'pravega-mqtt-bridge'

repositories {
mavenCentral()
maven {
url "https://repository.apache.org/snapshots"
}
maven {
url "https://oss.sonatype.org/content/repositories/snapshots"
}
}

dependencies {
testCompile group: 'junit', name: 'junit', version: '4.11'
compile "org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0"
compile "org.slf4j:slf4j-api:1.6.6"
compile "ch.qos.logback:logback-classic:1.0.9"
compile "com.google.guava:guava:20.0"
compile "io.pravega:pravega-client:${pravegaVersion}"
}

shadowJar {
dependencies {
include dependency("org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0")
}
}

distributions {
main {
baseName = archivesBaseName
contents {
into('lib') {
from shadowJar
from(project.configurations.shadow)
}
}
}
}
14 changes: 14 additions & 0 deletions scenarios/mqtt-pravega-bridge/src/main/dist/conf/bridge.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Pravega Properties
controllerUri=tcp://127.0.0.1:9090
scope=examples
stream=mqtt-example
scaling.targetRate=100
scaling.scaleFactor=3
scaling.minNumSegments=3

# MQTT Properties
brokerUri=tcp://127.0.0.1:1883
topic=center/#
clientId=CanDataReader
userName=admin
password=password
13 changes: 13 additions & 0 deletions scenarios/mqtt-pravega-bridge/src/main/dist/conf/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
log4j.rootLogger=INFO, stdout, file

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=workspace.log
log4j.appender.file.MaxFileSize=10MB
log4j.appender.file.MaxBackupIndex=10
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
24 changes: 24 additions & 0 deletions scenarios/mqtt-pravega-bridge/src/main/dist/conf/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
-->
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<target>System.out</target>
<encoder>
<pattern>%-5level [%d{yyyy-MM-dd HH:mm:ss.SSS}] [%thread] %logger{36}: %msg%n</pattern>
</encoder>
</appender>

<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>
32 changes: 32 additions & 0 deletions scenarios/mqtt-pravega-bridge/src/main/dist/conf/logger.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<configuration>

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- encoders are assigned the type
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>

<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>logFile.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- daily rollover -->
<fileNamePattern>workspace.%d{yyyy-MM-dd}.log</fileNamePattern>

<!-- keep 1 days' worth of history -->
<maxHistory>5</maxHistory>
</rollingPolicy>

<encoder>
<pattern>%d{HH:mm:ss.SSS} %-4relative [%thread] %-5level %logger{35} - %msg%n</pattern>
</encoder>
</appender>

<root level="info" additivity="false">
<appender-ref ref="STDOUT"/>
<appender-ref ref="FILE"/>
</root>


</configuration>
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.dell.mqtt.pravega;

import com.google.common.base.Preconditions;

import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;

public class ApplicationArguments {

private final PravegaArgs pravegaArgs = new PravegaArgs();
private final MqttArgs mqttArgs = new MqttArgs();

public ApplicationArguments(String confDir) throws Exception {
loadProperties(confDir);
}

private void loadProperties(String confDir) throws Exception{
Properties prop = new Properties();
try (
InputStream inputStream = new FileInputStream(confDir + File.separator + "bridge.properties");
)
{
prop.load(inputStream);

pravegaArgs.controllerUri = prop.getProperty("controllerUri");
pravegaArgs.scope = prop.getProperty("scope");
pravegaArgs.stream = prop.getProperty("stream");
pravegaArgs.targetRate = Integer.parseInt(prop.getProperty("scaling.targetRate"));
pravegaArgs.scaleFactor = Integer.parseInt(prop.getProperty("scaling.scaleFactor"));
pravegaArgs.minNumSegments = Integer.parseInt(prop.getProperty("scaling.minNumSegments"));

mqttArgs.brokerUri = prop.getProperty("brokerUri");
mqttArgs.topic = prop.getProperty("topic");
mqttArgs.clientId = prop.getProperty("clientId");
mqttArgs.userName = prop.getProperty("userName");
mqttArgs.password = prop.getProperty("password");

Preconditions.checkNotNull(pravegaArgs.controllerUri, "Pravega Controller URI is missing");
Preconditions.checkNotNull(pravegaArgs.scope, "Pravega scope is missing");
Preconditions.checkNotNull(pravegaArgs.stream, "Pravega stream is missing");

Preconditions.checkNotNull(mqttArgs.brokerUri, "MQTT Broker URI is missing");
Preconditions.checkNotNull(mqttArgs.topic, "MQTT topic is missing");
Preconditions.checkNotNull(mqttArgs.clientId, "MQTT clientId is missing");
Preconditions.checkNotNull(mqttArgs.userName, "MQTT userName is missing");
Preconditions.checkNotNull(mqttArgs.password, "MQTT password is missing");
}
}

public PravegaArgs getPravegaArgs() {
return pravegaArgs;
}

public MqttArgs getMqttArgs() {
return mqttArgs;
}

public static class PravegaArgs {
protected String controllerUri;
protected String scope;
protected String stream;
protected int targetRate;
protected int scaleFactor;
protected int minNumSegments;
}

public static class MqttArgs {
protected String brokerUri;
protected String topic;
protected String clientId;
protected String userName;
protected String password;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.dell.mqtt.pravega;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CountDownLatch;

public class ApplicationMain {

private static Logger log = LoggerFactory.getLogger( ApplicationMain.class );

public static void main(String ... args) {

if (args.length != 1) {
log.error("Missing required arguments. Usage: java com.dell.mqtt.pravega.ApplicationMain <CONF_DIR_PATH>");
return;
}

String confDir = args[0];
log.info("loading configurations from {}", confDir);

final CountDownLatch latch = new CountDownLatch(1);

try {
ApplicationArguments applicationArguments = new ApplicationArguments(confDir);
MqttListener listener = new MqttListener(applicationArguments.getPravegaArgs());

MqttConnectionBuilder builder = new MqttConnectionBuilder();
builder.brokerUri(applicationArguments.getMqttArgs().brokerUri);
builder.topic(applicationArguments.getMqttArgs().topic);
builder.clientId(applicationArguments.getMqttArgs().clientId);
builder.userName(applicationArguments.getMqttArgs().userName);
builder.password(applicationArguments.getMqttArgs().password);
builder.bridge(listener);

MqttClient mqttClient = builder.connect();

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.info("Going to close the application");
if (mqttClient != null) {
try {
mqttClient.close();
} catch (MqttException e) {
log.error("Exception Occurred while closing MQTT client", e);
}
}
latch.countDown();
}));
} catch (Exception e) {
log.error("Exception Occurred", e);
}

}
}
Loading

0 comments on commit c736fd4

Please sign in to comment.