Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨new style dynamic-services shut down when wallet runs out of credits (⚠️ devops) #4922

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
23896d0
director-v2 closes dy-services when credits run out
Oct 25, 2023
a12e072
Merge remote-tracking branch 'upstream/master' into pr-osparc-react-o…
Oct 25, 2023
f6f5d9f
Merge remote-tracking branch 'upstream/master' into pr-osparc-react-o…
Oct 25, 2023
28821dc
refactor
Oct 27, 2023
cdd5218
refactor tests
Oct 27, 2023
b5d6675
extended redis
Oct 27, 2023
907897b
added new dbs
Oct 27, 2023
5a1c291
Merge remote-tracking branch 'upstream/master' into pr-osparc-react-o…
Oct 27, 2023
cde0a49
added osparc_resource_manager
Oct 27, 2023
ea7ee15
Merge remote-tracking branch 'upstream/master' into pr-osparc-react-o…
Oct 27, 2023
59ab1a3
added concurrency test
Oct 27, 2023
8a82057
can now remove not tracked resources
Oct 27, 2023
c0a233c
Merge remote-tracking branch 'upstream/master' into pr-osparc-react-o…
Oct 27, 2023
2fed7c1
removed unused node rights module
Oct 27, 2023
73f7876
refactor resource type
Oct 27, 2023
fcac8f6
refactor osparc resoruce name
Oct 27, 2023
f96547e
typo
Oct 27, 2023
669af96
keep track of dynamic services
Oct 27, 2023
26d7f14
WIP: status monitor
Oct 27, 2023
9cf2650
Merge remote-tracking branch 'upstream/master' into pr-osparc-react-o…
Oct 27, 2023
cd33cb5
refactor
Oct 30, 2023
9126c14
update docstring
Oct 30, 2023
86db2c5
update docstring
Oct 30, 2023
6c09d96
Merge remote-tracking branch 'upstream/master' into pr-osparc-react-o…
Oct 30, 2023
b01f9b0
rename and refactor
Oct 30, 2023
29b3dd6
removed dataclass, causes issues when subclassing
Oct 30, 2023
1f87aa4
services state is no recovered via API calls
Oct 30, 2023
618824a
refactor to use the same client
Oct 30, 2023
a1d6a2e
Merge remote-tracking branch 'upstream/master' into pr-osparc-react-o…
Oct 30, 2023
2f604ea
expanded store and added tests
Oct 31, 2023
749e14c
refactor
Oct 31, 2023
f401764
Merge remote-tracking branch 'upstream/master' into pr-osparc-react-o…
Nov 20, 2023
ca98bcd
removed unused
Nov 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ def routing_key(self) -> str | None:


class CreditsLimit(IntEnum):
MIN_CREDITS = 0
SHUTDOWN_SERVICES = 0


class WalletCreditsLimitReachedMessage(RabbitMessageBase):
Expand Down
51 changes: 46 additions & 5 deletions packages/service-library/src/servicelib/background_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import datetime
import logging
from collections.abc import AsyncIterator, Awaitable, Callable
from typing import Final
from typing import Final, cast

from pydantic.errors import PydanticErrorMixin
from servicelib.logging_utils import log_catch, log_context
Expand All @@ -23,19 +23,41 @@ class PeriodicTaskCancellationError(PydanticErrorMixin, Exception):
msg_template: str = "Could not cancel task '{task_name}'"


class ContinueCondition:
    """One-way flag telling a periodic task whether it may keep iterating.

    Starts in the "keep going" state; calling ``stop()`` flips it
    permanently so the task can exit gracefully after its current pass.
    """

    def __init__(self) -> None:
        self._running: bool = True

    def stop(self):
        """Signal that the periodic task should finish and not iterate again."""
        self._running = False

    @property
    def can_continue(self) -> bool:
        """``True`` until ``stop()`` has been called."""
        return self._running


class _ExtendedTask(asyncio.Task):
    """``asyncio.Task`` carrying a reference to its ``ContinueCondition``.

    NOTE(review): ``start_periodic_task`` only ``cast``s a plain Task to this
    type and then assigns ``continue_condition`` — the attribute exists mainly
    for static typing.
    """

    def __init__(self, coro, *, loop=None, name=None):
        super().__init__(coro, loop=loop, name=name)
        # set by start_periodic_task after creation; None until then
        self.continue_condition: ContinueCondition | None = None


