Skip to content

Commit

Permalink
🐛 removing unsubscribe from rabbitmq (#4687)
Browse files Browse the repository at this point in the history
  • Loading branch information
matusdrobuliak66 authored Aug 31, 2023
1 parent 831c5e7 commit 87c1224
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,9 @@ def _need_heartbeat(task: CompTaskAtDB) -> bool:
*(
publish_service_resource_tracking_heartbeat(
self.rabbitmq_client,
get_resource_tracking_run_id(user_id, t.project_id, iteration),
get_resource_tracking_run_id(
user_id, t.project_id, t.node_id, iteration
),
)
for t in running_tasks
)
Expand Down Expand Up @@ -355,7 +357,7 @@ async def _process_started_tasks(
publish_service_resource_tracking_started(
self.rabbitmq_client,
service_run_id=get_resource_tracking_run_id(
user_id, t.project_id, iteration
user_id, t.project_id, t.node_id, iteration
),
wallet_id=run_metadata.get("wallet_id"),
wallet_name=run_metadata.get("wallet_name"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ async def _process_task_result(
# resource tracking
await publish_service_resource_tracking_stopped(
self.rabbitmq_client,
get_resource_tracking_run_id(user_id, project_id, iteration),
get_resource_tracking_run_id(user_id, project_id, node_id, iteration),
simcore_platform_status=simcore_platform_status,
)
# instrumentation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from models_library.docker import DockerGenericTag
from models_library.projects import ProjectID
from models_library.projects_nodes import NodeID
from models_library.projects_state import RunningState
from models_library.services_resources import (
ResourceValue,
Expand Down Expand Up @@ -47,9 +48,9 @@


def get_resource_tracking_run_id(
user_id: UserID, project_id: ProjectID, iteration: Iteration
user_id: UserID, project_id: ProjectID, node_id: NodeID, iteration: Iteration
) -> str:
return f"comp_{user_id}_{project_id}_{iteration}"
return f"comp_{user_id}_{project_id}_{node_id}_{iteration}"


def create_service_resources_from_task(task: CompTaskAtDB) -> ServiceResourcesDict:
Expand Down
15 changes: 11 additions & 4 deletions services/director-v2/tests/unit/test_utils_comp_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import pytest
from models_library.docker import DockerGenericTag
from models_library.projects import ProjectID
from models_library.projects_nodes import NodeID
from models_library.projects_state import RunningState
from models_library.users import UserID
from simcore_service_director_v2.models.comp_tasks import CompTaskAtDB
Expand Down Expand Up @@ -49,21 +50,27 @@ def test_scheduler_knows_all_the_states():


@pytest.mark.parametrize(
"user_id, project_id, iteration, expected_result",
"user_id, project_id, node_id, iteration, expected_result",
[
(
2,
ProjectID("e08356e4-eb74-49e9-b769-2c26e34c61d9"),
NodeID("a08356e4-eb74-49e9-b769-2c26e34c61d1"),
5,
"comp_2_e08356e4-eb74-49e9-b769-2c26e34c61d9_5",
"comp_2_e08356e4-eb74-49e9-b769-2c26e34c61d9_a08356e4-eb74-49e9-b769-2c26e34c61d1_5",
)
],
)
def test_get_resource_tracking_run_id(
user_id: UserID, project_id: ProjectID, iteration: Iteration, expected_result: str
user_id: UserID,
project_id: ProjectID,
node_id: NodeID,
iteration: Iteration,
expected_result: str,
):
assert (
get_resource_tracking_run_id(user_id, project_id, iteration) == expected_result
get_resource_tracking_run_id(user_id, project_id, node_id, iteration)
== expected_result
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,7 @@ async def on_startup() -> None:
)

async def on_shutdown() -> None:
if (
app.state.rabbitmq_client
and not app.state.resource_tracker_rabbitmq_consumer
):
if app.state.rabbitmq_client:
await app.state.rabbitmq_client.close()

app.add_event_handler("startup", on_startup)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,6 @@ async def _subscribe_to_rabbitmq(app) -> str:
return subscribed_queue


async def _unsubscribe_from_rabbitmq(app) -> None:
with log_context(
_logger, logging.INFO, msg="Unsubscribing from rabbitmq channels"
), log_catch(_logger, reraise=False):
rabbit_client: RabbitMQClient = get_rabbitmq_client(app)
await rabbit_client.unsubscribe(app.state.resource_tracker_rabbitmq_consumer)


def on_app_startup(app: FastAPI) -> Callable[[], Awaitable[None]]:
async def _startup() -> None:
with log_context(
Expand All @@ -54,11 +46,13 @@ async def _startup() -> None:
return _startup


def on_app_shutdown(app: FastAPI) -> Callable[[], Awaitable[None]]:
def on_app_shutdown(
_app: FastAPI,
) -> Callable[[], Awaitable[None]]:
async def _stop() -> None:
if app.state.resource_tracker_rabbitmq_consumer:
await _unsubscribe_from_rabbitmq(app)
await app.state.rabbitmq_client.close()
# NOTE: We want to have persistent queue, therefore we will not unsubscribe
assert _app # nosec
return None

return _stop

Expand Down

0 comments on commit 87c1224

Please sign in to comment.