diff --git a/README.md b/README.md
index 87c15f83..830ba3da 100644
--- a/README.md
+++ b/README.md
@@ -12,6 +12,9 @@ The beautiful network analyzer, mapper, and monitor.
 1. `sudo bash install.sh` - this will walk you through the setup needed for Auth0 information.
 2. If you are running docker as non-root, then remove the top section from `install.sh` and re-run.
 
+## Redis notes
+Redis is also going to be used as a write cache for incoming metrics. This way, the load on the metric database server will be greatly reduced. We can tune the time to write the metrics as well.
+
 ## Database Notes
 
 ### Attempt 2
diff --git a/backend/alive.py b/backend/alive.py
index 97493f6c..e82b7c96 100644
--- a/backend/alive.py
+++ b/backend/alive.py
@@ -78,7 +78,11 @@ def check_all_hosts():
 
         if not result:
             watcher.send_alert(
-                "Check Alive", alive_type, host_name, summary=summary
+                "Check Alive",
+                alive_type,
+                host_name,
+                summary=summary,
+                severity="warning"
             )
 
 
diff --git a/backend/serve.py b/backend/serve.py
index f3c3a7b8..67dae3b6 100755
--- a/backend/serve.py
+++ b/backend/serve.py
@@ -32,6 +32,7 @@ from werkzeug.utils import secure_filename
 from flask_cors import CORS
 from PIL import Image
+from pid import PidFile
 
 import ansible_helper
 
@@ -1743,6 +1744,42 @@ def insert_metric(inp=""):
         return "Invalid data", 421
 
     for item in data["metrics"]:
+
+        if "tags" in item and "name" in item:
+
+            a = redis.Redis(host=os.environ.get("REDIS_HOST") or "redis")
+
+            invalid_names = ("agent_name",)
+            parsed_tags = {x : item["tags"][x] for x in item["tags"] if x not in invalid_names}
+
+            name = json.dumps({"name" : item["name"], "tags" : parsed_tags}, default=str)
+            a.set(f"METRIC-{name}", json.dumps(item, default=str))
+
+    return "Success", 200
+
+@app.route("/bulk_insert/", methods=["GET"])
+@requires_auth_write
+def bulk_insert():
+    """
+    Bulk insert of the redis entries into mongo
+    - Can be called manually or periodically
+    """
+    metrics_latest_updates = []
+    metrics_updates = []
+
+    a = redis.Redis(host=os.environ.get("REDIS_HOST") or "redis")
+
+    # Find all metrics from Redis
+    metrics = a.keys(pattern="METRIC-*")
+    for metric in metrics:
+        print("Inserting", metric)
+        item = json.loads(a.get(metric))
+        print("Item:", item)
+        last_time = a.get("last_metric_{}".format(item["tags"]["ip"]))
+
+        if "tags" not in item or "name" not in item:
+            continue
+
         if "timestamp" in item:
             try:
                 # item["timestamp"] = datetime.datetime.fromtimestamp(item["timestamp"])
@@ -1750,32 +1787,63 @@ def insert_metric(inp=""):
             except Exception:
                 print("Problem with timestamp - ", sys.exc_info())
 
-        if "tags" in item and "name" in item:
-            a = redis.Redis(host=os.environ.get("REDIS_HOST") or "redis")
-            last_time = a.get("last_metric_{}".format(item["tags"]["ip"]))
-
-            if type(item["tags"]) == type({}):
-                item["tags"]["labyrinth_name"] = item["name"]
-                item["tags"]["agent_name"] = socket.gethostname()
-            try:
-                if last_time and (time.time() - float(last_time)) <= 15:
-                    pass
-                else:
-                    mongo_client["labyrinth"]["metrics-latest"].replace_one(
-                        {"tags": item["tags"], "name": item["name"]}, item, upsert=True
-                    )
-            except Exception:
-                raise Exception(item)
+        if type(item["tags"]) == type({}):
+            item["tags"]["labyrinth_name"] = item["name"]
+            item["tags"]["agent_name"] = socket.gethostname()
 
-            if last_time and (time.time() - float(last_time)) <= 120:
+        try:
+            if last_time and (time.time() - float(last_time)) <= 15:
                 pass
             else:
-                mongo_client["labyrinth"]["metrics"].insert_one(item)
+                """
+                mongo_client["labyrinth"]["metrics-latest"].replace_one(
+                    {"tags": item["tags"], "name": item["name"]}, item, upsert=True
+                )
+                """
+                replacements = {
+                    "name" : item["name"]
+                }
+                tags = ("agent_name",)
+                for tag in [x for x in item["tags"] if x not in tags]:
+                    replacements[f"tags.{tag}"] = item["tags"][tag]
+
+
+
+                metrics_latest_updates.append(
+                    pymongo.ReplaceOne(
+                        replacements, item, upsert=True
+                    )
+                )
+        except Exception:
+            raise Exception(item)
+
+        if last_time and (time.time() - float(last_time)) <= 120:
+            pass
+        else:
-            a.set("last_metric_{}".format(item["tags"]["ip"]), time.time())
+            """
+            mongo_client["labyrinth"]["metrics"].insert_one(item)
+            """
+            metrics_updates.append(
+                pymongo.InsertOne(
+                    item
+                )
+            )
+
+        a.set("last_metric_{}".format(item["tags"]["ip"]), time.time())
+
+    # Bulk writes
+    if metrics_latest_updates:
+        mongo_client["labyrinth"]["metrics-latest"].bulk_write(metrics_latest_updates)
+
+    if metrics_updates:
+        mongo_client["labyrinth"]["metrics"].bulk_write(metrics_updates)
+
+
+
+    return len(metrics), 200
 
-    return "Success", 200
 
 
 if __name__ == "__main__":  # pragma: no cover
