Skip to content

Commit f53b253

Browse files
committed
feat: add functions wrapper
1 parent 2b6c448 commit f53b253

File tree

2 files changed

+152
-73
lines changed

2 files changed

+152
-73
lines changed

ingestion-manager.py

Lines changed: 0 additions & 73 deletions
This file was deleted.

ingestion_manager.py

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
import asyncio
2+
import json
3+
import sys
4+
import time
5+
import socket
6+
import os
7+
import bs4
8+
import requests
9+
10+
# Runtime configuration, read once from the environment at import time.
API_KEY = os.getenv("API_KEY", None)
COUNTRY_NAME = os.getenv("COUNTRY_NAME", "Italy")
STATE_NAME = os.getenv("STATE_NAME", "Sicily")
# The coordinate defaults are decimal strings, so they must be parsed with
# float(): int("37.500000") raises ValueError as soon as the module loads.
GPS_LAT = float(os.getenv("GPS_LAT", "37.500000"))
GPS_LON = float(os.getenv("GPS_LON", "15.090278"))
CITY_TO_SCAN = os.getenv("CITY_TO_SCAN", "Catania")
# Selects which of the endpoints below get_data() queries.
DATA_ACTION = os.getenv("DATA_ACTION", "NEAREST_IP_CITY")

# Endpoint URLs, fully formatted at import time from the settings above.
ALL_COUNTRIES_URL = f"http://api.airvisual.com/v2/countries?key={API_KEY}"
ALL_STATES_BY_COUNTRY_URL = f"http://api.airvisual.com/v2/states?country={COUNTRY_NAME}&key={API_KEY}" # pylint: disable=line-too-long
ALL_CITIES_BY_STATECOUNTRY_URL = f"http://api.airvisual.com/v2/cities?state={STATE_NAME}&country={COUNTRY_NAME}&key={API_KEY}"
NEAREST_IP_CITY_URL = f"http://api.airvisual.com/v2/nearest_city?key={API_KEY}"
NEAREST_GPS_CITY_URL = f"http://api.airvisual.com/v2/nearest_city?lat={GPS_LAT}&lon={GPS_LON}&key={API_KEY}"
23+
24+
def get_data() -> str:
    """
    Retrieves weather data based on the specified action.

    The endpoint is selected by the module-level ``DATA_ACTION`` setting and
    queried with a 15 second timeout. Exceptions raised by ``requests`` are
    not caught here and propagate to the caller.

    Returns:
        str: The response text containing weather data.

    Raises:
        ValueError: If ``DATA_ACTION`` is not one of the supported actions.
    """
    urls_by_action = {
        "ALL_COUNTRIES": ALL_COUNTRIES_URL,
        "ALL_STATES_BY_COUNTRY": ALL_STATES_BY_COUNTRY_URL,
        "ALL_CITIES_BY_STATECOUNTRY": ALL_CITIES_BY_STATECOUNTRY_URL,
        "NEAREST_IP_CITY": NEAREST_IP_CITY_URL,
        "NEAREST_GPS_CITY": NEAREST_GPS_CITY_URL,
    }
    try:
        url = urls_by_action[DATA_ACTION]
    except KeyError:
        # Previously an unknown action fell through every case and the
        # function silently returned None despite its ``-> str`` contract.
        supported = ", ".join(urls_by_action)
        raise ValueError(
            f"Unsupported DATA_ACTION {DATA_ACTION!r}; expected one of: {supported}"
        ) from None
    return requests.get(url, timeout=15).text
42+
43+
def send_to_logstash(host: str, port: int, data: dict):
    """
    Sends data to Logstash for ingestion.

    ``data`` is serialised as JSON and written over a fresh TCP connection.
    On a socket error the whole attempt is retried after 10 seconds, with no
    upper bound on the number of attempts, so the call blocks until one send
    succeeds.

    Args:
        host (str): The hostname or IP address of the Logstash server.
        port (int): The port number of the Logstash server.
        data (dict): The data to be sent to Logstash.
    """
    # Serialise once; the payload is identical for every retry.
    payload = json.dumps(data).encode()

    while True:
        try:
            # The context manager closes the socket even when connect() or
            # sendall() raises, so failed attempts no longer leak descriptors.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                print("Socket created. sock: " + str(sock))

                sock.connect((host, port))
                print("Socket connected to HOST: " + host + " PORT: " + str(port))
                print("Socket connected. Sock: " + str(sock))

                print("Sending message: " + str(data))

                sock.sendall(payload)
                print("End connection")
            return
        except socket.error:
            print("Connection error. There will be a new attempt in 10 seconds")
            time.sleep(10)
