Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Autoscaling: connect with rabbitmq #3620

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
beb5603
refactoring
sanderegg Nov 29, 2022
3429d9d
added rabbitmq client
sanderegg Nov 29, 2022
8969f32
refactor remove dependency to postgres-database lib
sanderegg Nov 29, 2022
b50eb83
added cluter state message
sanderegg Nov 29, 2022
782a9e6
create subnet for autoscaling, to connect with the rabbit
sanderegg Nov 29, 2022
1f0d211
remove default values
sanderegg Nov 29, 2022
2b35132
add the rabbitmq for tests
sanderegg Nov 29, 2022
6e599a6
add status check entrypoint
sanderegg Nov 29, 2022
6bfa6a4
check when there is no rabbit setup
sanderegg Nov 29, 2022
0913fd4
added ping method
sanderegg Nov 30, 2022
e22cd80
check status implemented
sanderegg Nov 30, 2022
dd44241
added channel name
sanderegg Nov 30, 2022
81bb09f
test posting to rabbit does not reraise
sanderegg Nov 30, 2022
460d921
silence configuration error when posting
sanderegg Nov 30, 2022
2be2946
passing first rabbit messages from autoscaling
sanderegg Nov 30, 2022
5894dab
fix director-v2 unit tests
sanderegg Nov 30, 2022
4e60499
renaming
sanderegg Dec 5, 2022
aa9c514
refactor
sanderegg Dec 5, 2022
d08d2c4
properly test sending messages
sanderegg Dec 5, 2022
a48b005
adding rabbitmq utils
sanderegg Dec 5, 2022
b18126f
refactor
sanderegg Dec 5, 2022
07e532c
set labels on task
sanderegg Dec 5, 2022
cf701da
added model for osparc labels
sanderegg Dec 5, 2022
4377e7e
added rabbitmq utils, refactor and tests
sanderegg Dec 5, 2022
24611e9
fix unit test with new labels on tasks
sanderegg Dec 5, 2022
5c424d0
@pcrespov review: add tests on regex
sanderegg Dec 6, 2022
6c232c0
keep it the same everywhere
sanderegg Dec 6, 2022
38d44d7
@pcrespov review: use value
sanderegg Dec 6, 2022
f8026dd
sonarcloud
sanderegg Dec 6, 2022
71e68c1
sonarcloud2
sanderegg Dec 6, 2022
d5d0e8a
properly mock Rabbit entries
sanderegg Dec 6, 2022
698a0f0
ensure rabbit is setup correctly
sanderegg Dec 6, 2022
d4fe2c3
make test more reliable
sanderegg Dec 6, 2022
f02d3c7
fixes warnings
sanderegg Nov 30, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion packages/models-library/src/models_library/basic_regex.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
#

# Universally unique Identifier. Pattern taken from https://stackoverflow.com/questions/136505/searching-for-uuids-in-text-with-regex

import re
from typing import Final

