Skip to content

Commit

Permalink
PTFE-1196 reduce impact of missed webhooks
Browse files Browse the repository at this point in the history
  • Loading branch information
tcarmet committed Dec 4, 2023
1 parent 11e6771 commit ff719d2
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
"typeCheckingMode": "basic"
},
"editor": {
"defaultFormatter": "ms-python.python"
"defaultFormatter": "ms-python.black-formatter"
}
}
}
Expand Down
16 changes: 16 additions & 0 deletions runner_manager/jobs/workflow_job.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
from datetime import timedelta

from githubkit.webhooks.models import (
WorkflowJobCompleted,
Expand All @@ -25,6 +26,11 @@ def log_workflow_job(webhook: WorkflowJobEvent) -> None:
)


def time_to_start(webhook: WorkflowJobInProgress | WorkflowJobCompleted) -> timedelta:
"""From a given webhook, calculate the time it took to start the job"""
return webhook.workflow_job.started_at - webhook.workflow_job.created_at


def completed(webhook: WorkflowJobCompleted) -> int:
log_workflow_job(webhook)
runner: Runner | None = Runner.find_from_webhook(webhook)
Expand Down Expand Up @@ -55,6 +61,16 @@ def in_progress(webhook: WorkflowJobInProgress) -> str | None:
log.info(f"Updating runner {name} in group {runner_group.name}")
runner: Runner = runner_group.update_runner(webhook=webhook)
log.info(f"Runner {name} in group {runner_group.name} has been updated")
tts = time_to_start(webhook)
log.info(f"{runner} took {tts} to start")
# If the time to start is greater than 15 minutes, create an extra runner.
# While we could do this for scaling purposes, the main reason is to
# ensure that in the case we have missed a webhook, we still have a runner
# available for the jobs that are requesting it.
if tts > timedelta(minutes=15):
log.info(f"Runner group {runner_group.name} needs a new runner")
github: GitHub = get_github()
runner_group.create_runner(github)
return runner.pk


Expand Down
1 change: 1 addition & 0 deletions tests/strategies.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class Repo(Repository):
runner_group_id=Int,
labels=st.lists(Text, min_size=1, max_size=5),
started_at=st.datetimes(),
created_at=st.datetimes(),
)

JobPropQueuedStrategy = st.builds(
Expand Down
56 changes: 55 additions & 1 deletion tests/unit/jobs/test_workflow_job.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import timedelta
from time import sleep
from uuid import uuid4

Expand Down Expand Up @@ -103,7 +104,6 @@ def test_workflow_job_completed(
def test_workflow_job_in_progress(
webhook: WorkflowJobInProgress, queue: Queue, settings: Settings, redis: Redis
):

# flush all keys that start with settings.name in redis

init_model(Runner, redis, settings)
Expand Down Expand Up @@ -197,3 +197,57 @@ def test_workflow_job_queued(
).first()
assert runner.busy is False
assert runner.status == "offline"


@settings(max_examples=10)
@given(
webhook=WorkflowJobInProgressStrategy,
queue=QueueStrategy,
settings=SettingsStrategy,
redis=RedisStrategy,
)
def test_time_to_start(
webhook: WorkflowJobInProgress, queue: Queue, settings: Settings, redis: Redis
):
"""
This test will ensure that an extra runner is created when the time to start
the given workflow was higher than 15 minutes.
"""
init_model(Runner, redis, settings)
init_model(RunnerGroup, redis, settings)
runner_group: RunnerGroup = RunnerGroup(
organization=webhook.organization.login,
name=webhook.workflow_job.runner_group_name,
id=webhook.workflow_job.runner_group_id,
labels=webhook.workflow_job.labels,
manager=settings.name,
backend={"name": "base"},
)
runner_group.save()
Migrator().run()

runner: Runner = Runner(
id=webhook.workflow_job.runner_id,
name=webhook.workflow_job.runner_name,
busy=False,
status="online",
manager=settings.name,
runner_group_id=webhook.workflow_job.runner_group_id,
runner_group_name=webhook.workflow_job.runner_group_name,
)
runner.save()
Migrator().run()

assert len(runner_group.get_runners()) == 1
webhook.workflow_job.started_at = webhook.workflow_job.created_at + timedelta(
minutes=10
)
queue.enqueue(workflow_job.in_progress, webhook)
# ensure we have only one runner if the time to start is less than 15 minutes
assert len(runner_group.get_runners()) == 1
webhook.workflow_job.started_at = webhook.workflow_job.created_at + timedelta(
minutes=20
)
# ensure we have two runners if the time to start is greater than 15 minutes
queue.enqueue(workflow_job.in_progress, webhook)
assert len(runner_group.get_runners()) == 2

0 comments on commit ff719d2

Please sign in to comment.