Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 removing unsubscribe from rabbitmq + ♻️ comp resource tracking id change #4687

Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,9 @@ def _need_heartbeat(task: CompTaskAtDB) -> bool:
*(
publish_service_resource_tracking_heartbeat(
self.rabbitmq_client,
get_resource_tracking_run_id(user_id, t.project_id, iteration),
get_resource_tracking_run_id(
user_id, t.project_id, t.node_id, iteration
),
)
for t in running_tasks
)
Expand Down Expand Up @@ -355,7 +357,7 @@ async def _process_started_tasks(
publish_service_resource_tracking_started(
self.rabbitmq_client,
service_run_id=get_resource_tracking_run_id(
user_id, t.project_id, iteration
user_id, t.project_id, t.node_id, iteration
),
wallet_id=run_metadata.get("wallet_id"),
wallet_name=run_metadata.get("wallet_name"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ async def _process_task_result(
# resource tracking
await publish_service_resource_tracking_stopped(
self.rabbitmq_client,
get_resource_tracking_run_id(user_id, project_id, iteration),
get_resource_tracking_run_id(user_id, project_id, node_id, iteration),
simcore_platform_status=simcore_platform_status,
)
# instrumentation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from models_library.docker import DockerGenericTag
from models_library.projects import ProjectID
from models_library.projects_nodes import NodeID
from models_library.projects_state import RunningState
from models_library.services_resources import (
ResourceValue,
Expand Down Expand Up @@ -47,9 +48,9 @@


def get_resource_tracking_run_id(
user_id: UserID, project_id: ProjectID, iteration: Iteration
user_id: UserID, project_id: ProjectID, node_id: NodeID, iteration: Iteration
) -> str:
return f"comp_{user_id}_{project_id}_{iteration}"
return f"comp_{user_id}_{project_id}_{node_id}_{iteration}"


def create_service_resources_from_task(task: CompTaskAtDB) -> ServiceResourcesDict:
Expand Down
15 changes: 11 additions & 4 deletions services/director-v2/tests/unit/test_utils_comp_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import pytest
from models_library.docker import DockerGenericTag
from models_library.projects import ProjectID
from models_library.projects_nodes import NodeID
from models_library.projects_state import RunningState
from models_library.users import UserID
from simcore_service_director_v2.models.comp_tasks import CompTaskAtDB
Expand Down Expand Up @@ -49,21 +50,27 @@ def test_scheduler_knows_all_the_states():


@pytest.mark.parametrize(
"user_id, project_id, iteration, expected_result",
"user_id, project_id, node_id, iteration, expected_result",
[
(
2,
ProjectID("e08356e4-eb74-49e9-b769-2c26e34c61d9"),
NodeID("a08356e4-eb74-49e9-b769-2c26e34c61d1"),
5,
"comp_2_e08356e4-eb74-49e9-b769-2c26e34c61d9_5",
"comp_2_e08356e4-eb74-49e9-b769-2c26e34c61d9_a08356e4-eb74-49e9-b769-2c26e34c61d1_5",
)
],
)
def test_get_resource_tracking_run_id(
user_id: UserID, project_id: ProjectID, iteration: Iteration, expected_result: str
user_id: UserID,
project_id: ProjectID,
node_id: NodeID,
iteration: Iteration,
expected_result: str,
):
assert (
get_resource_tracking_run_id(user_id, project_id, iteration) == expected_result
get_resource_tracking_run_id(user_id, project_id, node_id, iteration)
== expected_result
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,7 @@ async def on_startup() -> None:
)

async def on_shutdown() -> None:
if (
app.state.rabbitmq_client
and not app.state.resource_tracker_rabbitmq_consumer
):
if app.state.rabbitmq_client:
await app.state.rabbitmq_client.close()

app.add_event_handler("startup", on_startup)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,6 @@ async def _subscribe_to_rabbitmq(app) -> str:
return subscribed_queue


async def _unsubscribe_from_rabbitmq(app) -> None:
with log_context(
_logger, logging.INFO, msg="Unsubscribing from rabbitmq channels"
), log_catch(_logger, reraise=False):
rabbit_client: RabbitMQClient = get_rabbitmq_client(app)
await rabbit_client.unsubscribe(app.state.resource_tracker_rabbitmq_consumer)


def on_app_startup(app: FastAPI) -> Callable[[], Awaitable[None]]:
async def _startup() -> None:
with log_context(
Expand All @@ -54,11 +46,13 @@ async def _startup() -> None:
return _startup


def on_app_shutdown(app: FastAPI) -> Callable[[], Awaitable[None]]:
def on_app_shutdown(
_app: FastAPI,
) -> Callable[[], Awaitable[None]]:
async def _stop() -> None:
if app.state.resource_tracker_rabbitmq_consumer:
await _unsubscribe_from_rabbitmq(app)
await app.state.rabbitmq_client.close()
# NOTE: We want to have persistent queue, therefore we will not unsubscribe
assert _app # nosec
return None

return _stop

Expand Down