Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@ local_settings.py

seatable-python-runner/
seatable-python-runner.zip

.python-version
14 changes: 11 additions & 3 deletions scheduler/app/faas_scheduler/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,24 @@ def __init__(
org_id,
script_name,
context_data,
started_at,
operate_from=None,
):
self.dtable_uuid = dtable_uuid
self.owner = owner
self.org_id = org_id
self.script_name = script_name
self.context_data = context_data
self.started_at = started_at
self.operate_from = operate_from

def get_info(self):
return {
"id": self.id,
"org_id": self.org_id,
"owner": self.owner,
"dtable_uuid": self.dtable_uuid,
"script_name": self.script_name,
}

def to_dict(self):
from faas_scheduler.utils import datetime_to_isoformat_timestr

Expand All @@ -61,7 +68,8 @@ def to_dict(self):
"context_data": (
json.loads(self.context_data) if self.context_data else None
),
"started_at": datetime_to_isoformat_timestr(self.started_at),
"started_at": self.started_at
and datetime_to_isoformat_timestr(self.started_at),
"finished_at": self.finished_at
and datetime_to_isoformat_timestr(self.finished_at),
"success": self.success,
Expand Down
217 changes: 153 additions & 64 deletions scheduler/app/faas_scheduler/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@
import json
import logging
import requests
from datetime import datetime
from datetime import datetime, timedelta
from uuid import UUID

from tzlocal import get_localzone
from sqlalchemy import desc, text
from faas_scheduler.models import ScriptLog
from faas_scheduler.models import (
ScriptLog,
UserRunScriptStatistics,
OrgRunScriptStatistics,
DTableRunScriptStatistics,
)

import sys

Expand Down Expand Up @@ -75,7 +80,6 @@ def ping_starter():
return False


## triggered from scheduler.py to remove old script_logs
def delete_log_after_days(db_session):
clean_script_logs = (
"DELETE FROM `script_log` WHERE `started_at` < DATE_SUB(NOW(), INTERVAL %s DAY)"
Expand All @@ -94,7 +98,6 @@ def delete_log_after_days(db_session):
db_session.close()


## triggered from scheduler.py to remove old statistics
def delete_statistics_after_days(db_session):
tables = [
"dtable_run_script_statistics",
Expand Down Expand Up @@ -172,6 +175,7 @@ def call_faas_func(script_url, temp_api_token, context_data, script_id=None):
},
"context_data": context_data,
"script_id": script_id,
"timeout": int(SUB_PROCESS_TIMEOUT),
}
headers = {"User-Agent": "python-scheduler/" + VERSION}
logger.debug("I call starter at url %s", RUN_FUNC_URL)
Expand All @@ -196,66 +200,140 @@ def call_faas_func(script_url, temp_api_token, context_data, script_id=None):
return None


def update_statistics(db_session, dtable_uuid, owner, org_id, spend_time):
if not spend_time:
return
username = owner

# dtable_run_script_statistcis
sqls = [
"""
INSERT INTO dtable_run_script_statistics(dtable_uuid, run_date, total_run_count, total_run_time, update_at) VALUES
(:dtable_uuid, :run_date, 1, :spend_time, :update_at)
ON DUPLICATE KEY UPDATE
total_run_count=total_run_count+1,
total_run_time=total_run_time+:spend_time,
update_at=:update_at;
"""
]
def update_stats_run_count(db_session, dtable_uuid, owner, org_id):
run_date = datetime.today().strftime("%Y-%m-%d")
try:
dtable_stats = (
db_session.query(DTableRunScriptStatistics)
.filter_by(dtable_uuid=dtable_uuid, run_date=run_date)
.first()
)
if not dtable_stats:
dtable_stats = DTableRunScriptStatistics(
dtable_uuid=dtable_uuid,
run_date=run_date,
total_run_count=1,
total_run_time=0,
update_at=datetime.now(),
)
db_session.add(dtable_stats)
else:
dtable_stats.total_run_count += 1
dtable_stats.update_at = datetime.now()
if org_id == -1:
if "@seafile_group" not in owner:
user_stats = (
db_session.query(UserRunScriptStatistics)
.filter_by(username=owner, run_date=run_date)
.first()
)
if not user_stats:
user_stats = UserRunScriptStatistics(
username=owner,
run_date=run_date,
total_run_count=1,
total_run_time=0,
update_at=datetime.now(),
)
db_session.add(user_stats)
else:
user_stats.total_run_count += 1
user_stats.update_at = datetime.now()
else:
org_stats = (
db_session.query(OrgRunScriptStatistics)
.filter_by(org_id=org_id, run_date=run_date)
.first()
)
if not org_stats:
org_stats = OrgRunScriptStatistics(
org_id=org_id,
run_date=run_date,
total_run_count=1,
total_run_time=0,
update_at=datetime.now(),
)
db_session.add(org_stats)
else:
org_stats.total_run_count += 1
org_stats.update_at = datetime.now()
db_session.commit()
except Exception as e:
logger.exception(
"update stats for org_id %s owner %s dtable %s run count error %s",
org_id,
owner,
dtable_uuid,
e,
)

