Commit

cm
S-Gabriele committed Jul 29, 2022
1 parent 6c849c3 commit 19867c3
Showing 1,150 changed files with 3,280,470 additions and 1 deletion.
3 changes: 3 additions & 0 deletions Kibana/Dockerfile
@@ -0,0 +1,3 @@
FROM docker.elastic.co/kibana/kibana:8.2.0

#ADD kibana.yml /usr/share/kibana/config
Empty file added Kibana/dashboard.ndjson
Empty file.
166 changes: 166 additions & 0 deletions Kibana/elasticsearch/configs/kibana.yml
@@ -0,0 +1,166 @@
# For more configuration options see the configuration guide for Kibana in
# https://www.elastic.co/guide/index.html

# =================== System: Kibana Server ===================
# Kibana is served by a back end server. This setting specifies the port to use.
server.port: 5601

# Specifies the address to which the Kibana server will bind. IP addresses and host names are both valid values.
# The default is 'localhost', which usually means remote machines will not be able to connect.
# To allow connections from remote users, set this parameter to a non-loopback address.
server.host: "0.0.0.0"

# Enables you to specify a path to mount Kibana at if you are running behind a proxy.
# Use the `server.rewriteBasePath` setting to tell Kibana if it should remove the basePath
# from requests it receives, and to prevent a deprecation warning at startup.
# This setting cannot end in a slash.
#server.basePath: ""

# Specifies whether Kibana should rewrite requests that are prefixed with
# `server.basePath` or require that they are rewritten by your reverse proxy.
# Defaults to `false`.
#server.rewriteBasePath: false

# Specifies the public URL at which Kibana is available for end users. If
# `server.basePath` is configured this URL should end with the same basePath.
#server.publicBaseUrl: ""

# The maximum payload size in bytes for incoming server requests.
server.maxPayload: 1048576

# The Kibana server's name. This is used for display purposes.
server.name: "kibana"

# =================== System: Kibana Server (Optional) ===================
# Enables SSL and paths to the PEM-format SSL certificate and SSL key files, respectively.
# These settings enable SSL for outgoing requests from the Kibana server to the browser.
#server.ssl.enabled: false
#server.ssl.certificate: /path/to/your/server.crt
#server.ssl.key: /path/to/your/server.key

# =================== System: Elasticsearch ===================
# The URLs of the Elasticsearch instances to use for all your queries.
elasticsearch.hosts: ["https://elasticsearch01:9200"]

# If your Elasticsearch is protected with basic authentication, these settings provide
# the username and password that the Kibana server uses to perform maintenance on the Kibana
# index at startup. Your Kibana users still need to authenticate with Elasticsearch, which
# is proxied through the Kibana server.
elasticsearch.username: "elastic"
elasticsearch.password: "energyanalyzer"

# Kibana can also authenticate to Elasticsearch via "service account tokens".
# Service account tokens are Bearer style tokens that replace the traditional username/password based configuration.
# Use this token instead of a username/password.
# elasticsearch.serviceAccountToken: "my_token"

# Time in milliseconds to wait for Elasticsearch to respond to pings. Defaults to the value of
# the elasticsearch.requestTimeout setting.
elasticsearch.pingTimeout: 15000

# Time in milliseconds to wait for responses from the back end or Elasticsearch. This value
# must be a positive integer.
elasticsearch.requestTimeout: 30000

# The maximum number of sockets that can be used for communications with elasticsearch.
# Defaults to `Infinity`.
elasticsearch.maxSockets: 1024

# Specifies whether Kibana should use compression for communications with elasticsearch
# Defaults to `false`.
elasticsearch.compression: false

# List of Kibana client-side headers to send to Elasticsearch. To send *no* client-side
# headers, set this value to [] (an empty list).
#elasticsearch.requestHeadersWhitelist: [ authorization ]

# Header names and values that are sent to Elasticsearch. Any custom headers cannot be overwritten
# by client-side headers, regardless of the elasticsearch.requestHeadersWhitelist configuration.
#elasticsearch.customHeaders: {}

# Time in milliseconds for Elasticsearch to wait for responses from shards. Set to 0 to disable.
elasticsearch.shardTimeout: 30000

# =================== System: Elasticsearch (Optional) ===================
# These files are used to verify the identity of Kibana to Elasticsearch and are required when
# xpack.security.http.ssl.client_authentication in Elasticsearch is set to required.
#elasticsearch.ssl.certificate: /path/to/your/client.crt
#elasticsearch.ssl.key: /path/to/your/client.key

# Enables you to specify a path to the PEM file for the certificate
# authority for your Elasticsearch instance.
#elasticsearch.ssl.certificateAuthorities: [ "/path/to/your/CA.pem" ]

# To disregard the validity of SSL certificates, change this setting's value to 'none'.
#elasticsearch.ssl.verificationMode: full

# =================== System: Logging ===================
# Set the value of this setting to off to suppress all logging output, or to debug to log everything. Defaults to 'info'
logging.root.level: debug

# Enables you to specify a file where Kibana stores log output.
# logging.appenders.default:
# type: file
# fileName: /var/logs/kibana.log
# layout:
# type: json

# Logs queries sent to Elasticsearch.
# logging.loggers:
# - name: elasticsearch.query
# level: debug

# Logs http responses.
#logging.loggers:
# - name: http.server.response
# level: debug

# Logs system usage information.
#logging.loggers:
# - name: metrics.ops
# level: debug

# =================== System: Other ===================
# The path where Kibana stores persistent data not saved in Elasticsearch. Defaults to data
#path.data: data

# Specifies the path where Kibana creates the process ID file.
#pid.file: /run/kibana/kibana.pid

# Set the interval in milliseconds to sample system and process performance
# metrics. Minimum is 100ms. Defaults to 5000ms.
#ops.interval: 5000

# Specifies locale to be used for all localizable strings, dates and number formats.
# Supported languages are the following: English (default) "en", Chinese "zh-CN", Japanese "ja-JP", French "fr-FR".
#i18n.locale: "en"

# =================== Frequently used (Optional)===================

# =================== Saved Objects: Migrations ===================
# Saved object migrations run at startup. If you run into migration-related issues, you might need to adjust these settings.

# The number of documents migrated at a time.
# If Kibana can't start up or upgrade due to an Elasticsearch `circuit_breaking_exception`,
# use a smaller batchSize value to reduce the memory pressure. Defaults to 1000 objects per batch.
#migrations.batchSize: 1000

# The maximum payload size for indexing batches of upgraded saved objects.
# To avoid migrations failing due to a 413 Request Entity Too Large response from Elasticsearch.
# This value should be lower than or equal to your Elasticsearch cluster’s `http.max_content_length`
# configuration option. Default: 100mb
#migrations.maxBatchSizeBytes: 100mb

# The number of times to retry temporary migration failures. Increase the setting
# if migrations fail frequently with a message such as `Unable to complete the [...] step after
# 15 attempts, terminating`. Defaults to 15
#migrations.retryAttempts: 15

# =================== Search Autocomplete ===================
# Time in milliseconds to wait for autocomplete suggestions from Elasticsearch.
# This value must be a whole number greater than zero. Defaults to 1000ms
#unifiedSearch.autocomplete.valueSuggestions.timeout: 1000

# Maximum number of documents loaded by each shard to generate autocomplete suggestions.
# This value must be a whole number greater than zero. Defaults to 100_000
#unifiedSearch.autocomplete.valueSuggestions.terminateAfter: 100000
3 changes: 3 additions & 0 deletions Kibana/export.ndjson

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions Kibana/kibana.yml
@@ -0,0 +1,15 @@
# server.name: kibana
# server.host: "kibana"
# elasticsearch.hosts: ["http://elasticsearch:9200"]
# vis_type_vega.enableExternalUrls: true
# xpack.monitoring.ui.container.elasticsearch.enabled: true
# xpack.encryptedSavedObjects.encryptionKey: iYU2RF{yT&&-]8D:ew#;b!*6/8=2q:Px

server.name: "kibana"
server.host: "0.0.0.0"
server.shutdownTimeout: "5s"
elasticsearch.hosts: [ "http://elasticsearch:9200" ]
vis_type_vega.enableExternalUrls: true
xpack.monitoring.ui.container.elasticsearch.enabled: true
xpack.encryptedSavedObjects.encryptionKey: iYU2RF{yT&&-]8D:ew#;b!*6/8=2q:Px
monitoring.ui.container.elasticsearch.enabled: true
75 changes: 74 additions & 1 deletion README.md
100644 → 100755
@@ -1 +1,74 @@
# Energy-Analyzer
# <p style="text-align:center"> Energy Analyzer </p>

### A data processing stream for energy production and Co2 intensity

![img](./book/images/pipeline.png?raw=true "Pipeline")
<br>
This project provides a simulated real-time visualization of the energy production of the main countries of the Euro-Asian continent and their CO2 emissions. The technologies used are the following (a sample of the data flowing through the pipeline is sketched after this list):
* <a href="https://www.docker.com/">Docker</a> to create and manage containers
* <a href="https://app.electricitymaps.com/map">Electricity Map</a> as real-time data source
* <a href="https://www.elastic.co/logstash/">Logstash</a> for data ingestion
* <a href="https://zookeeper.apache.org/">ZooKeeper</a> + <a href="https://kafka.apache.org/">Kafka</a> for data stream processing
* <a href="https://spark.apache.org/">Spark</a> to process data
* <a href="https://www.elastic.co/elasticsearch/">Elasticsearch</a> for data indexing
* <a href="https://www.elastic.co/kibana/">Kibana</a> for data visualization
<br>
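
For illustration, here is a rough sketch of the kind of record that travels through the pipeline and ends up on the `dati_energetici` Kafka topic that the Spark job subscribes to. The field names come from the schema in `Spark/code/SparkClass.py`; the values are made up, and the `kafka-python` client and the `localhost:9092` address are assumptions for this example only (the project itself uses Logstash for ingestion).

```python
import json
from kafka import KafkaProducer  # assumption: kafka-python, not part of this repository

# One simulated Electricity Map reading, shaped like the schema in SparkClass.py
record = {
    "countryCode": "IT",
    "totalProduction": 28000.0,
    "maxProduction": 45000.0,
    "price": 210.5,
    "production_biomass": 800.0,
    "production_coal": 1500.0,
    "production_gas": 12000.0,
    "production_hydro": 4500.0,
    "production_nuclear": 0.0,
    "production_oil": 300.0,
    "production_solar": 2500.0,
    "production_unknown": 0.0,
    "production_wind": 6400.0,
    "production_geothermal": 0.0,
    "co2intensity": 320.0,
    "stateDatetime": "2022-07-29T12:00:00Z",
}

# assumption: the Kafka broker is published on localhost:9092 by docker-compose
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
producer.send("dati_energetici", record)
producer.flush()
```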

### Run the project
```shell
$ git clone https://github.com/
$ cd
$ docker-compose up
```

### Credentials for Kibana and Elasticsearch
Elasticsearch:
>- user: elastic
>- password: energyanalyzer

Kibana:
>- user: kibana_system
>- password: energyanalyzer
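
As a quick sanity check that the credentials work, you can query the cluster with the official Python client. This is a minimal sketch; it assumes the cluster is reachable on https://localhost:9200 (see the table of useful links below) and skips certificate verification for local testing.

```python
from elasticsearch import Elasticsearch

# assumption: self-signed certificates from the docker-compose setup, so verification is disabled here
es = Elasticsearch(
    "https://localhost:9200",
    basic_auth=("elastic", "energyanalyzer"),
    verify_certs=False,
)
print(es.info())         # cluster name and version
print(es.cat.indices())  # indices, including energy_prize_es once Spark has written data
```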
### Import the dashboard into Kibana
Once the dashboard has been built, it can be saved together with all of its objects (visualizations and maps) in an ndjson file. To do this, open the menu on the left and go to Management > Stack Management, then in the left-hand navigation click Kibana > Saved Objects, find your dashboard and export it, making sure to include the related objects. <br>
![img](./book/images/export.png?raw=true "Export")

To reload the saved dashboard, return to the Saved Objects section and click Import.

![img](./book/images/import.png?raw=true "Import")
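
The same import can also be scripted against Kibana's Saved Objects import API instead of the UI. A hedged sketch follows: it assumes Kibana is reachable on http://localhost:5601, that the `elastic` user is allowed to import, and that `Kibana/export.ndjson` is the file exported above.

```python
import requests

# assumption: Kibana exposed on localhost:5601 and export.ndjson exported as described above
resp = requests.post(
    "http://localhost:5601/api/saved_objects/_import",
    params={"overwrite": "true"},
    headers={"kbn-xsrf": "true"},  # required by Kibana for write requests to its API
    auth=("elastic", "energyanalyzer"),
    files={"file": open("Kibana/export.ndjson", "rb")},
)
print(resp.status_code, resp.json())
```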



### Exit status 78 of elasticsearch01 using WSL
The elasticsearch01 container may exit with status 78. If you check its logs, you will probably find the message: <br>
>- "Elasticsearch: Max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]". <br>
This means that the vm.max_map_count limit of the WSL kernel is too low. <br>
If this is the case, it should be sufficient to run these two commands from a prompt before `docker-compose up`: <br>
```shell
$ wsl -d docker-desktop
$ sysctl -w vm.max_map_count=262144
```

## Useful links

|Service | Link | Note |
|------------------------|------------------------|---------------------------------------------------|
|KafkaUI |http://localhost:8080 | To check the status of topics and their messages |
|Elasticsearch cluster    |https://localhost:9200/ | To view the ES index                               |
|Kibana |http://localhost:5601/ | To access the dashboard |

<br>
## Authors
- Gabriele Sanguedolce
- Francesco Cristoforo Conti









10 changes: 10 additions & 0 deletions Spark/Dockerfile
@@ -0,0 +1,10 @@
FROM jupyter/pyspark-notebook:spark-3.1.1

RUN pip3 install pyspark numpy elasticsearch pandas requests ndjson

WORKDIR /app

COPY ./model ./model
COPY ./code .

ENTRYPOINT ["spark-submit", "--packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1", "--master", "local[*]", "SparkClass.py"]
115 changes: 115 additions & 0 deletions Spark/code/SparkClass.py
@@ -0,0 +1,115 @@
# Only the imports actually used by the streaming job are kept.
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
import pyspark.sql.types as tp
from pyspark.ml import PipelineModel
from elasticsearch import Elasticsearch



APP_NAME = 'prize_prediction-streaming'
APP_BATCH_INTERVAL = 1


kafkaServer="kafkaserver:9092"
topic = "dati_energetici"
def process_row(row):
    # Called for every row of the streaming query: enrich the record and index it into Elasticsearch.
    elastic_host = "https://elasticsearch01:9200"
    elastic_index = "energy_prize_es"

    es = Elasticsearch(
        elastic_host,
        ca_certs="/app/certs/ca/ca.crt",
        basic_auth=("elastic", "energyanalyzer"),
    )

    dati = row.asDict()
    dati["total_co2"] = dati["co2intensity"] * dati["totalProduction"]
    resp = es.index(index=elastic_index, document=dati)

    print(resp)

    '''
    # Alternative foreachBatch() version, kept for reference:
    for idx, row in enumerate(batch_df.collect()):
        row_dict = row.asDict()
        id = f'{batch_id}-{idx}'
        resp = es.index(index=elastic_index, id=id, document=row_dict)
        print(resp)
    batch_df.show()
    '''


spark = SparkSession.builder.appName(APP_NAME).getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

model = PipelineModel.load("model")


# Define the structure of the incoming records (same fields as the training set)
# TODO (translated from Italian): change every field to String
energyKafka = tp.StructType([

tp.StructField('countryCode', dataType= tp.StringType()),
tp.StructField('totalProduction', dataType= tp.DoubleType()),
tp.StructField('maxProduction', dataType= tp.DoubleType()),
tp.StructField('price', dataType= tp.DoubleType()),
tp.StructField('production_biomass', dataType= tp.DoubleType()),
tp.StructField('production_coal', dataType= tp.DoubleType()),
tp.StructField('production_gas', dataType= tp.DoubleType()),
tp.StructField('production_hydro', dataType= tp.DoubleType()),
tp.StructField('production_nuclear', dataType= tp.DoubleType()),
tp.StructField('production_oil', dataType= tp.DoubleType()),
tp.StructField('production_solar', dataType= tp.DoubleType()),
tp.StructField('production_unknown', dataType= tp.DoubleType()),
tp.StructField('production_wind', dataType= tp.DoubleType()),
tp.StructField('production_geothermal', dataType= tp.DoubleType()),
tp.StructField('co2intensity', dataType= tp.DoubleType()),
tp.StructField('stateDatetime', dataType= tp.StringType())

])




# Streaming Query
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", kafkaServer) \
.option("subscribe", topic) \
.load()




df = df.selectExpr("CAST(timestamp AS STRING)","CAST(value AS STRING)")\
.select(from_json("value", energyKafka).alias("data"))\
.select("data.*")\
.na.fill(0)


results = model.transform(df)

results = results.drop("features", "scaled_features")


results = results.writeStream \
.foreach(process_row).start()


results.awaitTermination()
Binary file added Spark/model/metadata/._SUCCESS.crc
Binary file not shown.
Binary file added Spark/model/metadata/.part-00000.crc
Binary file not shown.
Empty file added Spark/model/metadata/_SUCCESS
Empty file.
1 change: 1 addition & 0 deletions Spark/model/metadata/part-00000
@@ -0,0 +1 @@
{"class":"org.apache.spark.ml.PipelineModel","timestamp":1659052601714,"sparkVersion":"3.2.0","uid":"PipelineModel_051807366400","paramMap":{"stageUids":["VectorAssembler_3edba6aff9b2","StandardScaler_68d1436f7533","LinearRegression_71e1cbbb6b60"]},"defaultParamMap":{}}
@@ -0,0 +1 @@
{"class":"org.apache.spark.ml.feature.VectorAssembler","timestamp":1659052602231,"sparkVersion":"3.2.0","uid":"VectorAssembler_3edba6aff9b2","paramMap":{"outputCol":"features","inputCols":["totalProduction","maxProduction","production_biomass","production_coal","production_gas","production_geothermal","production_hydro","production_nuclear","production_oil","production_solar","production_unknown","production_wind"]},"defaultParamMap":{"outputCol":"VectorAssembler_3edba6aff9b2__output","handleInvalid":"error"}}
@@ -0,0 +1 @@
{"class":"org.apache.spark.ml.feature.StandardScalerModel","timestamp":1659052602831,"sparkVersion":"3.2.0","uid":"StandardScaler_68d1436f7533","paramMap":{"withMean":true,"withStd":true,"outputCol":"scaled_features","inputCol":"features"},"defaultParamMap":{"withMean":false,"withStd":true,"outputCol":"StandardScaler_68d1436f7533__output"}}
The remaining files under Spark/model (stage data and checksum files) are binary or empty and are not shown.
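
The metadata above describes a three-stage pipeline (VectorAssembler → StandardScaler → LinearRegression) trained on the production columns. Below is a minimal sketch of how such a model could be rebuilt and saved; the training data path and the use of `price` as the label column are assumptions based on the schema in `SparkClass.py`, since the original training code is not part of this commit.

```python
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression

spark = SparkSession.builder.appName("prize_prediction-training").getOrCreate()

# assumption: historical Electricity Map data with the feature columns below plus a "price" column
training_df = spark.read.json("historical_energy_data.json")  # hypothetical path

feature_cols = [
    "totalProduction", "maxProduction", "production_biomass", "production_coal",
    "production_gas", "production_geothermal", "production_hydro", "production_nuclear",
    "production_oil", "production_solar", "production_unknown", "production_wind",
]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)
lr = LinearRegression(featuresCol="scaled_features", labelCol="price")

model = Pipeline(stages=[assembler, scaler, lr]).fit(training_df)
model.write().overwrite().save("model")  # produces the Spark/model directory loaded by SparkClass.py
```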