71+
72+
73+
74+
def extract_data(source: str) -> dict:
    """
    Extracts relevant weather data from the HTML source.

    Looks up the current-conditions card and reads the date header and the
    weather phrase from it. The three temperature entries currently carry
    only their unit ("C"); no numeric values are extracted.

    Args:
        source (str): The HTML source containing weather information.

    Returns:
        dict: A dictionary containing extracted weather data.

    Raises:
        ValueError: If the current-conditions card, its date header, or its
            phrase element is missing from ``source``.
    """
    soup = bs4.BeautifulSoup(source, 'lxml')

    weather = soup.find("div", {"class": "current-conditions-card content-module non-ad"})
    if weather is None:
        # find() returns None on a miss; fail with a clear message instead of
        # an AttributeError on the first chained lookup.
        raise ValueError("current-conditions card not found in source")

    date_tag = weather.find("p", {"class": "module-header sub date"})
    phrase_tag = weather.find("div", {"class": "phrase"})
    if date_tag is None or phrase_tag is None:
        raise ValueError("current-conditions card is missing its date or phrase element")

    data = {
        # NOTE(review): `.next` is presumably the tag's leading text node - confirm.
        'LocationTime': date_tag.next,
        'WeatherText': phrase_tag.next,
        'Temperature': {
            'Unit': "C"
        },
        'RealFeelTemperature': {
            'Unit': "C"
        },
        'RealFeelTemperatureShade': {
            'Unit': "C"
        }
    }
    return data
102+
103+
def test_logstash(host: str = "logstash", port: int = 5000) -> None:
    """
    Checks if Logstash is ready for receiving data.

    Blocks until a TCP connection to ``host:port`` succeeds, retrying every
    5 seconds with no upper bound on the number of attempts.

    Args:
        host (str): Hostname or IP address to probe. Defaults to "logstash".
        port (int): TCP port to probe. Defaults to 5000.
    """
    while True:
        try:
            # The context manager closes the probe socket on both success and
            # failure; previously a failed connect() left it open.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.connect((host, port))
        except socket.error:
            print("[ingestion_manager]Logstash not ready, waiting...")
            time.sleep(5)
        else:
            print("[ingestion_manager]Logstash is ready!")
            return
118+
119+
def check_api_key() -> bool:
    """
    Checks if the API key is set.

    An empty or whitespace-only value counts as not set: an environment
    variable that is defined but blank yields "" rather than None.

    Returns:
        bool: True if API key is set to a non-blank value, False otherwise.
    """
    return bool(API_KEY and API_KEY.strip())
127+
128+
129+
130+
async def main() -> None:
    """
    The main asynchronous function for executing the script.

    Exits the process with status 1 when no API key is configured. Otherwise
    it fetches the raw payload with get_data() and passes it to
    extract_data().
    """
    if not check_api_key():
        print ("[ingestion_manager] API_KEY environment variable not set!!")
        # A bare sys.exit() reports success (status 0); signal the
        # misconfiguration to the caller with a non-zero status instead.
        sys.exit(1)
    print("continuo")

    data_raw = get_data()

    # NOTE(review): the extracted dict is currently discarded - confirm
    # whether it is meant to be forwarded (e.g. via send_to_logstash).
    extract_data(data_raw)
142+
143+
144+
145+
146+
147+
148+
def _run_as_script() -> None:
    """Run main() on a new event loop, reporting Ctrl-C as a normal exit."""
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("[ingestion-manager]Program exited")


# Script entry point: only runs when the module is executed directly.
if __name__ == '__main__':
    _run_as_script()

0 commit comments

Comments
 (0)