# org_run_script_statistics
if org_id and org_id != -1:
sqls += [
"""
INSERT INTO org_run_script_statistics(org_id, run_date, total_run_count, total_run_time, update_at) VALUES
(:org_id, :run_date, 1, :spend_time, :update_at)
ON DUPLICATE KEY UPDATE
total_run_count=total_run_count+1,
total_run_time=total_run_time+:spend_time,
update_at=:update_at;
"""
]

# user_run_script_statistics
if "@seafile_group" not in username:
sqls += [
"""
INSERT INTO user_run_script_statistics(username, org_id, run_date, total_run_count, total_run_time, update_at) VALUES
(:username, :org_id, :run_date, 1, :spend_time, :update_at)
ON DUPLICATE KEY UPDATE
org_id=:org_id,
total_run_count=total_run_count+1,
total_run_time=total_run_time+:spend_time,
update_at=:update_at;
"""
]

def update_stats_run_time(db_session, dtable_uuid, owner, org_id, spend_time):
run_date = datetime.today().strftime("%Y-%m-%d")
try:
for sql in sqls:
db_session.execute(
text(sql),
{
"dtable_uuid": dtable_uuid,
"username": username,
"org_id": org_id,
"run_date": datetime.today(),
"spend_time": spend_time,
"update_at": datetime.now(),
},
dtable_stats = (
db_session.query(DTableRunScriptStatistics)
.filter_by(dtable_uuid=dtable_uuid, run_date=run_date)
.first()
)
if not dtable_stats:
dtable_stats = DTableRunScriptStatistics(
dtable_uuid=dtable_uuid,
run_date=run_date,
total_run_count=1,
total_run_time=spend_time,
update_at=datetime.now(),
)
db_session.add(dtable_stats)
else:
dtable_stats.total_run_time += spend_time
dtable_stats.update_at = datetime.now()
if org_id == -1:
if "@seafile_group" not in owner:
user_stats = (
db_session.query(UserRunScriptStatistics)
.filter_by(username=owner, run_date=run_date)
.first()
)
if not user_stats:
user_stats = UserRunScriptStatistics(
username=owner,
run_date=run_date,
total_run_count=1,
total_run_time=spend_time,
update_at=datetime.now(),
)
db_session.add(user_stats)
else:
user_stats.total_run_time += spend_time
user_stats.update_at = datetime.now()
else:
org_stats = (
db_session.query(OrgRunScriptStatistics)
.filter_by(org_id=org_id, run_date=run_date)
.first()
)
if not org_stats:
org_stats = OrgRunScriptStatistics(
org_id=org_id,
run_date=run_date,
total_run_count=1,
total_run_time=spend_time,
update_at=datetime.now(),
)
db_session.add(org_stats)
else:
org_stats.total_run_time += spend_time
org_stats.update_at = datetime.now()
db_session.commit()
except Exception as e:
logger.exception("update statistics sql error: %s", e)
logger.exception(
"update stats for org_id %s owner %s dtable %s run time error %s",
org_id,
owner,
dtable_uuid,
e,
)


# required to get "script logs" in dtable-web
Expand Down Expand Up @@ -379,17 +457,21 @@ def add_script(
org_id,
script_name,
context_data,
datetime.now(),
operate_from,
)
db_session.add(script)
db_session.commit()

update_stats_run_count(db_session, dtable_uuid, owner, org_id)

return script


def update_script(db_session, script, success, return_code, output):
script.finished_at = datetime.now()
def update_script(
db_session, script, success, return_code, output, started_at, finished_at
):
script.started_at = started_at
script.finished_at = finished_at
script.success = success
script.return_code = return_code
script.output = output
Expand All @@ -414,17 +496,24 @@ def run_script(
call_faas_func(script_url, temp_api_token, context_data, script_id=script_id)
except Exception as e:
logger.exception("Run script %d error: %s", script_id, e)
now = datetime.now()
hook_update_script(db_session, script_id, False, -1, "", now, 0)
finally:
db_session.close()

return True


def hook_update_script(db_session, script_id, success, return_code, output, spend_time):
def hook_update_script(
db_session, script_id, success, return_code, output, started_at, spend_time
):
script = db_session.query(ScriptLog).filter_by(id=script_id).first()
if script:
update_script(db_session, script, success, return_code, output)
update_statistics(
finished_at = started_at + timedelta(seconds=spend_time)
update_script(
db_session, script, success, return_code, output, started_at, finished_at
)
update_stats_run_time(
db_session, script.dtable_uuid, script.owner, script.org_id, spend_time
)

Expand Down
Loading