Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

在执行工单函数中,对于不正常的工单状态,记录日志,避免q-task不断重试 #2850

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions sql/utils/execute_sql.py
Original file line number Diff line number Diff line change
@@ -17,21 +17,43 @@

def execute(workflow_id, user=None):
"""为延时或异步任务准备的execute, 传入工单ID和执行人信息"""
audit_id = Audit.detail_by_workflow_id(
workflow_id=workflow_id, workflow_type=WorkflowType.SQL_REVIEW
).audit_id
# 使用当前读防止重复执行
with transaction.atomic():
workflow_detail = SqlWorkflow.objects.select_for_update().get(id=workflow_id)
# 只有排队中和定时执行的数据才可以继续执行,否则直接抛错
if workflow_detail.status not in ["workflow_queuing", "workflow_timingtask"]:
raise Exception("工单状态不正确,禁止执行!")
logger.error(f"工单号[{workflow_id}] 可能被任务调度器重试")
Audit.add_log(
audit_id=audit_id,
operation_type=5,
operation_type_desc="执行工单发生异常",
operation_info="请检查工单执行情况",
operator=user.username if user else "",
operator_display=user.display if user else "系统",
)
result = ReviewSet(
rows=[
ReviewResult(
id=1,
errlevel=2,
stagestatus="执行发生错误",
errormessage=f"任务[{workflow_id}]被重试。可能是执行时发生超时,请检查数据库会话及执行状态,或联系管理员",
)
],
)
result.error = (
f"任务[{workflow_id}]被重试。可能是执行时发生超时,请检查数据库会话及执行状态,或联系管理员",
)
return result
# 将工单状态修改为执行中
else:
SqlWorkflow(id=workflow_id, status="workflow_executing").save(
update_fields=["status"]
)
# 增加执行日志
audit_id = Audit.detail_by_workflow_id(
workflow_id=workflow_id, workflow_type=WorkflowType.SQL_REVIEW
).audit_id
Audit.add_log(
audit_id=audit_id,
operation_type=5,
31 changes: 31 additions & 0 deletions sql/utils/tests.py
Original file line number Diff line number Diff line change
@@ -364,6 +364,19 @@ def setUp(self):
db_name="some_db",
syntax_type=1,
)
self.wf_executing = SqlWorkflow.objects.create(
workflow_name="some_name",
group_id=1,
group_name="g1",
engineer_display="",
audit_auth_groups="some_group",
create_time=datetime.datetime.now(),
status="workflow_executing",
is_backup=True,
instance=self.ins,
db_name="some_db",
syntax_type=1,
)
SqlWorkflowContent.objects.create(
workflow=self.wf,
sql_content="some_sql",
@@ -409,6 +422,24 @@ def test_execute(self, _get_engine, _execute_workflow, _audit):
operator_display="系统",
)

@patch("sql.utils.execute_sql.Audit")
@patch("sql.engines.mysql.MysqlEngine.execute_workflow")
@patch("sql.engines.get_engine")
def test_execute_in_executing(self, _get_engine, _execute_workflow, _audit):
_audit.detail_by_workflow_id.return_value.audit_id = 1
result = execute(self.wf_executing.id)
_audit.add_log.assert_called_with(
audit_id=1,
operation_type=5,
operation_type_desc="执行工单发生异常",
operation_info="请检查工单执行情况",
operator="",
operator_display="系统",
)
assert result.error == (
f"任务[{self.wf_executing.id}]被重试。可能是执行时发生超时,请检查数据库会话及执行状态,或联系管理员",
)

@patch("sql.utils.execute_sql.notify_for_execute")
@patch("sql.utils.execute_sql.Audit")
def test_execute_callback_success(self, _audit, _notify):