Skip to content

Commit a7b64f4

Browse files
committed
add clean old data schedule task
1 parent 5211355 commit a7b64f4

File tree

4 files changed

+128
-78
lines changed

4 files changed

+128
-78
lines changed

scheduler/app/faas_scheduler/models.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,15 @@ def __init__(
5757
self.operate_from = operate_from
5858
self.state = state
5959

60+
def get_info(self):
61+
return {
62+
'id': self.id,
63+
'org_id': self.org_id,
64+
'owner': self.owner,
65+
'dtable_uuid': self.dtable_uuid,
66+
'script_name': self.script_name
67+
}
68+
6069
def to_dict(self):
6170
from faas_scheduler.utils import datetime_to_isoformat_timestr
6271

scheduler/app/flask_server.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ def script_api(script_id):
131131
script = get_script(db_session, script_id)
132132
if not script:
133133
return make_response(("Not found", 404))
134-
if dtable_uuid != script.dtable_uuid or script_name != script.script_name:
134+
if uuid_str_to_32_chars(dtable_uuid) != script.dtable_uuid or script_name != script.script_name:
135135
return make_response(("Bad request", 400))
136136

137137
if SUB_PROCESS_TIMEOUT and isinstance(SUB_PROCESS_TIMEOUT, int):
@@ -277,21 +277,19 @@ def record_script_result():
277277
output = data.get("output")
278278
spend_time = data.get("spend_time")
279279
script_id = data.get("script_id")
280-
281-
db_session = DBSession()
282-
283280
# update script_log and run-time statistics
284281
try:
285282
if script_id:
286-
hook_update_script(
287-
db_session, script_id, success, return_code, output, spend_time
283+
# hook_update_script(
284+
# db_session, script_id, success, return_code, output, spend_time
285+
# )
286+
scheduler.script_done_callback(
287+
script_id, success, return_code, output, spend_time
288288
)
289289

290290
except Exception as e:
291291
logger.exception(e)
292292
return make_response(("Internal server error", 500))
293-
finally:
294-
db_session.close()
295293

296294
return "success"
297295

@@ -376,5 +374,5 @@ def base_run_python_statistics():
376374

377375
if __name__ == "__main__":
378376
scheduler.start()
379-
http_server = WSGIServer(("127.0.0.1", 5055), app)
377+
http_server = WSGIServer(("0.0.0.0", 5055), app)
380378
http_server.serve_forever()

scheduler/app/scheduler.py

Lines changed: 112 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,23 @@
22
import logging
33
import os
44
import time
5+
from datetime import datetime
56
from threading import Lock, Thread
67

78
from database import DBSession
89
from faas_scheduler.models import ScriptLog
910
from faas_scheduler.utils import (
1011
add_script,
12+
delete_log_after_days,
13+
delete_statistics_after_days,
1114
run_script,
1215
get_script_file,
1316
hook_update_script
1417
)
1518

1619
logger = logging.getLogger(__name__)
20+
SUB_PROCESS_TIMEOUT = int(os.environ.get("PYTHON_PROCESS_TIMEOUT", 60 * 15))
21+
TIMEOUT_OUTPUT = "Script running for too long time!"
1722

1823

1924
class ScriptQueue:
@@ -68,6 +73,7 @@ def add_script_log(self, script_log: ScriptLog):
6873
with self.lock:
6974
self.q.append(script_log)
7075
self.script_logs_dict[script_log.id] = script_log
76+
self.inspect_queue_and_running(pre_msg=f'add script {script_log.get_info()} to queue')
7177

7278
def get(self):
7379
"""get the first valid task from self.q
@@ -83,25 +89,83 @@ def get(self):
8389
if self.can_run_script(script_log):
8490
return_task = script_log
8591
self.q.pop(index)
92+
self.increase_running(script_log)
93+
self.inspect_queue_and_running(pre_msg=f'get script {script_log.get_info()} from queue')
8694
break
8795
index += 1
8896

8997
return return_task
9098

99+
def increase_running(self, script_log):
100+
if script_log.org_id != -1:
101+
running_team_key = f'{script_log.org_id}'
102+
else:
103+
running_team_key = f'{script_log.owner}'
104+
running_base_key = f'{running_team_key}_{script_log.dtable_uuid}'
105+
running_script_key = f'{running_base_key}_{script_log.script_name}'
106+
self.running_count[running_team_key] = self.running_count[running_team_key] + 1 if self.running_count.get(running_team_key) else 1
107+
self.running_count[running_base_key] = self.running_count[running_base_key] + 1 if self.running_count.get(running_base_key) else 1
108+
self.running_count[running_script_key] = self.running_count[running_script_key] + 1 if self.running_count.get(running_script_key) else 1
109+
110+
def decrease_running(self, script_log):
111+
if script_log.org_id != -1:
112+
running_team_key = f'{script_log.org_id}'
113+
else:
114+
running_team_key = f'{script_log.owner}'
115+
running_base_key = f'{running_team_key}_{script_log.dtable_uuid}'
116+
running_script_key = f'{running_base_key}_{script_log.script_name}'
117+
118+
if running_team_key in self.running_count:
119+
self.running_count[running_team_key] -= 1
120+
if not self.running_count.get(running_team_key):
121+
self.running_count.pop(running_team_key, None)
122+
123+
if running_base_key in self.running_count:
124+
self.running_count[running_base_key] -= 1
125+
if not self.running_count.get(running_base_key):
126+
self.running_count.pop(running_base_key, None)
127+
128+
if running_script_key in self.running_count:
129+
self.running_count[running_script_key] -= 1
130+
if not self.running_count.get(running_script_key):
131+
self.running_count.pop(running_script_key, None)
132+
91133
def script_done_callback(self, script_log: ScriptLog):
92134
with self.lock:
93-
if script_log.org_id != -1:
94-
running_team_key = f'{script_log.org_id}'
95-
else:
96-
running_team_key = f'{script_log.owner}'
97-
running_base_key = f'{running_team_key}_{script_log.dtable_uuid}'
98-
running_script_key = f'{running_base_key}_{script_log.script_name}'
99-
if running_team_key in self.running_count:
100-
self.running_count[running_team_key] -= 1
101-
if running_base_key in self.running_count:
102-
self.running_count[running_base_key] -= 1
103-
if running_script_key in self.running_count:
104-
self.running_count[running_script_key] -= 1
135+
self.script_logs_dict.pop(script_log.id, None)
136+
self.decrease_running(script_log)
137+
self.inspect_queue_and_running(pre_msg=f'script {script_log.get_info()} run done')
138+
139+
def inspect_queue_and_running(self, pre_msg=None):
140+
if logger.root.level != logging.DEBUG:
141+
return
142+
lines = ['\n']
143+
if pre_msg:
144+
lines.append(pre_msg)
145+
lines.append(f"{'>' * 10} running {'>' * 10}")
146+
for key, value in self.running_count.items():
147+
lines.append(f'{key}: {value}')
148+
lines.append(f"{'<' * 10} running {'<' * 10}")
149+
150+
lines.append(f"{'>' * 10} queue {'>' * 10}")
151+
for script_log in self.q:
152+
lines.append(f"org_id: {script_log.org_id} owner: {script_log.owner} dtable_uuid: {script_log.dtable_uuid} script_name: {script_log.script_name}")
153+
lines.append(f"{'<' * 10} queue {'<' * 10}")
154+
logger.debug('\n'.join(lines))
155+
156+
def get_script_log_by_id(self, script_id):
157+
return self.script_logs_dict.get(script_id)
158+
159+
def get_timeout_scripts(self):
160+
script_logs = []
161+
now_time = datetime.now()
162+
with self.lock:
163+
for index in range(len(self.q) - 1, -1, -1):
164+
script_log = self.q[index]
165+
if (now_time - script_log.started_at).seconds >= SUB_PROCESS_TIMEOUT:
166+
script_logs.append(self.q.pop(index))
167+
self.script_logs_dict.pop(script_log.id, None)
168+
return script_logs
105169

106170

107171
class Scheduelr:
@@ -157,7 +221,6 @@ def script_done_callback(
157221
output,
158222
spend_time
159223
):
160-
script_log = self.script_queue.script_logs_dict.pop(script_id)
161224
hook_update_script(
162225
DBSession(),
163226
script_id,
@@ -166,8 +229,10 @@ def script_done_callback(
166229
output,
167230
spend_time
168231
)
232+
script_log = self.script_queue.get_script_log_by_id(script_id)
169233
if not script_log: # not counted in memory, only update db record
170234
return
235+
self.script_queue.script_done_callback(script_log)
171236

172237
def load_pending_script_logs(self):
173238
"""load pending script logs, should be called only when server start
@@ -176,9 +241,43 @@ def load_pending_script_logs(self):
176241
for script_log in script_logs:
177242
self.script_queue.add_script_log(script_log)
178243

244+
def timeout_setter(self):
245+
while True:
246+
db_session = DBSession()
247+
now_time = datetime.now()
248+
try:
249+
script_logs = self.script_queue.get_timeout_scripts()
250+
if script_logs:
251+
db_session.query(ScriptLog).filter(ScriptLog.id.in_([script_log.id for script_log in script_logs])).update(
252+
{
253+
ScriptLog.state: ScriptLog.FINISHED,
254+
ScriptLog.finished_at: now_time,
255+
ScriptLog.success: False,
256+
ScriptLog.output: TIMEOUT_OUTPUT,
257+
ScriptLog.return_code: -1
258+
}
259+
)
260+
except Exception as e:
261+
logger.exception(e)
262+
finally:
263+
DBSession.remove()
264+
time.sleep(60)
265+
266+
def statistic_cleaner(self):
267+
while True:
268+
db_session = DBSession()
269+
try:
270+
delete_log_after_days(db_session)
271+
delete_statistics_after_days(db_session)
272+
except Exception as e:
273+
logger.exception(e)
274+
time.sleep(24 * 60 * 60)
275+
179276
def start(self):
180277
self.load_pending_script_logs()
181278
Thread(target=self.schedule, daemon=True).start()
279+
Thread(target=self.statistic_cleaner, daemon=True).start()
280+
Thread(target=self.timeout_setter, daemon=True).start()
182281

183282

184283
scheduler = Scheduelr()

scheduler/app/timeout_setter.py

Lines changed: 0 additions & 56 deletions
This file was deleted.

0 commit comments

Comments
 (0)