This repository was archived by the owner on Apr 14, 2023. It is now read-only.

Commit 6fad78a

Accommodating the renaming of HopsUtil to Hops.

1 parent 48b5d23 · commit 6fad78a

File tree: 10 files changed, +63 -63 lines
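
The change is mechanical: the utility class `io.hops.util.HopsUtil` becomes `io.hops.util.Hops`, and every import and call site is updated to match; no method signatures change in the hunks below. A sketch of the pattern (the class `RenamePattern` is hypothetical; the calls are taken from the hunks):

```java
import io.hops.util.Hops; // was: import io.hops.util.HopsUtil;

public class RenamePattern {

  public static void main(String[] args) {
    // was: HopsUtil.setup(HopsUtil.getFlinkKafkaProps(args[args.length - 1]));
    Hops.setup(Hops.getFlinkKafkaProps(args[args.length - 1]));
  }
}
```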

README.md

Lines changed: 3 additions & 3 deletions

````diff
@@ -14,7 +14,7 @@ mvn package
 Generates a jar for each module, which can then be used either to create HopsWorks jobs (Spark/Flink) or to execute Hive queries remotely.
 
 # Helper Libraries
-Hops Examples makes use of **HopsUtil**, a set of Java and Python libraries which provide developers with tools that make programming on Hops easy. *HopsUtil* is automatically made available to all Jobs and Notebooks, without the user having to explicitly import it. Detailed documentation on HopsUtil is available [here](https://github.com/hopshadoop/hops-util).
+Hops Examples makes use of **Hops**, a set of Java and Python libraries which provide developers with tools that make programming on Hops easy. *Hops* is automatically made available to all Jobs and Notebooks, without the user having to explicitly import it. Detailed documentation on Hops is available [here](https://github.com/hopshadoop/hops-util).
 
 # Spark
 ## Structured Streaming with Kafka and HopsFS
@@ -35,7 +35,7 @@ Usage: <type>(producer|consumer)
 Data consumed is by default persisted to the `Resources` dataset of the Project where the job is running.
 
 ### Avro Records
-`StructuredStreamingKafka.java` generates *String <key,value>* pairs which are converted by **HopsUtil** into Avro records and serialized into bytes. Similarly, when consuming from a Kafka source, messages are deserialized into Avro records. **The default Avro schema used is the following**:
+`StructuredStreamingKafka.java` generates *String <key,value>* pairs which are converted by **Hops** into Avro records and serialized into bytes. Similarly, when consuming from a Kafka source, messages are deserialized into Avro records. **The default Avro schema used is the following**:
 
 ```json
 {
@@ -140,7 +140,7 @@ For Avro schemas with more fields, the application's SourceFunction should use a
 
 2. For examples on customizing logging for Flink jobs on HopsWorks, see [here](https://github.com/hopshadoop/hops-kafka-examples/tree/master/examples-flink).
 
-3. *StreamingExample* makes use of [HopsUtil](https://github.com/hopshadoop/hops-util). When building this project, HopsUtil is automatically included in the assembled jar file.
+3. *StreamingExample* makes use of [Hops](https://github.com/hopshadoop/hops-util). When building this project, Hops is automatically included in the assembled jar file.
 
 
 ## Job Logging
````
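
To make the README's Helper Libraries paragraph concrete, here is a minimal sketch of a job class using the renamed entry point. It is illustrative, not part of this commit: `MinimalHopsJob` and the topic name are placeholders, and `HopsProducer.produce(Map)` is assumed (the produce calls themselves fall between the hunks); the `Hops.*` calls are the ones visible in this diff.

```java
package io.hops.examples.spark;

import io.hops.util.Hops; // renamed from io.hops.util.HopsUtil in this commit
import io.hops.util.HopsProducer;
import java.util.HashMap;
import java.util.Map;

public class MinimalHopsJob {

  public static void main(String[] args) throws Exception {
    // Resolved from the Hopsworks environment; no explicit configuration needed.
    String jobName = Hops.getJobName();
    String projectName = Hops.getProjectName();

    // "my_topic" is a placeholder; a project's topics can be listed with Hops.getTopics().
    HopsProducer producer = Hops.getHopsProducer("my_topic");
    Map<String, String> message = new HashMap<>();
    message.put("source", jobName + "@" + projectName);
    producer.produce(message); // assumed API; see note above
    producer.close();
  }
}
```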

flink/src/main/java/io/hops/examples/flink/kafka/FlinkKafkaExample.java

Lines changed: 4 additions & 4 deletions

```diff
@@ -2,7 +2,7 @@
 
 import io.hops.util.HopsConsumer;
 import io.hops.util.HopsProducer;
-import io.hops.util.HopsUtil;
+import io.hops.util.Hops;
 import java.util.Map;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -75,15 +75,15 @@ public static void main(String[] args) throws Exception {
     // execute the program
     env.execute("Streaming Iteration Example");
 
-    HopsUtil.setup(HopsUtil.getFlinkKafkaProps(args[args.length - 1]));
+    Hops.setup(Hops.getFlinkKafkaProps(args[args.length - 1]));
     if (args[1].equalsIgnoreCase("producer")) {
       Configuration hdConf = new Configuration();
       Path hdPath = new org.apache.hadoop.fs.Path(args[2]);
       FileSystem hdfs = hdPath.getFileSystem(hdConf);
       final FSDataOutputStream stream = hdfs.create(hdPath);
       stream.write("My first Flink program on Hops!".getBytes());
 
-      HopsProducer hopsKafkaProducer = HopsUtil.getHopsProducer(args[0]);
+      HopsProducer hopsKafkaProducer = Hops.getHopsProducer(args[0]);
       Map<String, String> message;
       for (int i = 0; i < 30; i++) {
         message = new HashMap<>();
@@ -96,7 +96,7 @@ public static void main(String[] args) throws Exception {
       stream.close();
       hopsKafkaProducer.close();
     } else {
-      final HopsConsumer hopsKafkaConsumer = HopsUtil.getHopsConsumer(args[0]);
+      final HopsConsumer hopsKafkaConsumer = Hops.getHopsConsumer(args[0]);
       Thread t = new Thread() {
         @Override
         public void run() {
```

flink/src/main/java/io/hops/examples/flink/kafka/StreamingExample.java

Lines changed: 6 additions & 6 deletions

```diff
@@ -1,7 +1,7 @@
 package io.hops.examples.flink.kafka;
 
 import io.hops.util.Constants;
-import io.hops.util.HopsUtil;
+import io.hops.util.Hops;
 import java.util.Arrays;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -41,7 +41,7 @@ public static void main(String[] args) throws Exception {
 
     ////////////////////////////////////////////////////////////////////////////
     //Hopsworks utility method to automatically set parameters for Kafka
-    HopsUtil.setup(HopsUtil.getFlinkKafkaProps(parameterTool.get(
+    Hops.setup(Hops.getFlinkKafkaProps(parameterTool.get(
         Constants.KAFKA_FLINK_PARAMS)));
     ////////////////////////////////////////////////////////////////////////////
     if (parameterTool.get("type").equalsIgnoreCase("producer")) {
@@ -75,8 +75,8 @@ public void cancel() {
       });
 
       // write data into Kafka
-      for (String topic : HopsUtil.getTopics()) {
-        messageStream.addSink(HopsUtil.getFlinkProducer(topic));
+      for (String topic : Hops.getTopics()) {
+        messageStream.addSink(Hops.getFlinkProducer(topic));
       }
       env.execute("Write into Kafka example");
     } else {
@@ -91,8 +91,8 @@ public void cancel() {
       env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(
           Arrays.copyOf(args, args.length - 2)));
 
-      for (String topic : HopsUtil.getTopics()) {
-        DataStream<String> messageStream = env.addSource(HopsUtil.
+      for (String topic : Hops.getTopics()) {
+        DataStream<String> messageStream = env.addSource(Hops.
            getFlinkConsumer(topic));
        String dateTimeBucketerFormat = "yyyy-MM-dd--HH";
        if (parameterTool.has("sink_path")) {
```

spark/src/main/java/io/hops/examples/spark/WorkflowExample.java

Lines changed: 2 additions & 2 deletions

```diff
@@ -1,7 +1,7 @@
 package io.hops.examples.spark;
 
 import io.hops.util.exceptions.CredentialsNotFoundException;
-import io.hops.util.HopsUtil;
+import io.hops.util.Hops;
 import io.hops.util.WorkflowManager;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -25,7 +25,7 @@ public static void main(String[] args)
 
     SparkSession spark = SparkSession
         .builder()
-        .appName(HopsUtil.getJobName())
+        .appName(Hops.getJobName())
         .getOrCreate();
 
     //if Start job with given ID
```

spark/src/main/java/io/hops/examples/spark/kafka/SimpleKafkaProcess.java

Lines changed: 3 additions & 3 deletions

```diff
@@ -2,7 +2,7 @@
 
 import io.hops.util.HopsConsumer;
 import io.hops.util.HopsProducer;
-import io.hops.util.HopsUtil;
+import io.hops.util.Hops;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -69,7 +69,7 @@ public static void main(String[] args) throws Exception {
         "yourtopic")});
 
     //Produce Kafka messages to topic
-    HopsProducer hopsKafkaProducer = HopsUtil.getHopsProducer(topic);
+    HopsProducer hopsKafkaProducer = Hops.getHopsProducer(topic);
 
     Map<String, String> message;
     int i = 0;
@@ -86,7 +86,7 @@ public static void main(String[] args) throws Exception {
       hopsKafkaProducer.close();
     } else {
       //Consume kafka messages from topic
-      HopsConsumer hopsKafkaConsumer = HopsUtil.getHopsConsumer(topic);
+      HopsConsumer hopsKafkaConsumer = Hops.getHopsConsumer(topic);
       //Keep thread alive
       //THIS WILL CAUSE THE JOB TO HANG. USER HAS TO MANUALLY STOP THE JOB.
       while (true) {
```

spark/src/main/java/io/hops/examples/spark/kafka/StreamingExample.java

Lines changed: 8 additions & 8 deletions

```diff
@@ -4,7 +4,7 @@
 import com.twitter.bijection.Injection;
 import io.hops.util.exceptions.CredentialsNotFoundException;
 import io.hops.util.HopsProducer;
-import io.hops.util.HopsUtil;
+import io.hops.util.Hops;
 import io.hops.util.exceptions.SchemaNotFoundException;
 import io.hops.util.spark.SparkConsumer;
 import io.hops.util.spark.SparkProducer;
@@ -59,7 +59,7 @@ public final class StreamingExample {
   private static final Pattern SPACE = Pattern.compile(" ");
   //Get HopsWorks Kafka Utility instance
   private static final Map<String, Injection<GenericRecord, byte[]>> recordInjections
-      = HopsUtil.getRecordInjections();
+      = Hops.getRecordInjections();
 
   public static void main(final String[] args) throws Exception {
     if (args.length < 1) {
@@ -71,7 +71,7 @@ public static void main(final String[] args) throws Exception {
 
     final String type = args[0];
     // Create context with a 2 second batch interval
-    Set<String> topicsSet = new HashSet<>(HopsUtil.getTopics());
+    Set<String> topicsSet = new HashSet<>(Hops.getTopics());
     SparkConf sparkConf = new SparkConf().setAppName("StreamingExample");
     final List<HopsProducer> sparkProducers = new ArrayList<>();
 
@@ -83,7 +83,7 @@ public static void main(final String[] args) throws Exception {
         @Override
         public void run() {
           try {
-            SparkProducer sparkProducer = HopsUtil.getSparkProducer(topic);
+            SparkProducer sparkProducer = Hops.getSparkProducer(topic);
             sparkProducers.add(sparkProducer);
             Map<String, String> message = new HashMap<>();
             int i = 0;
@@ -102,16 +102,16 @@ public void run() {
           }
         }.start();
       }//Keep application running
-      HopsUtil.shutdownGracefully(jsc);
+      Hops.shutdownGracefully(jsc);
     } else {
       JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
           Durations.seconds(2));
       //Use applicationId for sink folder
       final String appId = jssc.sparkContext().getConf().getAppId();
 
       //Get consumer groups
-      List<String> consumerGroups = HopsUtil.getConsumerGroups();
-      SparkConsumer consumer = HopsUtil.getSparkConsumer(jssc, topicsSet);
+      List<String> consumerGroups = Hops.getConsumerGroups();
+      SparkConsumer consumer = Hops.getSparkConsumer(jssc, topicsSet);
       // Create direct kafka stream with topics
       JavaInputDStream<ConsumerRecord<String, byte[]>> messages = consumer.
           createDirectStream();
@@ -186,7 +186,7 @@ public void call(JavaPairRDD<String, Integer> rdd, Time time) throws
       */
       // Start the computation
       jssc.start();
-      HopsUtil.shutdownGracefully(jssc);
+      Hops.shutdownGracefully(jssc);
     }
     for (HopsProducer hopsProducer : sparkProducers) {
       hopsProducer.close();
```
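
The `recordInjections` map in this file is what backs the Avro conversion described in the README's Avro Records section. A minimal consumer-side sketch of the decode step, assuming the map is keyed by topic name and that the default schema carries a `value` field (both are assumptions; the schema itself is truncated in the README hunk above):

```java
import com.twitter.bijection.Injection;
import io.hops.util.Hops;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;

public class AvroDecodeSketch {

  // Decode one Kafka payload back into an Avro record and read its "value" field.
  static String decodeValue(String topic, byte[] payload) {
    Map<String, Injection<GenericRecord, byte[]>> injections = Hops.getRecordInjections();
    Injection<GenericRecord, byte[]> injection = injections.get(topic); // assumption: topic-name keys
    GenericRecord record = injection.invert(payload).get();             // Try<GenericRecord> -> record
    return String.valueOf(record.get("value"));                         // field name assumed from the README
  }
}
```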

spark/src/main/java/io/hops/examples/spark/kafka/StreamingKafkaElastic.java

Lines changed: 12 additions & 12 deletions

```diff
@@ -2,7 +2,7 @@
 
 import com.google.common.base.Strings;
 import io.hops.util.exceptions.CredentialsNotFoundException;
-import io.hops.util.HopsUtil;
+import io.hops.util.Hops;
 import io.hops.util.exceptions.WorkflowManagerException;
 import io.hops.util.spark.SparkConsumer;
 import java.io.BufferedReader;
@@ -55,7 +55,7 @@ public final class StreamingKafkaElastic {
 
   public static void main(final String[] args) throws Exception {
 
-    SparkConf sparkConf = new SparkConf().setAppName(HopsUtil.getJobName());
+    SparkConf sparkConf = new SparkConf().setAppName(Hops.getJobName());
     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));
 
     //Use applicationId for sink folder
@@ -64,8 +64,8 @@ public static void main(final String[] args) throws Exception {
     //Get consumer groups
     Properties props = new Properties();
     props.put("value.deserializer", StringDeserializer.class.getName());
-    props.put("client.id", HopsUtil.getJobName());
-    SparkConsumer consumer = HopsUtil.getSparkConsumer(jssc, props);
+    props.put("client.id", Hops.getJobName());
+    SparkConsumer consumer = Hops.getSparkConsumer(jssc, props);
     //Store processed offsets
 
     // Create direct kafka stream with topics
@@ -103,7 +103,7 @@ public void call(JavaRDD<LogEntryFilebeat> rdd, Time time) throws Exception {
 
           DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyyMMdd");
           LocalDate localDate = LocalDate.now();
-          row.write().mode(SaveMode.Append).parquet("/Projects/" + HopsUtil.getProjectName() + "/" + dataset + "/Logs-"
+          row.write().mode(SaveMode.Append).parquet("/Projects/" + Hops.getProjectName() + "/" + dataset + "/Logs-"
              + dtf.format(localDate));
         }
       }
@@ -118,7 +118,7 @@ public void call(JavaRDD<LogEntryFilebeat> rdd, Time time) throws Exception {
     */
     // Start the computation
     jssc.start();
-    HopsUtil.shutdownGracefully(jssc);
+    Hops.shutdownGracefully(jssc);
   }
 
   private static JSONObject parser(String logType, String line, String appId)
@@ -165,9 +165,9 @@ private static JSONObject parser(String logType, String line, String appId)
       index.put("file", jsonLog.getString("source"));
       index.put("application", appId);
       index.put("host", jsonLog.getJSONObject("beat").getString("hostname"));
-      index.put("project", HopsUtil.getProjectName());
+      index.put("project", Hops.getProjectName());
       index.put("method", "?");
-      index.put("jobname", HopsUtil.getJobName());
+      index.put("jobname", Hops.getJobName());
     } else if (logType.equalsIgnoreCase("custom")) {
       //Catch an error getting log attributes and set default ones in this case
       //Example message
@@ -228,9 +228,9 @@ private static JSONObject parser(String logType, String line, String appId)
       }
       index.put("application", appId);
       index.put("host", jsonLog.getJSONObject("beat").getString("hostname"));
-      index.put("project", HopsUtil.getProjectName());
+      index.put("project", Hops.getProjectName());
       index.put("method", "?");
-      index.put("jobname", HopsUtil.getJobName());
+      index.put("jobname", Hops.getJobName());
 
       if (jsonLog.getString("source").contains("shrek")) {
         index.put("location", "59.32,18.06");
@@ -239,15 +239,15 @@ private static JSONObject parser(String logType, String line, String appId)
       }
       if (!Strings.isNullOrEmpty(priority) && priority.equalsIgnoreCase("TRACE")) {
         LOG.log(Level.INFO, "Sending email");
-        HopsUtil.getWorkflowManager().sendEmail("[email protected]", "Error message received", timestamp + " :: " + message);
+        Hops.getWorkflowManager().sendEmail("[email protected]", "Error message received", timestamp + " :: " + message);
       }
 
     }
     URL obj;
     HttpURLConnection conn = null;
     BufferedReader br = null;
     try {
-      obj = new URL("http://" + HopsUtil.getElasticEndPoint() + "/" + HopsUtil.getProjectName().toLowerCase()
+      obj = new URL("http://" + Hops.getElasticEndPoint() + "/" + Hops.getProjectName().toLowerCase()
          + "/logs");
      conn = (HttpURLConnection) obj.openConnection();
      conn.setDoOutput(true);
```

spark/src/main/java/io/hops/examples/spark/kafka/StreamingLogs.java

Lines changed: 8 additions & 8 deletions

```diff
@@ -1,6 +1,6 @@
 package io.hops.examples.spark.kafka;
 
-import io.hops.util.HopsUtil;
+import io.hops.util.Hops;
 import io.hops.util.exceptions.SchemaNotFoundException;
 import io.hops.util.spark.SparkConsumer;
 import java.io.IOException;
@@ -49,7 +49,7 @@ public final class StreamingLogs {
 
   public static void main(final String[] args) throws Exception {
 
-    SparkConf sparkConf = new SparkConf().setAppName(HopsUtil.getJobName());
+    SparkConf sparkConf = new SparkConf().setAppName(Hops.getJobName());
     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
 
     //Use applicationId for sink folder
@@ -58,8 +58,8 @@ public static void main(final String[] args) throws Exception {
     //Get consumer groups
     Properties props = new Properties();
     props.put("value.deserializer", StringDeserializer.class.getName());
-    props.put("client.id", HopsUtil.getJobName());
-    SparkConsumer consumer = HopsUtil.getSparkConsumer(jssc, props);
+    props.put("client.id", Hops.getJobName());
+    SparkConsumer consumer = Hops.getSparkConsumer(jssc, props);
     //Store processed offsets
 
     // Create direct kafka stream with topics
@@ -97,7 +97,7 @@ public void call(JavaRDD<NamenodeLogEntry> rdd, Time time) throws
         Dataset<Row> row = sparkSession.createDataFrame(rdd, NamenodeLogEntry.class);
         if (!rdd.isEmpty()) {
           row.write().mode(SaveMode.Append).
-              parquet("/Projects/" + HopsUtil.getProjectName() + "/Resources/LogAnalysis");
+              parquet("/Projects/" + Hops.getProjectName() + "/Resources/LogAnalysis");
         }
       }
     });
@@ -111,7 +111,7 @@ public void call(JavaRDD<NamenodeLogEntry> rdd, Time time) throws
     */
     // Start the computation
     jssc.start();
-    HopsUtil.shutdownGracefully(jssc);
+    Hops.shutdownGracefully(jssc);
   }
 
   private static JSONObject parser(String line, String appId) {
@@ -152,8 +152,8 @@ private static JSONObject parser(String line, String appId) {
     index.put("timestamp", timestamp);
     index.put("application", appId);
     index.put("host", jsonLog.getJSONObject("beat").getString("hostname"));
-    index.put("project", HopsUtil.getProjectName());
-    index.put("jobname", HopsUtil.getJobName());
+    index.put("project", Hops.getProjectName());
+    index.put("jobname", Hops.getJobName());
     if (jsonLog.getString("source").contains("/")) {
       index.put("file", jsonLog.getString("source").substring(jsonLog.getString("source").lastIndexOf("/") + 1));
     } else {
```
