Commit cf3123b

init refactor, add Scheduler
1 parent 1ba29ce commit cf3123b

File tree

6 files changed: +258 -76 lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
@@ -13,3 +13,5 @@ local_settings.py
 
 seatable-python-runner/
 seatable-python-runner.zip
+
+.python-version

scheduler/app/database/__init__.py

Lines changed: 2 additions & 2 deletions
@@ -3,7 +3,7 @@
 
 from sqlalchemy.ext.declarative import declarative_base
 from sqlalchemy import create_engine
-from sqlalchemy.orm import sessionmaker
+from sqlalchemy.orm import sessionmaker, scoped_session
 
 DB_ROOT_USER = os.getenv("DB_ROOT_USER", "root")
 DB_ROOT_PASSWD = os.getenv("DB_ROOT_PASSWD")
@@ -37,4 +37,4 @@
 
 engine = create_engine(db_url, **db_kwargs)
 Base = declarative_base()
-DBSession = sessionmaker(bind=engine)
+DBSession = scoped_session(sessionmaker(bind=engine))
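
The switch from a plain sessionmaker to scoped_session turns DBSession into a thread-local session registry: every call to DBSession() on the same thread returns the same Session, and DBSession.remove() closes and discards it. A minimal sketch of that behaviour (illustrative only; it uses an in-memory SQLite engine as a stand-in for the project's real database settings):

# Illustrative sketch, not part of this commit.
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker, scoped_session

engine = create_engine("sqlite://")  # assumption: stand-in for the real engine
DBSession = scoped_session(sessionmaker(bind=engine))

def handle_request():
    session = DBSession()          # first call on this thread creates the Session
    assert session is DBSession()  # later calls on the same thread reuse it
    session.execute(text("SELECT 1"))
    DBSession.remove()             # close and discard the thread-local Session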

scheduler/app/faas_scheduler/models.py

Lines changed: 8 additions & 0 deletions
@@ -31,6 +31,11 @@ class ScriptLog(Base):
     return_code = Column(Integer, nullable=True)
     output = Column(Text, nullable=True)
     operate_from = Column(String(255))
+    state = Column(String(10))
+
+    PENDING = 'pending'
+    RUNNING = 'running'
+    FINISHED = 'finished'
 
     def __init__(
         self,
@@ -40,6 +45,7 @@ def __init__(
         script_name,
         context_data,
         started_at,
+        state,
         operate_from=None,
     ):
         self.dtable_uuid = dtable_uuid
@@ -49,6 +55,7 @@ def __init__(
         self.context_data = context_data
         self.started_at = started_at
         self.operate_from = operate_from
+        self.state = state
 
     def to_dict(self):
         from faas_scheduler.utils import datetime_to_isoformat_timestr
@@ -68,6 +75,7 @@ def to_dict(self):
             "return_code": self.return_code,
             "output": self.output,
             "operate_from": self.operate_from,
+            "state": self.state
         }
 
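
The new state column plus the PENDING/RUNNING/FINISHED constants give every ScriptLog row an explicit lifecycle that the scheduler can query (it reloads PENDING rows at startup). This commit only shows the PENDING side, so the transitions below are a hypothetical sketch, not code from the repository:

# Hypothetical lifecycle sketch; only the PENDING query is grounded in this commit.
from datetime import datetime
from database import DBSession
from faas_scheduler.models import ScriptLog

session = DBSession()
script_log = session.query(ScriptLog).filter_by(state=ScriptLog.PENDING).first()
if script_log:
    script_log.state = ScriptLog.RUNNING    # assumed: set when the runner picks it up
    session.commit()
    # ... the script executes ...
    script_log.state = ScriptLog.FINISHED   # assumed: set when the result comes back
    script_log.finished_at = datetime.now()
    session.commit()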

scheduler/app/flask_server.py

Lines changed: 20 additions & 31 deletions
@@ -8,14 +8,11 @@
 from datetime import datetime
 from flask import Flask, request, make_response
 from gevent.pywsgi import WSGIServer
-from concurrent.futures import ThreadPoolExecutor
 
 from database import DBSession
 from faas_scheduler.utils import (
     check_auth_token,
-    run_script,
     get_script,
-    add_script,
     get_run_script_statistics_by_month,
     hook_update_script,
     can_run_task,
@@ -26,6 +23,7 @@
     uuid_str_to_32_chars,
     basic_log,
 )
+from .scheduler import scheduler
 
 
 basic_log("scheduler.log")
@@ -38,7 +36,11 @@
 app = Flask(__name__)
 
 logger = logging.getLogger(__name__)
-executor = ThreadPoolExecutor(max_workers=SCRIPT_WORKERS)
+
+
+@app.teardown_appcontext
+def shutdown_session(exception=None):
+    DBSession.remove()
 
 
 @app.route("/ping/", methods=["GET"])
@@ -72,8 +74,6 @@ def scripts_api():
         context_data = data.get("context_data")
         owner = data.get("owner")
         org_id = data.get("org_id")
-        script_url = data.get("script_url")
-        temp_api_token = data.get("temp_api_token")
         scripts_running_limit = data.get("scripts_running_limit", -1)
         operate_from = data.get("operate_from", "manualy")
         if not dtable_uuid or not script_name or not owner:
@@ -87,27 +87,16 @@ def scripts_api():
             owner, org_id, db_session, scripts_running_limit=scripts_running_limit
         ):
             return make_response(("The number of runs exceeds the limit"), 400)
-        script = add_script(
-            db_session,
-            dtable_uuid,
-            owner,
+        script_log = scheduler.add_script_log(
+            uuid_str_to_32_chars(dtable_uuid),
             org_id,
+            owner,
             script_name,
             context_data,
-            operate_from,
-        )
-        logger.debug("lets call the starter to fire up the runner...")
-        executor.submit(
-            run_script,
-            script.id,
-            dtable_uuid,
-            script_name,
-            script_url,
-            temp_api_token,
-            context_data,
+            operate_from
        )
 
-        return make_response(({"script_id": script.id}, 200))
+        return make_response(({"script_id": script_log.id}, 200))
     except Exception as e:
         logger.exception(e)
         return make_response(("Internal server error", 500))
@@ -143,15 +132,15 @@ def script_api(script_id):
     if dtable_uuid != script.dtable_uuid or script_name != script.script_name:
         return make_response(("Bad request", 400))
 
-    if SUB_PROCESS_TIMEOUT and isinstance(SUB_PROCESS_TIMEOUT, int):
-        now = datetime.now()
-        duration_seconds = (now - script.started_at).seconds
-        if duration_seconds > SUB_PROCESS_TIMEOUT:
-            script.success = False
-            script.return_code = -1
-            script.finished_at = now
-            script.output = TIMEOUT_OUTPUT
-            db_session.commit()
+    # if SUB_PROCESS_TIMEOUT and isinstance(SUB_PROCESS_TIMEOUT, int):
+    #     now = datetime.now()
+    #     duration_seconds = (now - script.started_at).seconds
+    #     if duration_seconds > SUB_PROCESS_TIMEOUT:
+    #         script.success = False
+    #         script.return_code = -1
+    #         script.finished_at = now
+    #         script.output = TIMEOUT_OUTPUT
+    #         db_session.commit()
 
     return make_response(({"script": script.to_dict()}, 200))
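
Besides replacing the ThreadPoolExecutor hand-off with scheduler.add_script_log(), the module registers a teardown_appcontext hook so the scoped DBSession from the database package is released after every request. A stripped-down sketch of that pattern (self-contained, assumed wiring; the real app binds DBSession to the engine configured in the database package):

# Minimal sketch of the Flask teardown pattern used above; engine and session
# here are stand-ins, not the repository's configuration.
from flask import Flask
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

app = Flask(__name__)
DBSession = scoped_session(sessionmaker(bind=create_engine("sqlite://")))

@app.teardown_appcontext
def shutdown_session(exception=None):
    # Flask runs this when the app context pops, i.e. after every request,
    # so each worker thread starts the next request with a fresh Session.
    DBSession.remove()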

scheduler/app/scheduler.py

Lines changed: 170 additions & 43 deletions
@@ -1,56 +1,183 @@
+import json
+import logging
 import os
-import gc
 import time
-import logging
-from threading import Thread
+from threading import Lock, Thread
 
 from database import DBSession
+from faas_scheduler.models import ScriptLog
 from faas_scheduler.utils import (
-    check_and_set_tasks_timeout,
-    delete_log_after_days,
-    delete_statistics_after_days,
-    basic_log,
+    add_script,
+    run_script,
+    get_script_file,
+    hook_update_script
 )
 
-basic_log("scheduler.log")
+logger = logging.getLogger(__name__)
+
 
-SUB_PROCESS_TIMEOUT = int(
-    os.environ.get("PYTHON_PROCESS_TIMEOUT", 60 * 15)
-) # 15 minutes
+class ScriptQueue:
 
-logger = logging.getLogger(__name__)
+    def __init__(self):
+        self.q = []  # a list of ScriptLog
+        self.script_logs_dict = {}  # a dict of {id: ScriptLog}
+        self.lock = Lock()
+        self.running_count = {}
+        # a dict of
+        # {
+        #     "<team>": 0,
+        #     "<team>_<dtable_uuid>": 0,
+        #     "<team>_<dtable_uuid>_<script_name>": 0
+        # }
+        try:
+            run_limit_per_team = os.environ.get('RUN_LIMIT_PER_TEAM', 0)
+        except:
+            run_limit_per_team = 0
+        try:
+            run_limit_per_base = os.environ.get('RUN_LIMIT_PER_BASE', 0)
+        except:
+            run_limit_per_base = 0
+        try:
+            run_limit_per_script = os.environ.get('RUN_LIMIT_PER_SCRIPT', 0)
+        except:
+            run_limit_per_script = 0
+        self.config = {
+            'run_limit_per_team': run_limit_per_team,
+            'run_limit_per_base': run_limit_per_base,
+            'run_limit_per_script': run_limit_per_script
+        }
+
+    def can_run_script(self, script_log: ScriptLog):
+        if script_log.org_id != -1:
+            running_team_key = f'{script_log.org_id}'
+        else:
+            running_team_key = f'{script_log.owner}'
+        running_base_key = f'{running_team_key}_{script_log.dtable_uuid}'
+        running_script_key = f'{running_base_key}_{script_log.script_name}'
+
+        if self.config['run_limit_per_team'] > 0 and self.config['run_limit_per_team'] <= self.running_count.get(running_team_key, 0):
+            return False
+        if self.config['run_limit_per_base'] > 0 and self.config['run_limit_per_base'] <= self.running_count.get(running_base_key, 0):
+            return False
+        if self.config['run_limit_per_script'] > 0 and self.config['run_limit_per_script'] <= self.running_count.get(running_script_key, 0):
+            return False
+
+        return True
+
+    def add_script_log(self, script_log: ScriptLog):
+        with self.lock:
+            self.q.append(script_log)
+            self.script_logs_dict[script_log.id] = script_log
+
+    def get(self):
+        """get the first valid task from self.q
 
+        Return: an instance of ScriptTask or None
+        """
+        with self.lock:
+            return_task = None
 
-class FAASTaskTimeoutSetter(Thread):
+            index = 0
+            while index < len(self.q):
+                script_log = self.q[index]
+                if self.can_run_script(script_log):
+                    return_task = script_log
+                    self.q.pop(index)
+                    break
+                index += 1
+
+            return return_task
+
+    def script_done_callback(self, script_log: ScriptLog):
+        with self.lock:
+            if script_log.org_id != -1:
+                running_team_key = f'{script_log.org_id}'
+            else:
+                running_team_key = f'{script_log.owner}'
+            running_base_key = f'{running_team_key}_{script_log.dtable_uuid}'
+            running_script_key = f'{running_base_key}_{script_log.script_name}'
+            if running_team_key in self.running_count:
+                self.running_count[running_team_key] -= 1
+            if running_base_key in self.running_count:
+                self.running_count[running_base_key] -= 1
+            if running_script_key in self.running_count:
+                self.running_count[running_script_key] -= 1
+
+
+class Scheduelr:
 
     def __init__(self):
-        super(FAASTaskTimeoutSetter, self).__init__()
-        self.interval = 60 * 30 # every half an hour
-
-    def run(self):
-        if SUB_PROCESS_TIMEOUT and isinstance(SUB_PROCESS_TIMEOUT, int):
-            while True:
-                logger.info("Start automatic cleanup ...")
-                db_session = DBSession()
-                try:
-                    check_and_set_tasks_timeout(db_session)
-                except Exception as e:
-                    logger.exception("task cleaner error: %s", e)
-                finally:
-                    db_session.close()
-
-                # python garbage collection
-                logger.info("gc.collect: %s", str(gc.collect()))
-
-                # remove old script_logs and statistics
-                delete_log_after_days(db_session)
-                delete_statistics_after_days(db_session)
-
-                # sleep
-                logger.info("Sleep for %d seconds ...", self.interval)
-                time.sleep(self.interval)
-
-
-if __name__ == "__main__":
-    task_timeout_setter = FAASTaskTimeoutSetter()
-    task_timeout_setter.start()
+        self.script_queue = ScriptQueue()
+
+    def add_script_log(
+        self,
+        dtable_uuid,
+        org_id,
+        owner,
+        script_name,
+        context_data,
+        operate_from
+    ):
+        script_log = add_script(
+            DBSession(),
+            dtable_uuid,
+            owner,
+            org_id,
+            script_name,
+            context_data,
+            operate_from
+        )
+        self.script_queue.add_script_log(script_log)
+        return script_log
+
+    def schedule(self):
+        while True:
+            script_log = self.script_queue.get()
+            if not script_log:
+                time.sleep(0.5)
+            try:
+                script_file_info = get_script_file(script_log.dtable_uuid, script_log.script_name)
+                run_script(
+                    script_log.script_id,
+                    script_log.dtable_uuid,
+                    script_log.script_name,
+                    script_file_info['script_url'],
+                    script_file_info['temp_api_token'],
+                    json.loads(script_log.context_data)
+                )
+            except Exception as e:
+                logger.exception(f'run script: {script_log} error {e}')
+
+    def script_done_callback(
+        self,
+        script_id,
+        success,
+        return_code,
+        output,
+        spend_time
+    ):
+        script_log = self.script_queue.script_logs_dict.pop(script_id)
+        hook_update_script(
+            DBSession(),
+            script_id,
+            success,
+            return_code,
+            output,
+            spend_time
+        )
+        if not script_log: # not counted in memory, only update db record
+            return
+
+    def load_pending_script_logs(self):
+        """load pending script logs, should be called only when server start
+        """
+        script_logs = DBSession.query(ScriptLog).filter_by(state=ScriptLog.PENDING).order_by(ScriptLog.id)
+        for script_log in script_logs:
+            self.script_queue.add_script_log(script_log)
+
+    def start(self):
+        self.load_pending_script_logs()
+        Thread(target=self.schedule, daemon=True).start()
+
+
+scheduler = Scheduelr()
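
The module replaces the old FAASTaskTimeoutSetter cleanup thread with an in-memory ScriptQueue (with optional per-team, per-base and per-script run limits) and a module-level scheduler instance that flask_server.py imports. Only add_script_log() is called from the Flask handler in this diff; where start() and script_done_callback() are invoked is not shown, so the wiring below is an assumption rather than code from the commit:

# Assumed usage; call sites and values are hypothetical.
from scheduler import scheduler

scheduler.start()  # reload PENDING ScriptLogs, then run schedule() in a daemon thread

script_log = scheduler.add_script_log(   # roughly what scripts_api() now does per request
    dtable_uuid="0123456789abcdef0123456789abcdef",  # hypothetical 32-char uuid
    org_id=-1,
    owner="someone@example.com",
    script_name="my_script.py",
    context_data="{}",
    operate_from="manually",
)
print(script_log.id)  # returned to the caller as script_id

# later, when the runner reports a result (hypothetical values):
scheduler.script_done_callback(script_log.id, True, 0, "done", 1.2)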