UUID_RE_BASE = (
r"[0-9a-fA-F]{8}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{4}-?[0-9a-fA-F]{12}"
Expand Down Expand Up @@ -60,3 +61,12 @@
# the digits 0 through 9, and the space character.
# They may not be only numerals.
# SEE # https://www.twilio.com/docs/glossary/what-alphanumeric-sender-id


# Docker
DOCKER_LABEL_KEY_REGEX: Final[re.Pattern] = re.compile(
# NOTE: https://docs.docker.com/config/labels-custom-metadata/#key-format-recommendations
r"^(?!(\.|\-|com.docker\.|io.docker\.|org.dockerproject\.|\d))(?!.*(--|\.\.))[a-z0-9\.-]+(?<![\d\.\-])$"
)
DOCKER_IMAGE_KEY_RE = r"[\w/-]+"
DOCKER_IMAGE_VERSION_RE = r"[\w/.]+"
11 changes: 6 additions & 5 deletions packages/models-library/src/models_library/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@

from pydantic import ConstrainedStr, constr

DOCKER_IMAGE_KEY_RE = r"[\w/-]+"
DOCKER_IMAGE_VERSION_RE = r"[\w/.]+"
from .basic_regex import (
DOCKER_IMAGE_KEY_RE,
DOCKER_IMAGE_VERSION_RE,
DOCKER_LABEL_KEY_REGEX,
)

DockerImageKey = constr(regex=DOCKER_IMAGE_KEY_RE)
DockerImageVersion = constr(regex=DOCKER_IMAGE_VERSION_RE)
Expand All @@ -13,6 +16,4 @@
class DockerLabelKey(ConstrainedStr):
# NOTE: https://docs.docker.com/config/labels-custom-metadata/#key-format-recommendations
# good practice: use reverse DNS notation
regex: Optional[re.Pattern[str]] = re.compile(
r"^(?!(\.|\-|com.docker\.|io.docker\.|org.dockerproject\.))(?!.*(--|\.\.))[a-z0-9\.-]+$"
)
regex: Optional[re.Pattern[str]] = DOCKER_LABEL_KEY_REGEX
37 changes: 27 additions & 10 deletions packages/models-library/src/models_library/rabbitmq_messages.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
from enum import Enum
from typing import Literal, Optional
from typing import Any, Literal, Optional

from models_library.projects import ProjectID
from models_library.projects_nodes import NodeID
from models_library.projects_state import RunningState
from models_library.users import UserID
from pydantic import BaseModel
from pydantic.types import NonNegativeFloat
from simcore_postgres_database.models.comp_tasks import NodeClass


class RabbitEventMessageType(str, Enum):
Expand All @@ -16,38 +15,56 @@ class RabbitEventMessageType(str, Enum):

class RabbitMessageBase(BaseModel):
channel_name: str
node_id: NodeID
user_id: UserID
project_id: ProjectID

@classmethod
def get_channel_name(cls) -> str:
# NOTE: this returns the channel type name
return cls.__fields__["channel_name"].default


class LoggerRabbitMessage(RabbitMessageBase):
class NodeMessageBase(BaseModel):
node_id: NodeID
user_id: UserID
project_id: ProjectID


class LoggerRabbitMessage(RabbitMessageBase, NodeMessageBase):
channel_name: Literal["simcore.services.logs"] = "simcore.services.logs"
messages: list[str]


class EventRabbitMessage(RabbitMessageBase):
class EventRabbitMessage(RabbitMessageBase, NodeMessageBase):
channel_name: Literal["simcore.services.events"] = "simcore.services.events"
action: RabbitEventMessageType


class ProgressRabbitMessage(RabbitMessageBase):
class ProgressRabbitMessage(RabbitMessageBase, NodeMessageBase):
channel_name: Literal["simcore.services.progress"] = "simcore.services.progress"
progress: NonNegativeFloat


class InstrumentationRabbitMessage(RabbitMessageBase):
class InstrumentationRabbitMessage(RabbitMessageBase, NodeMessageBase):
channel_name: Literal[
"simcore.services.instrumentation"
] = "simcore.services.instrumentation"
metrics: str
service_uuid: NodeID
service_type: NodeClass
service_type: str
service_key: str
service_tag: str
result: Optional[RunningState] = None


class AutoscalingStatus(str, Enum):
IDLE = "IDLE"
SCALING_UP = "SCALING_UP"


class RabbitAutoscalingMessage(RabbitMessageBase):
channel_name: Literal["io.simcore.autoscaling"] = "io.simcore.autoscaling"
origin: str
number_monitored_nodes: int
cluster_total_resources: dict[str, Any]
cluster_used_resources: dict[str, Any]
number_pending_tasks_without_resources: int
status: AutoscalingStatus
75 changes: 71 additions & 4 deletions packages/models-library/tests/test_basic_regex.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
# pylint:disable=protected-access
import re
from datetime import datetime
from typing import Any, Optional, Sequence
from typing import Any, Optional, Pattern, Sequence, Union

import pytest
from models_library.basic_regex import (
DATE_RE,
DOCKER_LABEL_KEY_REGEX,
PUBLIC_VARIABLE_NAME_RE,
SEMANTIC_VERSION_RE_W_CAPTURE_GROUPS,
SEMANTIC_VERSION_RE_W_NAMED_GROUPS,
Expand All @@ -26,18 +27,18 @@


def assert_match_and_get_capture(
regex_str: str,
regex_or_str: Union[str, Pattern[str]],
test_str: str,
expected: Any,
*,
group_names: Optional[tuple[str]] = None,
) -> Optional[Sequence]:
match = re.match(regex_str, test_str)
match = re.match(regex_or_str, test_str)
if expected is INVALID:
assert match is None
elif expected is VALID:
assert match is not None
print(regex_str, "captured:", match.group(), "->", match.groups())
print(regex_or_str, "captured:", match.group(), "->", match.groups())
else:
assert match
captured = match.groups()
Expand Down Expand Up @@ -270,3 +271,69 @@ def test_TWILIO_ALPHANUMERIC_SENDER_ID_RE(sample, expected):
# the digits 0 through 9, and the space character.

assert_match_and_get_capture(TWILIO_ALPHANUMERIC_SENDER_ID_RE, sample, expected)


@pytest.mark.parametrize(
"sample, expected",
[
("com.docker.*", INVALID), # reserved
("io.docker.*", INVALID), # reserved
("org.dockerproject.*", INVALID), # reserved
("com.example.some-label", VALID), # valid
(
"0sadfjh.sadf-dskhj",
INVALID,
), # starts with digit
(
"sadfjh.sadf-dskhj",
VALID,
), # only allow lowercasealphanumeric, being and end with letter, no consecutive -, .
(
"sadfjh.sadf-dskhj0",
INVALID,
), # ends with digit
(
"sadfjh.sadf-ds0khj",
VALID,
), # only allow lowercasealphanumeric, being and end with letter, no consecutive -, .
(
"sadfjh.EAGsadf-ds0khj",
INVALID,
), # upper case
(
"sadfjh..sadf-ds0khj",
INVALID,
), # double dot
(
"sadfjh.sadf--ds0khj",
INVALID,
), # double dash
(
"io.simcore.some234.cool.label",
VALID,
), # only allow lowercasealphanumeric, being and end with letter, no consecutive -, .
(
".io.simcore.some234.cool",
INVALID,
), # starts with dot
(
"io.simcore.some234.cool.",
INVALID,
), # ends with dot
(
"-io.simcore.some234.cool",
INVALID,
), # starts with dash
(
"io.simcore.some234.cool-",
INVALID,
), # ends with dash
(
"io.simcore.so_me234.cool",
INVALID,
), # contains invalid character
],
ids=lambda d: f"{d if isinstance(d, str) else ('INVALID' if d is INVALID else 'VALID')}",
)
def test_DOCKER_LABEL_KEY_REGEX(sample, expected):
assert_match_and_get_capture(DOCKER_LABEL_KEY_REGEX, sample, expected)
2 changes: 1 addition & 1 deletion packages/models-library/tests/test_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
("Node.labels.standard_worker", False),
("Node.labels.standardworker", False),
("node.labels.standardworker", True),
("io.osparc.auto-scaler", True),
("io.simcore.auto-scaler", True),
),
)
def test_docker_label_key(label_key: str, valid: bool):
Expand Down
19 changes: 15 additions & 4 deletions packages/pytest-simcore/src/pytest_simcore/rabbit_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import aio_pika
import pytest
import tenacity
from servicelib.rabbitmq import RabbitMQClient
from settings_library.rabbit import RabbitSettings
from tenacity.before_sleep import before_sleep_log
from tenacity.stop import stop_after_attempt
Expand All @@ -32,7 +33,7 @@ async def wait_till_rabbit_responsive(url: str) -> None:
await connection.close()