@@ -1784,6 +1852,9 @@ def insert_metric(inp=""):
 
     if len(sys.argv) > 1 and sys.argv[1] == "watcher":
         unwrap(dashboard)(report=True)
+    elif len(sys.argv) > 1 and sys.argv[1] == "updater":
+        with PidFile("labyrinth-bulk-insert") as p:
+            unwrap(bulk_insert)()
     else:
         app.debug = True
         app.config["ENV"] = "development"
diff --git a/backend/test/test_03_serve.py b/backend/test/test_03_serve.py
index b5ac7cb3..3bc12524 100644
--- a/backend/test/test_03_serve.py
+++ b/backend/test/test_03_serve.py
@@ -7,6 +7,8 @@ import pytest
 
 import serve
 
+import redis
+
 from common.test import unwrap, delete_keys_recursive
 
 
@@ -22,6 +24,10 @@ def tearDown():
     serve.mongo_client["labyrinth"]["settings"].delete_many({})
     serve.mongo_client["labyrinth"]["metrics"].delete_many({})
 
+    a = redis.Redis(host=os.environ.get("REDIS_HOST"))
+    for key in a.keys(pattern="METRIC-*"):
+        a.delete(key)
+
 
 @pytest.fixture
 def setup():
@@ -618,7 +624,7 @@ def test_insert_metric(setup):
                     "diskio": 884284,
                 },
                 "name": "check_hd",
-                "tags": {"host": "00-00-00-00-01", "ip": "172.19.0.6"},
+                "tags": {"host": "00-00-00-00-01", "ip": "172.19.0.6", "random-tag" : 5},
                 "timestamp": 1625683390,
             },
         ]
@@ -632,6 +638,9 @@ def test_insert_metric(setup):
     a = unwrap(serve.insert_metric)(sample_data)
     assert a[1] == 200
 
+    # NOTE: We are changing to redis write cache layer
+
+    """
     b = serve.mongo_client["labyrinth"]["metrics-latest"].find({})
     c = [x for x in b]
     assert len(c) == 1
@@ -645,6 +654,69 @@ def test_insert_metric(setup):
             ][0][item].replace(microsecond=0, second=0)
         else:
             assert c[0][item] == sample_data["metrics"][0][item]
+    """
+    a = redis.Redis(host=os.environ.get("REDIS_HOST"))
+
+    parsed_tags = {x : sample_data["metrics"][0]["tags"][x] for x in sample_data["metrics"][0]["tags"] if x != "random-tag"}
+
+    b = json.dumps({
+        "name" : sample_data["metrics"][0]["name"],
+        "tags" : parsed_tags
+    }, default=str)
+    print("test key:", b)
+    c = json.loads(a.get(f"METRIC-{b}"))
+    print(c)
+    del c["timestamp"]
+    del sample_data["metrics"][0]["timestamp"]
+    assert c == sample_data["metrics"][0]
+    return sample_data
+
+def test_redis_bulk_insert(setup):
+    """
+    Test for Redis bulk insert of metrics
+    - NOTE: I specifically do not use the dashboard to read from the Redis server directly, since we may need remote agents as well (who would not have access to this Redis instance)
+    """
+    sample_data = {
+        "metrics": [
+            {
+                "fields": {
+                    "boot_time": 1625587759,
+                    "context_switches": 4143261228,
+                    "entropy_avail": 3760,
+                    "interrupts": 1578002983,
+                    "diskio": 884284,
+                },
+                "name": "check_hd",
+                "tags": {"host": "00-00-00-00-01", "ip": "172.19.0.6"},
+                "timestamp": 1625683390,
+            },
+        ]
+    }
+    a = unwrap(serve.insert_metric)(sample_data)
+    assert a[1] == 200
+
+    sample_data["metrics"][0]["tags"]["new_tag"] = 7
+    a = unwrap(serve.insert_metric)(sample_data)
+    assert a[1] == 200
+
+    sample_data["metrics"][0]["tags"]["new_tag"] = 234
+    a = unwrap(serve.insert_metric)(sample_data)
+    assert a[1] == 200
+
+
+
+    # Check the state of the metrics-latest beforehand
+    b = serve.mongo_client["labyrinth"]["metrics-latest"].find({})
+    current_length = len(list(b))
+
+
+    a = unwrap(serve.bulk_insert)()
+    assert a[1] == 200
+
+
+    b = serve.mongo_client["labyrinth"]["metrics-latest"].find({})
+    c = [True for x in b if "tags" in x and x["tags"]["host"] == "00-00-00-00-01"]
+    assert len(c) == 3
 
 
 def test_list_dashboard(setup):
diff --git a/cron/bulk_write.sh b/cron/bulk_write.sh
new file mode 100644
index 00000000..bac34885
--- /dev/null
+++ b/cron/bulk_write.sh
@@ -0,0 +1,10 @@
+#!/bin/sh
+cd /src
+if [ -f .env ]; then
+    set -a;
+    . .env;
+    set +a;
+fi
+
+# MONGO_HOST=mongo REDIS_HOST=redis MONGO_USERNAME=root MONGO_PASSWORD=temp python3 serve.py watcher 2>&1
+python3 serve.py updater 2>&1
\ No newline at end of file
diff --git a/cron/cron.d/crontab b/cron/cron.d/crontab
index 8ca93ff4..a32c66d6 100644
--- a/cron/cron.d/crontab
+++ b/cron/cron.d/crontab
@@ -1,4 +1,5 @@
 * * * * * /bin/bash /cron/run.sh > /var/log/finder
+* * * * * /bin/bash /cron/bulk_write.sh > /var/log/bulk_write
 */10 * * * * /bin/bash /cron/watcher.sh > /var/log/watcher
 */2 * * * * /bin/bash /cron/alive.sh > /var/log/alive