Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kubernetes concept #26

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
14 changes: 14 additions & 0 deletions vungochoan/forward/configure.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# IP rabbitMQ server
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should use key=value or key: value format

172.17.0.2

# name exchange forwarder
transmit_receive_data

# name queue forwarder received message
medial_queue

# name binding_key route exchange forwarder
forward_write_data

# name exchange write data in database
write_data_in_database
52 changes: 52 additions & 0 deletions vungochoan/forward/forwarder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import pika
import sys

file = open("configure.cfg", "r")
data = file.readlines()

# Connection rabbitMQ server
connection = pika.BlockingConnection(pika.ConnectionParameters(host = data[1].replace('\n', '')))
channel = connection.channel()

def get(channel):
# Create exchange type topic
channel.exchange_declare(exchange = data[4].replace('\n', ''), exchange_type = 'direct')

# Create queue
channel.queue_declare(queue = data[7].replace('\n', ''))

binding_keys = [data[10].replace('\n', '')]

for binding_key in binding_keys:
# Link exchange to queue with routing_key
channel.queue_bind(
exchange = data[4].replace('\n', ''),
queue = data[7].replace('\n', ''),
routing_key = binding_key
)

print(' [+] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
routing_data(method.routing_key, body)

channel.basic_consume(callback, queue = data[7].replace('\n', ''), no_ack = True)
channel.start_consuming()

def routing_data(routing_key, body):
connection = pika.BlockingConnection(pika.ConnectionParameters(host = data[1].replace('\n', '')))
channel = connection.channel()

# Create exchange type direct
channel.exchange_declare(exchange = data[13].replace('\n', ''), exchange_type = 'direct')

routing = routing_key
message = body

channel.basic_publish(exchange = data[13].replace('\n', ''), routing_key = routing, body = message)

print(" [x] Sent %r:%r" % (routing, message))

connection.close()

get(channel)
32 changes: 32 additions & 0 deletions vungochoan/writeDB/configure.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# IP rabbitMQ server
172.17.0.2

# name binding_key route exchange forwarder
forward_write_data

# name exchange write data in database
write_data_in_database

# name queue write data in database
write_data

# host server influxdb
172.17.0.3

# port influxdb
8086

# user influxdb
root

# password user influxdb
root

# name database
dataIoT

# user database
user1

# password user database
123456
102 changes: 102 additions & 0 deletions vungochoan/writeDB/writeDB.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import pika
import sys
import argparse
import json

from influxdb import InfluxDBClient

file = open("configure.cfg", "r")
data = file.readlines()

# Connection rabbitMQ server
connection = pika.BlockingConnection(pika.ConnectionParameters(host = data[1].replace('\n', '')))
channel = connection.channel()

def getData(channel):
# Create exchange type direct
channel.exchange_declare(exchange = data[7].replace('\n', ''), exchange_type = 'direct')

# Create queue
channel.queue_declare(queue = data[10].replace('\n', ''))

# Link exchange to queue with routing_key
routing_keys = [data[4].replace('\n', '')]
for routing_key in routing_keys:
channel.queue_bind(
exchange = data[7].replace('\n', ''),
queue = data[10].replace('\n', ''),
routing_key = routing_key
)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
# print(body)
writeDB(body)

channel.basic_consume(callback, queue = data[10].replace('\n', ''), no_ack = True)
channel.start_consuming()

def writeDB(body):
def main(host = data[13].replace('\n', ''), port = data[16].replace('\n', '')):
"""Instantiate a connection to the InfluxDB."""
user = data[19].replace('\n', '')
password = data[22].replace('\n', '')
dbname = data[25].replace('\n', '')
dbuser = data[28].replace('\n', '')
dbuser_password = data[31].replace('\n', '')
query = 'SELECT * FROM "temperature"'
json_body = json.loads(body)

client = InfluxDBClient(host, port, user, password, dbname)

# print("Drop user: " + dbuser)
# client.drop_user(dbuser)

# print("Drop database: " + dbname)
# client.drop_database(dbname)

# print("Create database: " + dbname)
client.create_database(dbname)

# # # print("Create a retention policy")
# # # client.create_retention_policy('awesome_policy', '3d', 3, default=True)

# print("Create user: " + dbuser)
client.create_user(dbuser, dbuser_password)

# print("Switch user: " + dbuser)
client.switch_user(dbuser, dbuser_password)

# print("Write points: {0}".format(json_body))
client.write_points(json_body)

# print("Querying data: " + query)
result = client.query(query)

print("Result: {0}".format(result))

# print("Switch user: " + user)
# client.switch_user(user, password)

# print("Drop database: " + dbname)
# client.drop_database(dbname)


def parse_args():
"""Parse the args."""
parser = argparse.ArgumentParser(
description='example code to play with InfluxDB')
parser.add_argument('--host', type=str, required=False,
default=data[13].replace('\n', ''),
help='hostname of InfluxDB http API')
parser.add_argument('--port', type=int, required=False, default=data[16].replace('\n', ''),
help='port of InfluxDB http API')
return parser.parse_args()


if __name__ == '__main__':
args = parse_args()
main(host=args.host, port=args.port)

getData(channel)