@pytest.fixture(scope="function")
@pytest.fixture
async def rabbit_settings(
docker_stack: dict, testing_environ_vars: dict # stack is up
) -> RabbitSettings:
Expand All @@ -55,7 +56,7 @@ async def rabbit_settings(
return settings


@pytest.fixture(scope="function")
@pytest.fixture
async def rabbit_service(
rabbit_settings: RabbitSettings, monkeypatch: pytest.MonkeyPatch
) -> RabbitSettings:
Expand All @@ -73,7 +74,7 @@ async def rabbit_service(
return rabbit_settings


@pytest.fixture(scope="function")
@pytest.fixture
async def rabbit_connection(
rabbit_settings: RabbitSettings,
) -> AsyncIterator[aio_pika.abc.AbstractConnection]:
Expand Down Expand Up @@ -103,7 +104,7 @@ def _connection_close_callback(sender: Any, exc: Optional[BaseException] = None)
assert connection.is_closed


@pytest.fixture(scope="function")
@pytest.fixture
async def rabbit_channel(
rabbit_connection: aio_pika.abc.AbstractConnection,
) -> AsyncIterator[aio_pika.abc.AbstractChannel]:
Expand All @@ -118,3 +119,13 @@ def _channel_close_callback(sender: Any, exc: Optional[BaseException] = None):
channel.close_callbacks.add(_channel_close_callback)
yield channel
assert channel.is_closed


@pytest.fixture
async def rabbit_client(
rabbit_settings: RabbitSettings,
) -> AsyncIterator[RabbitMQClient]:
client = RabbitMQClient("pytest", settings=rabbit_settings)
assert client
yield client
await client.close()
6 changes: 6 additions & 0 deletions packages/service-library/src/servicelib/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ async def _get_channel(self) -> aio_pika.abc.AbstractChannel:
channel.close_callbacks.add(_channel_close_callback)
return channel

async def ping(self) -> bool:
assert self._connection_pool # nosec
async with self._connection_pool.acquire() as connection:
connection: aio_pika.RobustConnection
return connection.connected.is_set()

async def subscribe(
self, exchange_name: str, message_handler: MessageHandler
) -> None:
Expand Down
18 changes: 18 additions & 0 deletions packages/service-library/tests/test_rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from typing import AsyncIterator, Callable
from unittest import mock

import docker
import pytest
from faker import Faker
from pytest_mock.plugin import MockerFixture
Expand Down Expand Up @@ -190,3 +191,20 @@ def _raise_once_then_true(*args, **kwargs):
await consumer.subscribe(random_exchange_name, mocked_message_parser)
await publisher.publish(random_exchange_name, message)
await _assert_message_received(mocked_message_parser, 3, message)


async def test_rabbit_client_lose_connection(
rabbitmq_client: Callable[[str], RabbitMQClient],
docker_client: docker.client.DockerClient,
):
rabbit_client = rabbitmq_client("pinger")
assert await rabbit_client.ping() is True
# now let's put down the rabbit service
for rabbit_docker_service in (
docker_service
for docker_service in docker_client.services.list()
if "rabbit" in docker_service.name # type: ignore
):
rabbit_docker_service.remove() # type: ignore
await asyncio.sleep(10) # wait for the client to disconnect
assert await rabbit_client.ping() is False
14 changes: 3 additions & 11 deletions packages/settings-library/src/settings_library/rabbit.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,23 @@

from pydantic.networks import AnyUrl
from pydantic.types import SecretStr
from typing_extensions import TypedDict

from .base import BaseCustomSettings
from .basic_types import PortInt


class Channels(TypedDict):
log: str
progress: str
instrumentation: str
events: str


class RabbitDsn(AnyUrl):
allowed_schemes = {"amqp"}


class RabbitSettings(BaseCustomSettings):
# host
RABBIT_HOST: str = "rabbit"
RABBIT_HOST: str
RABBIT_PORT: PortInt = 5672

# auth
RABBIT_USER: str = "simcore"
RABBIT_PASSWORD: SecretStr = SecretStr("simcore")
RABBIT_USER: str
RABBIT_PASSWORD: SecretStr

@cached_property
def dsn(self) -> str:
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from fastapi import FastAPI, Request


def get_app(request: Request) -> FastAPI:
return request.app
Loading