-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathautodns.py
More file actions
207 lines (171 loc) · 7.53 KB
/
autodns.py
File metadata and controls
207 lines (171 loc) · 7.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
"""
DNS Management Tool for Cloudflare
This script automates the management of DNS records for domains hosted on Cloudflare, enabling direct interactions from the command line or through a Flask-based web interface. It uses environment variables for configuration, supports operations such as creating, updating, and deleting DNS A records based on GUID mappings, and features automatic IP detection for incoming web requests. The tool also includes a configurable notification system for operation alerts.
Features:
- CLI for DNS record management.
- Flask web server for DNS updates via web requests.
- GUID to DNS A record mapping for secure record management.
- Automatic IP detection from web requests, supporting both direct and proxied connections.
- Configurable notification system using Apprise.
Usage:
- For CLI operations: `python autodns.py generate <subdomain>`
- To run the Flask server: `python autodns.py`, then access `http://localhost:<port>/update-dns?guid=<GUID>`
Configuration:
- Set Cloudflare API credentials (`CF_ZONE_ID`, `CF_API_TOKEN`), notification service URLs (`APPRISE_URLS`), enable notifications (`ENABLE_NOTIFICATIONS`), and optionally the Flask server port (`FLASK_RUN_PORT`) via environment variables.
Dependencies:
- Flask, Apprise, requests, hashlib, datetime, and standard Python libraries.
"""
import argparse
import hashlib
import json
import os
import requests
import sys
from datetime import datetime, timedelta, timezone
from flask import Flask, request, jsonify
import apprise
# Constants
RATE_LIMIT_MINUTES = 10 # Time between updates in minutes
app = Flask(__name__)
# Environment Variables
CF_ZONE_ID = os.getenv("CF_ZONE_ID")
CF_API_TOKEN = os.getenv("CF_API_TOKEN")
CF_API_URL_BASE = f"https://api.cloudflare.com/client/v4/zones/{CF_ZONE_ID}/dns_records"
ENABLE_NOTIFICATIONS = os.getenv("ENABLE_NOTIFICATIONS", "false").lower() in [
"true",
"1",
"t",
]
APPRISE_URLS = os.getenv("APPRISE_URLS", "").split(",")
TRUSTED_PROXIES = {
ip.strip() for ip in os.getenv("TRUSTED_PROXIES", "").split(",") if ip.strip()
}
MAPPING_FILE = "/config/guid_mapping.json"
def get_client_ip():
"""Extract the real client IP from the request.
Only trusts X-Forwarded-For when the immediate connection comes from a
configured trusted proxy. Otherwise falls back to remote_addr.
"""
if TRUSTED_PROXIES and request.remote_addr in TRUSTED_PROXIES:
forwarded = request.headers.get("X-Forwarded-For", "")
if forwarded:
return forwarded.split(",")[0].strip()
return request.remote_addr
def load_guid_mapping():
"""Load GUID to A record mapping and last update timestamps from a JSON file."""
try:
with open(MAPPING_FILE) as file:
return json.load(file)
except FileNotFoundError:
return {}
except json.JSONDecodeError:
sys.exit("Error decoding JSON from the mapping file.")
except Exception as e:
sys.exit(f"Error loading GUID mapping: {e}")
def save_guid_mapping(mapping):
"""Save the updated GUID mapping and timestamps back to the JSON file."""
try:
with open(MAPPING_FILE, "w") as file:
json.dump(mapping, file, indent=4)
except Exception as e:
sys.exit(f"Error saving GUID mapping: {e}")
def generate_guid(subdomain):
"""Generate a 64-character SHA-256 hash GUID based on subdomain and current time."""
unique_string = f"{subdomain}"
return hashlib.sha256(unique_string.encode()).hexdigest()
def is_update_allowed(guid):
"""Check if the update is allowed based on the last updated timestamp."""
mapping = load_guid_mapping()
if guid not in mapping:
return True
last_update = datetime.fromisoformat(mapping[guid]["lastUpdated"])
return datetime.now(timezone.utc) - last_update > timedelta(minutes=RATE_LIMIT_MINUTES)
def send_notification(message):
"""Send notification using Apprise if notifications are enabled and configured."""
if not ENABLE_NOTIFICATIONS or not APPRISE_URLS:
print("Notifications are disabled or not configured.")
return
notification = apprise.Apprise()
for url in APPRISE_URLS:
if url:
notification.add(url)
success = notification.notify(body=message)
if success:
print("Notification sent successfully.")
else:
print("Failed to send notification.")
@app.route("/update-dns", methods=["GET"])
def update_dns_web():
"""Web endpoint to update DNS record based on GUID and detected IP."""
guid = request.args.get("guid")
if not guid:
return jsonify({"error": "GUID parameter is missing"}), 400
mapping = load_guid_mapping()
if guid not in mapping or not is_update_allowed(guid):
return jsonify({"error": "Update not allowed or unknown GUID"}), 429
new_ip = get_client_ip()
headers = {
"Authorization": f"Bearer {CF_API_TOKEN}",
"Content-Type": "application/json",
}
dns_record = mapping[guid]["subdomain"]
data = {"type": "A", "name": dns_record, "content": new_ip, "ttl": 1}
response = requests.get(
f"{CF_API_URL_BASE}?name={dns_record}&type=A", headers=headers, timeout=30
)
if response.status_code == 200 and response.json()["result"]:
dns_record_id = response.json()["result"][0]["id"]
update_response = requests.put(
f"{CF_API_URL_BASE}/{dns_record_id}", headers=headers, json=data, timeout=30
)
if update_response.status_code == 200:
mapping[guid]["lastUpdated"] = datetime.now(timezone.utc).isoformat()
save_guid_mapping(mapping)
send_notification(
f"DNS record for {dns_record} updated successfully to {new_ip}."
)
return jsonify({"success": True, "message": "DNS record updated."})
else:
return jsonify({"error": "Failed to update DNS record."}), 500
else:
return jsonify(
{"error": "DNS record does not exist or Cloudflare API error."}
), 404
def parse_arguments():
"""Parse command line arguments for the DNS management script."""
parser = argparse.ArgumentParser(
description="Manage DNS records via Cloudflare API."
)
subparsers = parser.add_subparsers(dest="command", help="Available commands")
generate_parser = subparsers.add_parser(
"generate", help="Generate a new GUID for a subdomain"
)
generate_parser.add_argument(
"subdomain", type=str, help="Subdomain for which to generate a GUID"
)
generate_parser.set_defaults(func=handle_generate_command)
return parser.parse_args()
def handle_generate_command(args):
"""Handle the 'generate' command to create a new GUID for a subdomain."""
subdomain = args.subdomain
guid = generate_guid(subdomain)
mapping = load_guid_mapping()
if subdomain in [m["subdomain"] for m in mapping.values()]:
print(f"Subdomain {subdomain} already has a GUID assigned.")
return
mapping[guid] = {"subdomain": subdomain, "lastUpdated": datetime.now(timezone.utc).isoformat()}
save_guid_mapping(mapping)
print(f"Generated GUID for {subdomain}: {guid}")
send_notification(f"Generated new GUID for {subdomain}.")
def main():
"""Main function to run the Flask app or handle CLI commands."""
args = parse_arguments()
if hasattr(args, "func"):
args.func(args)
else:
# Run autodns daemon by default
port = int(os.getenv("AUTODNS_PORT", "5000"))
listen_on = os.getenv("AUTODNS_HOST", "0.0.0.0")
app.run(host=listen_on, port=port)
if __name__ == "__main__":
main()