Skip to content

Commit 2de9748

Browse files
committed
add created_at column in script_log
1 parent 2e3f7f4 commit 2de9748

File tree

5 files changed

+83
-83
lines changed

5 files changed

+83
-83
lines changed

scheduler/app/faas_scheduler/models.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ class ScriptLog(Base):
3232
output = Column(Text, nullable=True)
3333
operate_from = Column(String(255))
3434
state = Column(String(10))
35+
created_at = Column(DateTime, index=True)
3536

3637
PENDING = "pending"
3738
RUNNING = "running"
@@ -44,17 +45,17 @@ def __init__(
4445
org_id,
4546
script_name,
4647
context_data,
47-
started_at,
4848
state,
49+
created_at,
4950
operate_from=None,
5051
):
5152
self.dtable_uuid = dtable_uuid
5253
self.owner = owner
5354
self.org_id = org_id
5455
self.script_name = script_name
5556
self.context_data = context_data
56-
self.started_at = started_at
5757
self.state = state
58+
self.created_at = created_at
5859
self.operate_from = operate_from
5960

6061
def get_info(self):
@@ -77,14 +78,17 @@ def to_dict(self):
7778
"context_data": (
7879
json.loads(self.context_data) if self.context_data else None
7980
),
80-
"started_at": datetime_to_isoformat_timestr(self.started_at),
81+
"started_at": self.started_at
82+
and datetime_to_isoformat_timestr(self.started_at),
8183
"finished_at": self.finished_at
8284
and datetime_to_isoformat_timestr(self.finished_at),
8385
"success": self.success,
8486
"return_code": self.return_code,
8587
"output": self.output,
8688
"operate_from": self.operate_from,
8789
"state": self.state,
90+
"created_at": self.created_at
91+
and datetime_to_isoformat_timestr(self.created_at),
8892
}
8993

9094

scheduler/app/faas_scheduler/utils.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import json
33
import logging
44
import requests
5-
from datetime import datetime
5+
from datetime import datetime, timedelta
66
from uuid import UUID
77

