Skip to content

Commit

Permalink
added rabbitmq utils, refactor and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg committed Dec 5, 2022
1 parent cf701da commit 4377e7e
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,14 @@
from datetime import datetime

from fastapi import FastAPI
from models_library.rabbitmq_messages import AutoscalingStatus
from pydantic import parse_obj_as
from types_aiobotocore_ec2.literals import InstanceTypeType

from . import utils_aws, utils_docker
from ._meta import VERSION
from .core.errors import Ec2InstanceNotFoundError
from .core.settings import ApplicationSettings
from .rabbitmq import post_message
from .utils.rabbitmq import create_rabbit_message
from .utils import rabbitmq

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -49,36 +47,31 @@ async def check_dynamic_resources(app: FastAPI) -> None:
pending_tasks = await utils_docker.pending_service_tasks_with_insufficient_resources(
service_labels=app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_SERVICE_LABELS
)
await post_message(
await rabbitmq.post_state_message(
app,
create_rabbit_message(
app,
monitored_nodes,
cluster_total_resources,
cluster_used_resources,
pending_tasks,
status=AutoscalingStatus.SCALING_UP
if pending_tasks
else AutoscalingStatus.IDLE,
),
monitored_nodes,
cluster_total_resources,
cluster_used_resources,
pending_tasks,
)

if not pending_tasks:
logger.debug("no pending tasks with insufficient resources at the moment")
return

logger.info(
"%s service task(s) with %s label(s) are pending due to insufficient resources",
f"{len(pending_tasks)}",
f"{app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_SERVICE_LABELS}",
)

assert app_settings.AUTOSCALING_EC2_ACCESS # nosec
assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec
list_of_ec2_instances = await utils_aws.get_ec2_instance_capabilities(
app_settings.AUTOSCALING_EC2_ACCESS, app_settings.AUTOSCALING_EC2_INSTANCES
)

for task in pending_tasks:
await rabbitmq.post_log_message(
app,
task,
"service is pending due to insufficient resources, scaling up cluster please wait...",
logging.INFO,
)
try:
ec2_instances_needed = [
utils_aws.find_best_fitting_ec2_instance(
Expand All @@ -98,22 +91,19 @@ async def check_dynamic_resources(app: FastAPI) -> None:
InstanceTypeType, ec2_instances_needed[0].name
),
tags={
"io.osparc.autoscaling.created": f"{datetime.utcnow()}",
"io.osparc.autoscaling.version": f"{VERSION}",
"io.osparc.autoscaling.monitored_nodes_labels": json.dumps(
"io.simcore.autoscaling.created": f"{datetime.utcnow()}",
"io.simcore.autoscaling.version": f"{VERSION}",
"io.simcore.autoscaling.monitored_nodes_labels": json.dumps(
app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NODE_LABELS
),
"io.osparc.autoscaling.monitored_services_labels": json.dumps(
"io.simcore.autoscaling.monitored_services_labels": json.dumps(
app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_SERVICE_LABELS
),
},
startup_script=await utils_docker.get_docker_swarm_join_bash_command(),
)
logger.info(
"a new instance was created with %s", f"{new_instance_dns_name=}"
)
# NOTE: new_instance_dns_name is of type ip-123-23-23-3.ec2.internal and we need only the first part

# NOTE: new_instance_dns_name is of type ip-123-23-23-3.ec2.internal and we need only the first part
if match := re.match(_EC2_INTERNAL_DNS_RE, new_instance_dns_name):
new_instance_dns_name = match.group(1)
new_node = await utils_docker.wait_for_node(new_instance_dns_name)
Expand All @@ -129,6 +119,12 @@ async def check_dynamic_resources(app: FastAPI) -> None:
},
available=True,
)
await rabbitmq.post_log_message(
app,
task,
"cluster was scaled up and is now ready to run service",
logging.INFO,
)
# NOTE: in this first trial we start one instance at a time
# In the next iteration, some tasks might already run with that instance
break
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,50 @@
import logging

from fastapi import FastAPI
from models_library.generated_models.docker_rest_api import Node, Task
from models_library.rabbitmq_messages import AutoscalingStatus, RabbitAutoscalingMessage
from models_library.rabbitmq_messages import (
AutoscalingStatus,
LoggerRabbitMessage,
RabbitAutoscalingMessage,
)
from servicelib.logging_utils import log_catch

from ..models import Resources, SimcoreServiceDockerLabelKeys
from ..rabbitmq import post_message

from ..models import Resources
logger = logging.getLogger(__name__)


def create_rabbit_message(
async def post_state_message(
app: FastAPI,
monitored_nodes: list[Node],
cluster_total_resources: Resources,
cluster_used_resources: Resources,
pending_tasks: list[Task],
status: AutoscalingStatus,
) -> RabbitAutoscalingMessage:
return RabbitAutoscalingMessage(
origin=app.title,
number_monitored_nodes=len(monitored_nodes),
cluster_total_resources=cluster_total_resources.dict(),
cluster_used_resources=cluster_used_resources.dict(),
number_pending_tasks_without_resources=len(pending_tasks),
status=status,
)
) -> None:
with log_catch(logger, reraise=False):
message = RabbitAutoscalingMessage(
origin=app.title,
number_monitored_nodes=len(monitored_nodes),
cluster_total_resources=cluster_total_resources.dict(),
cluster_used_resources=cluster_used_resources.dict(),
number_pending_tasks_without_resources=len(pending_tasks),
status=AutoscalingStatus.SCALING_UP
if pending_tasks
else AutoscalingStatus.IDLE,
)
logger.debug("autoscaling state: %s", message)
await post_message(app, message)


async def post_log_message(app: FastAPI, task: Task, log: str, level: int):
    """Best-effort: publish a single log line for *task* to RabbitMQ.

    The owning user/project/node ids are extracted from the task's simcore
    docker labels.  Any failure (e.g. the task lacks those labels) is caught
    and logged by ``log_catch`` instead of propagating to the caller.
    """
    with log_catch(logger, reraise=False):
        label_keys = SimcoreServiceDockerLabelKeys.from_docker_task(task)
        rabbit_log = LoggerRabbitMessage(
            node_id=label_keys.node_id,
            user_id=label_keys.user_id,
            project_id=label_keys.project_id,
            messages=[log],
        )
        logger.log(level, rabbit_log)
        await post_message(app, rabbit_log)
10 changes: 10 additions & 0 deletions services/autoscaling/tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from settings_library.rabbit import RabbitSettings
from simcore_service_autoscaling.core.application import create_app
from simcore_service_autoscaling.core.settings import ApplicationSettings, EC2Settings
from simcore_service_autoscaling.models import SimcoreServiceDockerLabelKeys
from simcore_service_autoscaling.utils_aws import EC2Client
from simcore_service_autoscaling.utils_aws import ec2_client as autoscaling_ec2_client
from tenacity import retry
Expand Down Expand Up @@ -537,3 +538,12 @@ def host_cpu_count() -> int:
@pytest.fixture
def host_memory_total() -> ByteSize:
return ByteSize(psutil.virtual_memory().total)


@pytest.fixture
def osparc_docker_label_keys(
    faker: Faker,
) -> SimcoreServiceDockerLabelKeys:
    """Fake simcore docker label keys (random user/project/node ids)."""
    return SimcoreServiceDockerLabelKeys.parse_obj(
        {
            "user_id": faker.pyint(),
            "project_id": faker.uuid4(),
            "node_id": faker.uuid4(),
        }
    )
14 changes: 2 additions & 12 deletions services/autoscaling/tests/unit/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import aiodocker
import pytest
from faker import Faker
from models_library.generated_models.docker_rest_api import Task
from pydantic import ValidationError, parse_obj_as
from simcore_service_autoscaling.models import SimcoreServiceDockerLabelKeys
Expand All @@ -31,15 +30,6 @@ async def test_task_ownership_from_task_with_missing_labels_raises(
SimcoreServiceDockerLabelKeys.from_docker_task(service_tasks[0])


@pytest.fixture
def osparc_docker_label_keys(
faker: Faker,
) -> SimcoreServiceDockerLabelKeys:
return SimcoreServiceDockerLabelKeys.parse_obj(
dict(user_id=faker.pyint(), project_id=faker.uuid4(), node_id=faker.uuid4())
)


def test_osparc_docker_label_keys_to_docker_labels(
osparc_docker_label_keys: SimcoreServiceDockerLabelKeys,
):
Expand All @@ -56,14 +46,14 @@ async def test_task_ownership_from_task(
task_template: dict[str, Any],
osparc_docker_label_keys: SimcoreServiceDockerLabelKeys,
):
service_missing_osparc_labels = await create_service(
service_with_labels = await create_service(
task_template,
osparc_docker_label_keys.to_docker_labels(),
)
service_tasks = parse_obj_as(
list[Task],
await async_docker_client.tasks.list(
filters={"service": service_missing_osparc_labels["Spec"]["Name"]}
filters={"service": service_with_labels["Spec"]["Name"]}
),
)
assert service_tasks
Expand Down
114 changes: 114 additions & 0 deletions services/autoscaling/tests/unit/test_utils_rabbitmq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# pylint:disable=unused-variable
# pylint:disable=unused-argument
# pylint:disable=redefined-outer-name


from typing import Any, Awaitable, Callable, Mapping

import aiodocker
from faker import Faker
from fastapi import FastAPI
from models_library.generated_models.docker_rest_api import Task
from models_library.rabbitmq_messages import LoggerRabbitMessage
from pydantic import parse_obj_as
from pytest_mock.plugin import MockerFixture
from servicelib.rabbitmq import RabbitMQClient
from settings_library.rabbit import RabbitSettings
from simcore_service_autoscaling.models import SimcoreServiceDockerLabelKeys
from simcore_service_autoscaling.utils.rabbitmq import post_log_message
from tenacity._asyncio import AsyncRetrying
from tenacity.retry import retry_if_exception_type
from tenacity.stop import stop_after_delay
from tenacity.wait import wait_fixed

# Retry for up to 30s in 0.5s steps while the assertion on the mocked
# rabbit handler keeps failing (i.e. until the message actually arrives).
_TENACITY_RETRY_PARAMS = {
    "reraise": True,
    "retry": retry_if_exception_type(AssertionError),
    "stop": stop_after_delay(30),
    "wait": wait_fixed(0.5),
}


# Selection of core and tool services started in this swarm fixture (integration)
pytest_simcore_core_services_selection = [
"rabbit",
]

pytest_simcore_ops_services_selection = []


async def test_post_log_message(
    disable_dynamic_service_background_task,
    enabled_rabbitmq: RabbitSettings,
    initialized_app: FastAPI,
    rabbit_client: RabbitMQClient,
    mocker: MockerFixture,
    async_docker_client: aiodocker.Docker,
    create_service: Callable[
        [dict[str, Any], dict[str, str]], Awaitable[Mapping[str, Any]]
    ],
    task_template: dict[str, Any],
    osparc_docker_label_keys: SimcoreServiceDockerLabelKeys,
    faker: Faker,
):
    """Checks that post_log_message publishes a LoggerRabbitMessage built from
    the task's simcore docker labels on the log channel."""
    # subscribe a mocked handler to the log channel so we can assert delivery
    mocked_message_handler = mocker.AsyncMock(return_value=True)
    await rabbit_client.subscribe(
        LoggerRabbitMessage.get_channel_name(), mocked_message_handler
    )

    # create a service whose tasks carry the expected simcore labels
    service_with_labels = await create_service(
        task_template,
        osparc_docker_label_keys.to_docker_labels(),
    )
    service_tasks = parse_obj_as(
        list[Task],
        await async_docker_client.tasks.list(
            filters={"service": service_with_labels["Spec"]["Name"]}
        ),
    )
    assert service_tasks
    assert len(service_tasks) == 1

    log_message = faker.pystr()
    await post_log_message(initialized_app, service_tasks[0], log_message, 0)

    # message delivery is asynchronous: poll until the handler was called
    # (retry params: up to 30s, retrying on AssertionError)
    async for attempt in AsyncRetrying(**_TENACITY_RETRY_PARAMS):
        with attempt:
            print(
                f"--> checking for message in rabbit exchange {LoggerRabbitMessage.get_channel_name()}, {attempt.retry_state.retry_object.statistics}"
            )
            # handler receives the serialized (json-encoded) message bytes
            mocked_message_handler.assert_called_once_with(
                LoggerRabbitMessage(
                    node_id=osparc_docker_label_keys.node_id,
                    project_id=osparc_docker_label_keys.project_id,
                    user_id=osparc_docker_label_keys.user_id,
                    messages=[log_message],
                )
                .json()
                .encode()
            )
            print("... message received")


async def test_post_log_message_does_not_raise_if_service_has_no_labels(
    disable_dynamic_service_background_task,
    enabled_rabbitmq: RabbitSettings,
    initialized_app: FastAPI,
    async_docker_client: aiodocker.Docker,
    create_service: Callable[[dict[str, Any]], Awaitable[Mapping[str, Any]]],
    task_template: dict[str, Any],
    faker: Faker,
):
    """post_log_message is best-effort: a task without the simcore docker
    labels must not make it raise (errors are swallowed via log_catch)."""
    # service created WITHOUT the simcore label keys
    service_without_labels = await create_service(task_template)
    service_tasks = parse_obj_as(
        list[Task],
        await async_docker_client.tasks.list(
            filters={"service": service_without_labels["Spec"]["Name"]}
        ),
    )
    assert service_tasks
    assert len(service_tasks) == 1

    # this shall not raise any exception even if the task does not contain
    # the necessary labels
    await post_log_message(initialized_app, service_tasks[0], faker.pystr(), 0)

0 comments on commit 4377e7e

Please sign in to comment.