async def _periodic_scheduled_task(
task: Callable[..., Awaitable[None]],
*,
interval: datetime.timedelta,
continue_condition: ContinueCondition,
task_name: str,
**task_kwargs,
) -> None:
# NOTE: This retries forever unless cancelled
async for attempt in AsyncRetrying(wait=wait_fixed(interval.total_seconds())):
with attempt:
if not continue_condition.can_continue:
logger.debug("'%s' finished periodic actions", task_name)
return
with log_context(
logger,
logging.INFO,
logging.DEBUG,
msg=f"iteration {attempt.retry_state.attempt_number} of '{task_name}'",
), log_catch(logger):
await task(**task_kwargs)
Expand All @@ -53,15 +75,22 @@ def start_periodic_task(
with log_context(
logger, logging.DEBUG, msg=f"create periodic background task '{task_name}'"
):
return asyncio.create_task(
continue_condition = ContinueCondition()
new_periodic_task: asyncio.Task = asyncio.create_task(
_periodic_scheduled_task(
task,
interval=interval,
continue_condition=continue_condition,
task_name=task_name,
**kwargs,
),
name=task_name,
)
# NOTE: attaches an additional attribute to the task object
# which will be used when stopping the periodic task
new_periodic_task = cast(_ExtendedTask, new_periodic_task)
new_periodic_task.continue_condition = continue_condition
return new_periodic_task


async def cancel_task(
Expand Down Expand Up @@ -99,9 +128,21 @@ async def stop_periodic_task(
with log_context(
logger,
logging.DEBUG,
msg=f"cancel periodic background task '{asyncio_task.get_name()}'",
msg=f"stop periodic background task '{asyncio_task.get_name()}'",
):
await cancel_task(asyncio_task, timeout=timeout)
asyncio_task = cast(_ExtendedTask, asyncio_task)
continue_condition: ContinueCondition | None = asyncio_task.continue_condition
if continue_condition:
continue_condition.stop()

_, pending = await asyncio.wait((asyncio_task,), timeout=timeout)
if pending:
with log_context(
logger,
logging.WARNING,
msg=f"could not gracefully stop task '{asyncio_task.get_name()}', cancelling it",
):
await cancel_task(asyncio_task, timeout=timeout)


@contextlib.asynccontextmanager
Expand Down
12 changes: 7 additions & 5 deletions packages/service-library/src/servicelib/redis.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import contextlib
import datetime
import logging
from collections.abc import AsyncIterator
from dataclasses import dataclass, field
from typing import AsyncIterator, Final
from typing import Final
from uuid import uuid4

import redis.asyncio as aioredis
Expand Down Expand Up @@ -88,18 +89,19 @@ async def lock_context(
lock_value: bytes | str | None = None,
*,
blocking: bool = False,
blocking_timeout_s: NonNegativeFloat = 5,
blocking_timeout_s: NonNegativeFloat | None = 5,
) -> AsyncIterator[Lock]:
"""Tries to acquire a lock.

:param lock_key: unique name of the lock
:param lock_value: content of the lock, defaults to None
:param blocking: should block here while acquiring the lock, defaults to False
:param blocking_timeout_s: time to wait while acquire a lock before giving up, defaults to 5
:param blocking_timeout_s: time to wait while acquiring the lock before giving up,
defaults to ``5``; when ``None`` it will wait forever

:raises CouldNotAcquireLockError: reasons why lock acquisition fails:
1. `blocking==False` the lock was already acquired by some other entity
2. `blocking==True` timeouts out while waiting for lock to be free (another entity holds the lock)
1. ``blocking==False`` the lock was already acquired by some other entity
2. ``blocking==True`` times out while waiting for the lock to be free (another entity holds the lock)
"""

total_lock_duration: datetime.timedelta = _DEFAULT_LOCK_TTL
Expand Down
3 changes: 2 additions & 1 deletion packages/service-library/tests/test_background_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,4 +127,5 @@ async def test_periodic_task_context_manager(
await asyncio.sleep(5 * task_interval.total_seconds())
assert asyncio_task.cancelled() is False
assert asyncio_task.done() is False
assert asyncio_task.cancelled() is True
# NOTE: task is no longer cancelled but gracefully stopped
assert asyncio_task.cancelled() is False
1 change: 1 addition & 0 deletions packages/settings-library/src/settings_library/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class RedisDatabase(int, Enum):
USER_NOTIFICATIONS = 4
ANNOUNCEMENTS = 5
DISTRIBUTED_IDENTIFIERS = 6
CACHES = 7


class RedisSettings(BaseCustomSettings):
Expand Down
2 changes: 0 additions & 2 deletions scripts/precommit/pytest-testit.bash
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
#!/bin/bash

# Check for the presence of @pytest.mark.testit in staged files
git diff --cached --name-only | while IFS= read -r file; do
if grep -n '@pytest\.mark\.testit' "$file"; then
sed -i '/@pytest\.mark\.testit/d' "$file"
echo "Removed @pytest.mark.testit from file '$file'"
exit 1
fi
done
3 changes: 2 additions & 1 deletion services/autoscaling/tests/manual/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ services:
scheduled_maintenance:${REDIS_HOST}:${REDIS_PORT}:3,
user_notifications:${REDIS_HOST}:${REDIS_PORT}:4,
announcements:${REDIS_HOST}:${REDIS_PORT}:5,
distributed_identifiers:${REDIS_HOST}:${REDIS_PORT}:6
distributed_identifiers:${REDIS_HOST}:${REDIS_PORT}:6,
caches:${REDIS_HOST}:${REDIS_PORT}:7
# If you add/remove a db, do not forget to update the --databases entry in the docker-compose.yml

autoscaling:
Expand Down
3 changes: 2 additions & 1 deletion services/clusters-keeper/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ services:
scheduled_maintenance:${REDIS_HOST}:${REDIS_PORT}:3,
user_notifications:${REDIS_HOST}:${REDIS_PORT}:4,
announcements:${REDIS_HOST}:${REDIS_PORT}:5,
distributed_identifiers:${REDIS_HOST}:${REDIS_PORT}:6
distributed_identifiers:${REDIS_HOST}:${REDIS_PORT}:6,
caches:${REDIS_HOST}:${REDIS_PORT}:7
# If you add/remove a db, do not forget to update the --databases entry in the docker-compose.yml

clusters-keeper:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
)
from models_library.projects_nodes import NodeID
from servicelib.logging_utils import log_decorator
from servicelib.osparc_resource_manager import OsparcResourceManager
from starlette.datastructures import URL

from ...core.dynamic_services_settings import DynamicServicesSettings
Expand All @@ -23,9 +24,9 @@ async def get_service_base_url(
director_v0_client: DirectorV0Client = Depends(get_director_v0_client),
) -> URL:
# get the service details
service_details: RunningDynamicServiceDetails = (
await director_v0_client.get_running_service_details(node_uuid)
)
service_details: (
RunningDynamicServiceDetails
) = await director_v0_client.get_running_service_details(node_uuid)
return URL(service_details.legacy_service_url)


Expand All @@ -42,3 +43,8 @@ def get_dynamic_services_settings(request: Request) -> DynamicServicesSettings:
def get_scheduler(request: Request) -> DynamicSidecarsScheduler:
    """FastAPI dependency: the dynamic-sidecars scheduler stored on ``app.state``."""
    app_scheduler: DynamicSidecarsScheduler = (
        request.app.state.dynamic_sidecar_scheduler
    )
    return app_scheduler


def get_osparc_resource_manager(request: Request) -> OsparcResourceManager:
    """FastAPI dependency: the resource manager stored on ``app.state``."""
    resource_manager: OsparcResourceManager = (
        request.app.state.osparc_resource_manager
    )
    return resource_manager
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from servicelib.fastapi.requests_decorators import cancel_on_disconnect
from servicelib.json_serialization import json_dumps
from servicelib.logging_utils import log_decorator
from servicelib.osparc_resource_manager import OsparcResourceManager, OsparcResourceType
from servicelib.rabbitmq import RabbitMQClient
from servicelib.utils import logged_gather
from simcore_service_director_v2.core.dynamic_services_settings.scheduler import (
Expand Down Expand Up @@ -54,6 +55,7 @@
from ..dependencies.director_v0 import get_director_v0_client
from ..dependencies.dynamic_services import (
get_dynamic_services_settings,
get_osparc_resource_manager,
get_scheduler,
get_service_base_url,
get_services_client,
Expand Down Expand Up @@ -113,14 +115,17 @@ async def create_dynamic_service(
DynamicServicesSettings, Depends(get_dynamic_services_settings)
],
scheduler: Annotated[DynamicSidecarsScheduler, Depends(get_scheduler)],
osparc_resoruce_manager: Annotated[
OsparcResourceManager, Depends(get_osparc_resource_manager)
],
x_dynamic_sidecar_request_dns: str = Header(...),
x_dynamic_sidecar_request_scheme: str = Header(...),
x_simcore_user_agent: str = Header(...),
) -> DynamicServiceGet | RedirectResponse:
simcore_service_labels: SimcoreServiceLabels = (
await director_v0_client.get_service_labels(
service=ServiceKeyVersion(key=service.key, version=service.version)
)
simcore_service_labels: (
SimcoreServiceLabels
) = await director_v0_client.get_service_labels(
service=ServiceKeyVersion(key=service.key, version=service.version)
)

# LEGACY (backwards compatibility)
Expand All @@ -138,6 +143,9 @@ async def create_dynamic_service(
},
)
logger.debug("Redirecting %s", redirect_url_with_query)
await osparc_resoruce_manager.add(
OsparcResourceType.DYNAMIC_SERVICE, identifier=f"{service.node_uuid}"
)
return RedirectResponse(str(redirect_url_with_query))

#
Expand All @@ -153,6 +161,9 @@ async def create_dynamic_service(
request_simcore_user_agent=x_simcore_user_agent,
can_save=service.can_save,
)
await osparc_resoruce_manager.add(
OsparcResourceType.DYNAMIC_SERVICE, identifier=f"{service.node_uuid}"
)

return await scheduler.get_stack_status(service.node_uuid)

Expand Down Expand Up @@ -194,6 +205,9 @@ async def stop_dynamic_service(
dynamic_services_settings: Annotated[
DynamicServicesSettings, Depends(get_dynamic_services_settings)
],
osparc_resoruce_manager: Annotated[
OsparcResourceManager, Depends(get_osparc_resource_manager)
],
*,
can_save: bool | None = True,
) -> NoContentResponse | RedirectResponse:
Expand Down Expand Up @@ -242,6 +256,10 @@ def _log_error(retry_state: RetryCallState):
if scheduler.is_service_tracked(node_uuid):
raise TryAgain

await osparc_resoruce_manager.remove(
OsparcResourceType.DYNAMIC_SERVICE, identifier=f"{node_uuid}"
)

return NoContentResponse()


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
dynamic_sidecar,
osparc_variables_substitutions,
rabbitmq,
redis,
remote_debug,
resource_usage_tracker_client,
service_status_observer,
storage,
)
from .errors import (
Expand Down Expand Up @@ -136,6 +138,9 @@ def init_app(settings: AppSettings | None = None) -> FastAPI:

osparc_variables_substitutions.setup(app)

redis.setup(app)
service_status_observer.setup(app)

if settings.SC_BOOT_MODE == BootModeEnum.DEBUG:
remote_debug.setup(app)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,6 @@ class ComputationalTaskNotFoundError(PydanticErrorMixin, DirectorException):
msg_template = "Computational task {node_id} not found"


class NodeRightsAcquireError(PydanticErrorMixin, DirectorException):
msg_template = "Could not acquire a lock for {docker_node_id} since all {slots} slots are used."


#
# SCHEDULER ERRORS
#
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from models_library.projects_nodes_io import NodeID
from models_library.service_settings_labels import SimcoreServiceLabels
from models_library.users import UserID
from models_library.wallets import WalletID
from servicelib.fastapi.long_running_tasks.client import ProgressCallback
from servicelib.fastapi.long_running_tasks.server import TaskProgress

Expand Down Expand Up @@ -88,6 +89,7 @@ async def add_service(
def is_service_tracked(self, node_uuid: NodeID) -> bool:
"""returns True if service is being actively observed"""

@abstractmethod
def list_services(
self,
*,
Expand All @@ -105,6 +107,14 @@ async def mark_service_for_removal(
) -> None:
"""The service will be removed as soon as possible"""

    @abstractmethod
    async def mark_all_services_in_wallet_for_removal(
        self, wallet_id: WalletID
    ) -> None:
        """Marks for removal all services running under the given ``wallet_id``.

        Invoked when a threshold is reached (e.g. the wallet runs out of
        credits) and a message requesting the shutdown of all services
        billed to this wallet is received.
        """

@abstractmethod
async def is_service_awaiting_manual_intervention(self, node_uuid: NodeID) -> bool:
"""
Expand Down
Loading