88
from tzlocal import get_localzone
@@ -380,8 +380,8 @@ def add_script(
380380
org_id,
381381
script_name,
382382
context_data,
383-
datetime.now(),
384383
ScriptLog.PENDING,
384+
datetime.now(),
385385
operate_from,
386386
)
387387
db_session.add(script)
@@ -390,8 +390,11 @@ def add_script(
390390
return script
391391

392392

393-
def update_script(db_session, script, success, return_code, output):
394-
script.finished_at = datetime.now()
393+
def update_script(
394+
db_session, script, success, return_code, output, started_at, finished_at
395+
):
396+
script.started_at = started_at
397+
script.finished_at = finished_at
395398
script.success = success
396399
script.return_code = return_code
397400
script.output = output
@@ -423,10 +426,15 @@ def run_script(
423426
return True
424427

425428

426-
def hook_update_script(db_session, script_id, success, return_code, output, spend_time):
429+
def hook_update_script(
430+
db_session, script_id, success, return_code, output, started_at, spend_time
431+
):
427432
script = db_session.query(ScriptLog).filter_by(id=script_id).first()
428433
if script:
429-
update_script(db_session, script, success, return_code, output)
434+
finished_at = started_at + timedelta(seconds=spend_time)
435+
update_script(
436+
db_session, script, success, return_code, output, started_at, finished_at
437+
)
430438
update_statistics(
431439
db_session, script.dtable_uuid, script.owner, script.org_id, spend_time
432440
)

scheduler/app/flask_server.py

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
check_auth_token,
1515
get_script,
1616
get_run_script_statistics_by_month,
17-
hook_update_script,
1817
can_run_task,
1918
get_run_scripts_count_monthly,
2019
ping_starter,
@@ -89,7 +88,7 @@ def scripts_api():
8988
owner, org_id, db_session, scripts_running_limit=scripts_running_limit
9089
):
9190
return make_response(("The number of runs exceeds the limit"), 400)
92-
script_log = scheduler.add_script_log(
91+
script_log = scheduler.add(
9392
uuid_str_to_32_chars(dtable_uuid),
9493
org_id,
9594
owner,
@@ -137,23 +136,21 @@ def script_api(script_id):
137136
):
138137
return make_response(("Bad request", 400))
139138

140-
if SUB_PROCESS_TIMEOUT and isinstance(SUB_PROCESS_TIMEOUT, int):
141-
now = datetime.now()
142-
duration_seconds = (now - script.started_at).seconds
143-
if duration_seconds > SUB_PROCESS_TIMEOUT:
144-
script.success = False
145-
script.return_code = -1
146-
script.finished_at = now
147-
script.output = TIMEOUT_OUTPUT
148-
db_session.commit()
139+
# if SUB_PROCESS_TIMEOUT and isinstance(SUB_PROCESS_TIMEOUT, int):
140+
# now = datetime.now()
141+
# duration_seconds = (now - script.created_at).seconds
142+
# if duration_seconds > SUB_PROCESS_TIMEOUT:
143+
# script.success = False
144+
# script.return_code = -1
145+
# script.finished_at = now
146+
# script.output = TIMEOUT_OUTPUT
147+
# db_session.commit()
149148

150149
return make_response(({"script": script.to_dict()}, 200))
151150

152151
except Exception as e:
153152
logger.exception(e)
154153
return make_response(("Internal server error", 500))
155-
finally:
156-
db_session.close()
157154

158155

159156
# get python script statistics logs...
@@ -278,16 +275,14 @@ def record_script_result():
278275
success = data.get("success", False)
279276
return_code = data.get("return_code")
280277
output = data.get("output")
278+
started_at = datetime.fromisoformat(data.get("started_at"))
281279
spend_time = data.get("spend_time")
282280
script_id = data.get("script_id")
283281
# update script_log and run-time statistics
284282
try:
285283
if script_id:
286-
# hook_update_script(
287-
# db_session, script_id, success, return_code, output, spend_time
288-
# )
289284
scheduler.script_done_callback(
290-
script_id, success, return_code, output, spend_time
285+
script_id, success, return_code, output, started_at, spend_time
291286
)
292287

293288
except Exception as e:

scheduler/app/scheduler.py

Lines changed: 44 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import logging
33
import os
44
import time
5-
from datetime import datetime
5+
from datetime import datetime, timedelta
66
from threading import Lock, Thread
77

88
from database import DBSession
@@ -24,8 +24,10 @@
2424
class ScriptQueue:
2525

2626
def __init__(self):
27-
self.q = [] # a list of ScriptLog
28-
self.script_logs_dict = {} # a dict of {id: ScriptLog}
27+
self.script_queue = (
28+
[]
29+
) # a list of ScriptLog instances, but can not be used to update database records!!!
30+
self.script_dict = {} # a dict of {id: ScriptLog}
2931
self.lock = Lock()
3032
self.running_count = {}
3133
# a dict of
@@ -75,10 +77,10 @@ def can_run_script(self, script_log: ScriptLog):
7577

7678
return True
7779

78-
def add_script_log(self, script_log: ScriptLog):
80+
def add(self, script_log: ScriptLog):
7981
with self.lock:
80-
self.q.append(script_log)
81-
self.script_logs_dict[script_log.id] = script_log
82+
self.script_queue.append(script_log)
83+
self.script_dict[script_log.id] = script_log
8284
self.inspect_queue_and_running(
8385
pre_msg=f"add script {script_log.get_info()} to queue"
8486
)
@@ -92,11 +94,11 @@ def get(self):
9294
return_task = None
9395

9496
index = 0
95-
while index < len(self.q):
96-
script_log = self.q[index]
97+
while index < len(self.script_queue):
98+
script_log = self.script_queue[index]
9799
if self.can_run_script(script_log):
98100
return_task = script_log
99-
self.q.pop(index)
101+
self.script_queue.pop(index)
100102
self.increase_running(script_log)
101103
self.inspect_queue_and_running(
102104
pre_msg=f"get script {script_log.get_info()} from queue"
@@ -154,7 +156,7 @@ def decrease_running(self, script_log):
154156

155157
def script_done_callback(self, script_log: ScriptLog):
156158
with self.lock:
157-
self.script_logs_dict.pop(script_log.id, None)
159+
self.script_dict.pop(script_log.id, None)
158160
self.decrease_running(script_log)
159161
self.inspect_queue_and_running(
160162
pre_msg=f"script {script_log.get_info()} run done"
@@ -172,25 +174,32 @@ def inspect_queue_and_running(self, pre_msg=None):
172174
lines.append(f"{'<' * 10} running {'<' * 10}")
173175

174176
lines.append(f"{'>' * 10} queue {'>' * 10}")
175-
for script_log in self.q:
177+
for script_log in self.script_queue:
176178
lines.append(
177179
f"org_id: {script_log.org_id} owner: {script_log.owner} dtable_uuid: {script_log.dtable_uuid} script_name: {script_log.script_name}"
178180
)
179181
lines.append(f"{'<' * 10} queue {'<' * 10}")
180182
logger.debug("\n".join(lines))
181183

182184
def get_script_log_by_id(self, script_id):
183-
return self.script_logs_dict.get(script_id)
185+
return self.script_dict.get(script_id)
184186

185-
def get_timeout_scripts(self):
187+
def pop_timeout_scripts(self):
186188
script_logs = []
187189
now_time = datetime.now()
188190
with self.lock:
189-
for index in range(len(self.q) - 1, -1, -1):
190-
script_log = self.q[index]
191-
if (now_time - script_log.started_at).seconds >= SUB_PROCESS_TIMEOUT:
192-
script_logs.append(self.q.pop(index))
193-
self.script_logs_dict.pop(script_log.id, None)
191+
for index in range(len(self.script_queue) - 1, -1, -1):
192+
script_log = self.script_queue[index]
193+
if (
194+
script_log.state == ScriptLog.RUNNING
195+
and (now_time - script_log.started_at).seconds
196+
>= SUB_PROCESS_TIMEOUT
197+
):
198+
script_logs.append(self.script_queue.pop(index))
199+
self.decrease_running(script_log)
200+
self.inspect_queue_and_running(
201+
pre_msg=f"set script {script_log.get_info()} timeout from queue"
202+
)
194203
return script_logs
195204

196205

@@ -199,9 +208,7 @@ class Scheduelr:
199208
def __init__(self):
200209
self.script_queue = ScriptQueue()
201210

202-
def add_script_log(
203-
self, dtable_uuid, org_id, owner, script_name, context_data, operate_from
204-
):
211+
def add(self, dtable_uuid, org_id, owner, script_name, context_data, operate_from):
205212
script_log = add_script(
206213
DBSession(),
207214
dtable_uuid,
@@ -211,7 +218,7 @@ def add_script_log(
211218
context_data,
212219
operate_from,
213220
)
214-
self.script_queue.add_script_log(script_log)
221+
self.script_queue.add(script_log)
215222
return script_log
216223

217224
def schedule(self):
@@ -225,7 +232,11 @@ def schedule(self):
225232
db_session.query(ScriptLog).filter(
226233
ScriptLog.id == script_log.id
227234
).update(
228-
{ScriptLog.state: ScriptLog.RUNNING}, synchronize_session=False
235+
{
236+
ScriptLog.state: ScriptLog.RUNNING,
237+
ScriptLog.started_at: script_log.started_at,
238+
},
239+
synchronize_session=False,
229240
)
230241
db_session.commit()
231242
script_file_info = get_script_file(
@@ -244,49 +255,28 @@ def schedule(self):
244255
finally:
245256
DBSession.remove()
246257

247-
def script_done_callback(self, script_id, success, return_code, output, spend_time):
258+
def script_done_callback(
259+
self, script_id, success, return_code, output, started_at, spend_time
260+
):
248261
hook_update_script(
249-
DBSession(), script_id, success, return_code, output, spend_time
262+
DBSession(), script_id, success, return_code, output, started_at, spend_time
250263
)
251264
script_log = self.script_queue.get_script_log_by_id(script_id)
252265
if not script_log: # not counted in memory, only update db record
253266
return
254267
self.script_queue.script_done_callback(script_log)
255268

256-
def load_pending_script_logs(self):
269+
def load_pending_scripts(self):
257270
"""load pending script logs, should be called only when server start"""
258-
script_logs = (
271+
script_logs = list(
259272
DBSession.query(ScriptLog)
260273
.filter_by(state=ScriptLog.PENDING)
274+
.filter(ScriptLog.created_at > (datetime.now() - timedelta(hours=1)))
261275
.order_by(ScriptLog.id)
262276
)
277+
logger.info(f"load {len(script_logs)} pending scripts created within 1 hour")
263278
for script_log in script_logs:
264-
self.script_queue.add_script_log(script_log)
265-
266-
def timeout_setter(self):
267-
while True:
268-
db_session = DBSession()
269-
now_time = datetime.now()
270-
try:
271-
script_logs = self.script_queue.get_timeout_scripts()
272-
if script_logs:
273-
db_session.query(ScriptLog).filter(
274-
ScriptLog.id.in_([script_log.id for script_log in script_logs])
275-
).update(
276-
{
277-
ScriptLog.state: ScriptLog.FINISHED,
278-
ScriptLog.finished_at: now_time,
279-
ScriptLog.success: False,
280-
ScriptLog.output: TIMEOUT_OUTPUT,
281-
ScriptLog.return_code: -1,
282-
},
283-
synchronize_session=False,
284-
)
285-
except Exception as e:
286-
logger.exception(e)
287-
finally:
288-
DBSession.remove()
289-
time.sleep(60)
279+
self.script_queue.add(script_log)
290280

291281
def statistic_cleaner(self):
292282
while True:
@@ -301,10 +291,9 @@ def statistic_cleaner(self):
301291
time.sleep(24 * 60 * 60)
302292

303293
def start(self):
304-
self.load_pending_script_logs()
294+
self.load_pending_scripts()
305295
Thread(target=self.schedule, daemon=True).start()
306296
Thread(target=self.statistic_cleaner, daemon=True).start()
307-
Thread(target=self.timeout_setter, daemon=True).start()
308297

309298

310299
scheduler = Scheduelr()

0 commit comments

Comments
 (0)