Skip to content

Commit

Permalink
✨ Autoscaling: connect with rabbitmq (#3620)
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg authored Dec 6, 2022
1 parent 57c8805 commit 3701d49
Show file tree
Hide file tree
Showing 44 changed files with 954 additions and 145 deletions.
12 changes: 11 additions & 1 deletion packages/models-library/src/models_library/basic_regex.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
#

# Universally unique Identifier. Pattern taken from https://stackoverflow.com/questions/136505/searching-for-uuids-in-text-with-regex

import re
from typing import Final

UUID_RE_BASE = (
r"[0-9a-fA-F]{8}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{12}"
Expand Down Expand Up @@ -60,3 +61,12 @@
# the digits 0 through 9, and the space character.
# They may not be only numerals.
# SEE # https://www.twilio.com/docs/glossary/what-alphanumeric-sender-id


# Docker
DOCKER_LABEL_KEY_REGEX: Final[re.Pattern] = re.compile(
# NOTE: https://docs.docker.com/config/labels-custom-metadata/#key-format-recommendations
r"^(?!(\.|\-|com.docker\.|io.docker\.|org.dockerproject\.|\d))(?!.*(--|\.\.))[a-z0-9\.-]+(?<![\d\.\-])$"
)
DOCKER_IMAGE_KEY_RE = r"[\w/-]+"
DOCKER_IMAGE_VERSION_RE = r"[\w/.]+"
11 changes: 6 additions & 5 deletions packages/models-library/src/models_library/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@

from pydantic import ConstrainedStr, constr

DOCKER_IMAGE_KEY_RE = r"[\w/-]+"
DOCKER_IMAGE_VERSION_RE = r"[\w/.]+"
from .basic_regex import (
DOCKER_IMAGE_KEY_RE,
DOCKER_IMAGE_VERSION_RE,
DOCKER_LABEL_KEY_REGEX,
)

DockerImageKey = constr(regex=DOCKER_IMAGE_KEY_RE)
DockerImageVersion = constr(regex=DOCKER_IMAGE_VERSION_RE)
Expand All @@ -13,6 +16,4 @@
class DockerLabelKey(ConstrainedStr):
# NOTE: https://docs.docker.com/config/labels-custom-metadata/#key-format-recommendations
# good practice: use reverse DNS notation
regex: Optional[re.Pattern[str]] = re.compile(
r"^(?!(\.|\-|com.docker\.|io.docker\.|org.dockerproject\.))(?!.*(--|\.\.))[a-z0-9\.-]+$"
)
regex: Optional[re.Pattern[str]] = DOCKER_LABEL_KEY_REGEX
37 changes: 27 additions & 10 deletions packages/models-library/src/models_library/rabbitmq_messages.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
from enum import Enum
from typing import Literal, Optional
from typing import Any, Literal, Optional

from models_library.projects import ProjectID
from models_library.projects_nodes import NodeID
from models_library.projects_state import RunningState
from models_library.users import UserID
from pydantic import BaseModel
from pydantic.types import NonNegativeFloat
from simcore_postgres_database.models.comp_tasks import NodeClass


class RabbitEventMessageType(str, Enum):
Expand All @@ -16,38 +15,56 @@ class RabbitEventMessageType(str, Enum):

class RabbitMessageBase(BaseModel):
channel_name: str
node_id: NodeID
user_id: UserID
project_id: ProjectID

@classmethod
def get_channel_name(cls) -> str:
# NOTE: this returns the channel type name
return cls.__fields__["channel_name"].default


class LoggerRabbitMessage(RabbitMessageBase):
class NodeMessageBase(BaseModel):
node_id: NodeID
user_id: UserID
project_id: ProjectID


class LoggerRabbitMessage(RabbitMessageBase, NodeMessageBase):
channel_name: Literal["simcore.services.logs"] = "simcore.services.logs"
messages: list[str]


class EventRabbitMessage(RabbitMessageBase):
class EventRabbitMessage(RabbitMessageBase, NodeMessageBase):
channel_name: Literal["simcore.services.events"] = "simcore.services.events"
action: RabbitEventMessageType


class ProgressRabbitMessage(RabbitMessageBase):
class ProgressRabbitMessage(RabbitMessageBase, NodeMessageBase):
channel_name: Literal["simcore.services.progress"] = "simcore.services.progress"
progress: NonNegativeFloat


class InstrumentationRabbitMessage(RabbitMessageBase):
class InstrumentationRabbitMessage(RabbitMessageBase, NodeMessageBase):
channel_name: Literal[
"simcore.services.instrumentation"
] = "simcore.services.instrumentation"
metrics: str
service_uuid: NodeID
service_type: NodeClass
service_type: str
service_key: str
service_tag: str
result: Optional[RunningState] = None


class AutoscalingStatus(str, Enum):
IDLE = "IDLE"
SCALING_UP = "SCALING_UP"


class RabbitAutoscalingMessage(RabbitMessageBase):
channel_name: Literal["io.simcore.autoscaling"] = "io.simcore.autoscaling"
origin: str
number_monitored_nodes: int
cluster_total_resources: dict[str, Any]
cluster_used_resources: dict[str, Any]
number_pending_tasks_without_resources: int
status: AutoscalingStatus
75 changes: 71 additions & 4 deletions packages/models-library/tests/test_basic_regex.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
# pylint:disable=protected-access
import re
from datetime import datetime
from typing import Any, Optional, Sequence
from typing import Any, Optional, Pattern, Sequence, Union

import pytest
from models_library.basic_regex import (
DATE_RE,
DOCKER_LABEL_KEY_REGEX,
PUBLIC_VARIABLE_NAME_RE,
SEMANTIC_VERSION_RE_W_CAPTURE_GROUPS,
SEMANTIC_VERSION_RE_W_NAMED_GROUPS,
Expand All @@ -26,18 +27,18 @@


def assert_match_and_get_capture(
regex_str: str,
regex_or_str: Union[str, Pattern[str]],
test_str: str,
expected: Any,
*,
group_names: Optional[tuple[str]] = None,
) -> Optional[Sequence]:
match = re.match(regex_str, test_str)
match = re.match(regex_or_str, test_str)
if expected is INVALID:
assert match is None
elif expected is VALID:
assert match is not None
print(regex_str, "captured:", match.group(), "->", match.groups())
print(regex_or_str, "captured:", match.group(), "->", match.groups())
else:
assert match
captured = match.groups()
Expand Down Expand Up @@ -270,3 +271,69 @@ def test_TWILIO_ALPHANUMERIC_SENDER_ID_RE(sample, expected):
# the digits 0 through 9, and the space character.

assert_match_and_get_capture(TWILIO_ALPHANUMERIC_SENDER_ID_RE, sample, expected)


@pytest.mark.parametrize(
"sample, expected",
[
("com.docker.*", INVALID), # reserved
("io.docker.*", INVALID), # reserved
("org.dockerproject.*", INVALID), # reserved
("com.example.some-label", VALID), # valid
(
"0sadfjh.sadf-dskhj",
INVALID,
), # starts with digit
(
"sadfjh.sadf-dskhj",
VALID,
), # only allow lowercasealphanumeric, being and end with letter, no consecutive -, .
(
"sadfjh.sadf-dskhj0",
INVALID,
), # ends with digit
(
"sadfjh.sadf-ds0khj",
VALID,
), # only allow lowercasealphanumeric, being and end with letter, no consecutive -, .
(
"sadfjh.EAGsadf-ds0khj",
INVALID,
), # upper case
(
"sadfjh..sadf-ds0khj",
INVALID,
), # double dot
(
"sadfjh.sadf--ds0khj",
INVALID,
), # double dash
(
"io.simcore.some234.cool.label",
VALID,
), # only allow lowercasealphanumeric, being and end with letter, no consecutive -, .
(
".io.simcore.some234.cool",
INVALID,
), # starts with dot
(
"io.simcore.some234.cool.",
INVALID,
), # ends with dot
(
"-io.simcore.some234.cool",
INVALID,
), # starts with dash
(
"io.simcore.some234.cool-",
INVALID,
), # ends with dash
(
"io.simcore.so_me234.cool",
INVALID,
), # contains invalid character
],
ids=lambda d: f"{d if isinstance(d, str) else ('INVALID' if d is INVALID else 'VALID')}",
)
def test_DOCKER_LABEL_KEY_REGEX(sample, expected):
assert_match_and_get_capture(DOCKER_LABEL_KEY_REGEX, sample, expected)
2 changes: 1 addition & 1 deletion packages/models-library/tests/test_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
("Node.labels.standard_worker", False),
("Node.labels.standardworker", False),
("node.labels.standardworker", True),
("io.osparc.auto-scaler", True),
("io.simcore.auto-scaler", True),
),
)
def test_docker_label_key(label_key: str, valid: bool):
Expand Down
19 changes: 15 additions & 4 deletions packages/pytest-simcore/src/pytest_simcore/rabbit_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import aio_pika
import pytest
import tenacity
from servicelib.rabbitmq import RabbitMQClient
from settings_library.rabbit import RabbitSettings
from tenacity.before_sleep import before_sleep_log
from tenacity.stop import stop_after_attempt
Expand All @@ -32,7 +33,7 @@ async def wait_till_rabbit_responsive(url: str) -> None:
await connection.close()


@pytest.fixture(scope="function")
@pytest.fixture
async def rabbit_settings(
docker_stack: dict, testing_environ_vars: dict # stack is up
) -> RabbitSettings:
Expand All @@ -55,7 +56,7 @@ async def rabbit_settings(
return settings


@pytest.fixture(scope="function")
@pytest.fixture
async def rabbit_service(
rabbit_settings: RabbitSettings, monkeypatch: pytest.MonkeyPatch
) -> RabbitSettings:
Expand All @@ -73,7 +74,7 @@ async def rabbit_service(
return rabbit_settings


@pytest.fixture(scope="function")
@pytest.fixture
async def rabbit_connection(
rabbit_settings: RabbitSettings,
) -> AsyncIterator[aio_pika.abc.AbstractConnection]:
Expand Down Expand Up @@ -103,7 +104,7 @@ def _connection_close_callback(sender: Any, exc: Optional[BaseException] = None)
assert connection.is_closed


@pytest.fixture(scope="function")
@pytest.fixture
async def rabbit_channel(
rabbit_connection: aio_pika.abc.AbstractConnection,
) -> AsyncIterator[aio_pika.abc.AbstractChannel]:
Expand All @@ -118,3 +119,13 @@ def _channel_close_callback(sender: Any, exc: Optional[BaseException] = None):
channel.close_callbacks.add(_channel_close_callback)
yield channel
assert channel.is_closed


@pytest.fixture
async def rabbit_client(
rabbit_settings: RabbitSettings,
) -> AsyncIterator[RabbitMQClient]:
client = RabbitMQClient("pytest", settings=rabbit_settings)
assert client
yield client
await client.close()
6 changes: 6 additions & 0 deletions packages/service-library/src/servicelib/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ async def _get_channel(self) -> aio_pika.abc.AbstractChannel:
channel.close_callbacks.add(_channel_close_callback)
return channel

async def ping(self) -> bool:
assert self._connection_pool # nosec
async with self._connection_pool.acquire() as connection:
connection: aio_pika.RobustConnection
return connection.connected.is_set()

async def subscribe(
self, exchange_name: str, message_handler: MessageHandler
) -> None:
Expand Down
18 changes: 18 additions & 0 deletions packages/service-library/tests/test_rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from typing import AsyncIterator, Callable
from unittest import mock

import docker
import pytest
from faker import Faker
from pytest_mock.plugin import MockerFixture
Expand Down Expand Up @@ -190,3 +191,20 @@ def _raise_once_then_true(*args, **kwargs):
await consumer.subscribe(random_exchange_name, mocked_message_parser)
await publisher.publish(random_exchange_name, message)
await _assert_message_received(mocked_message_parser, 3, message)


async def test_rabbit_client_lose_connection(
rabbitmq_client: Callable[[str], RabbitMQClient],
docker_client: docker.client.DockerClient,
):
rabbit_client = rabbitmq_client("pinger")
assert await rabbit_client.ping() is True
# now let's put down the rabbit service
for rabbit_docker_service in (
docker_service
for docker_service in docker_client.services.list()
if "rabbit" in docker_service.name # type: ignore
):
rabbit_docker_service.remove() # type: ignore
await asyncio.sleep(10) # wait for the client to disconnect
assert await rabbit_client.ping() is False
14 changes: 3 additions & 11 deletions packages/settings-library/src/settings_library/rabbit.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,23 @@

from pydantic.networks import AnyUrl
from pydantic.types import SecretStr
from typing_extensions import TypedDict

from .base import BaseCustomSettings
from .basic_types import PortInt


class Channels(TypedDict):
log: str
progress: str
instrumentation: str
events: str


class RabbitDsn(AnyUrl):
allowed_schemes = {"amqp"}


class RabbitSettings(BaseCustomSettings):
# host
RABBIT_HOST: str = "rabbit"
RABBIT_HOST: str
RABBIT_PORT: PortInt = 5672

# auth
RABBIT_USER: str = "simcore"
RABBIT_PASSWORD: SecretStr = SecretStr("simcore")
RABBIT_USER: str
RABBIT_PASSWORD: SecretStr

@cached_property
def dsn(self) -> str:
Expand Down

This file was deleted.

Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from fastapi import FastAPI, Request


def get_app(request: Request) -> FastAPI:
return request.app
Loading

0 comments on commit 3701d49

Please sign in to comment.