Usage of nested-asyncio in the Python client #774

Open
blublinsky opened this issue Feb 7, 2025 · 11 comments
Labels
bug Something isn't working

Comments

@blublinsky

blublinsky commented Feb 7, 2025

Describe the bug
The current implementation of the model registry client uses nest_asyncio (registry/blob/main/clients/python/src/model_registry/_client.py#L130), which works fine for standalone client usage but fails when running inside Ray or any other framework that uses uvloop.

To Reproduce
Here is a simple failing Ray sample:

import ray
import asyncio
from model_registry import ModelRegistry

runtime_env = {
    "pip": ["model-registry==0.2.10"],
}


@ray.remote(runtime_env=runtime_env)
class ModelRegistryClient:
    def __init__(
            self,
            url: str="http://localhost",
            port: int=8081,
            user: str="boris",
            secure: bool=False
    ):
        self.registry = ModelRegistry(
            server_address=url, port=port, author=user, is_secure=secure
        )

    async def list(self) -> tuple[list, list]:
        lh_list = []
        hf_list = []
        rm_iter = self.registry.get_registered_models().page_size(20)
        for rm in rm_iter:
            versions_iter = (
                self.registry.get_model_versions(rm.name).page_size(20).order_by_id().descending()
            )
            # we use the latest version
            rv = next(versions_iter)
            try:
                origin = rv.custom_properties["model_origin"]
            except (KeyError, TypeError):
                print(f"Custom properties {rv.custom_properties} do not have model_origin")
                continue
            if origin == "lakehouse":
                # lakehouse
                lh_list.append(
                    {
                        "name": rm.name,
                        "env": rv.custom_properties["env"],
                        "namespace": rv.custom_properties["namespace"],
                        "revision": rv.custom_properties["revision"],
                    }
                )
            else:
                # hugging face
                hf_list.append({"name": rm.name})
        return lh_list, hf_list

async def main():
    rg = ModelRegistryClient.remote()
    lh, hf = await rg.list.remote()
    print(lh)
    print(hf)

if __name__ == "__main__":
    asyncio.run(main())

Expected behavior
This should produce:

2025-02-09 19:28:26,810	INFO worker.py:1777 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265 
(ModelRegistryClient pid=4232) /Users/borisl/Projects/vllm_actuator/model_registry/ray_version.py:19: UserWarning: User access token is missing
(ModelRegistryClient pid=4232)   self.registry = ModelRegistry(
[{'name': 'granite-8b-japanese-base-v1', 'env': 'prod', 'namespace': 'base_training', 'revision': '20240806T153614'}, {'name': 'allam-1-13b-instruct', 'env': 'prod', 'namespace': 'watsonx', 'revision': '20240607'}]
[{'name': 'meta-llama/Llama-3.1-8B-Instruct'}, {'name': 'ibm-granite/granite-3.0-8b-base'}]

Process finished with exit code 0

but instead it produces the following exception:

2025-02-09 19:22:52,760	INFO worker.py:1777 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265 
(ModelRegistryClient pid=4084) Exception raised in creation task: The actor died because of an error raised in its creation task, ray::ModelRegistryClient.__init__() (pid=4084, ip=127.0.0.1, actor_id=663d256bf364c0d436685f3201000000, repr=<ray_version.ModelRegistryClient object at 0x108fcd420>)
(ModelRegistryClient pid=4084)   File "/opt/homebrew/Cellar/python@3.10/3.10.14/Frameworks/Python.framework/Versions/3.10/lib/python3.10/concurrent/futures/_base.py", line 458, in result
(ModelRegistryClient pid=4084)     return self.__get_result()
(ModelRegistryClient pid=4084)   File "/opt/homebrew/Cellar/python@3.10/3.10.14/Frameworks/Python.framework/Versions/3.10/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
(ModelRegistryClient pid=4084)     raise self._exception
(ModelRegistryClient pid=4084)   File "/Users/borisl/Projects/vllm_actuator/model_registry/ray_version.py", line 20, in __init__
(ModelRegistryClient pid=4084)     self.registry = ModelRegistry(
(ModelRegistryClient pid=4084)   File "/opt/homebrew/lib/python3.10/site-packages/model_registry/_client.py", line 53, in __init__
(ModelRegistryClient pid=4084)     nest_asyncio.apply()
(ModelRegistryClient pid=4084)   File "/opt/homebrew/lib/python3.10/site-packages/nest_asyncio.py", line 19, in apply
(ModelRegistryClient pid=4084)     _patch_loop(loop)
(ModelRegistryClient pid=4084)   File "/opt/homebrew/lib/python3.10/site-packages/nest_asyncio.py", line 193, in _patch_loop
(ModelRegistryClient pid=4084)     raise ValueError('Can\'t patch loop of type %s' % type(loop))
(ModelRegistryClient pid=4084) ValueError: Can't patch loop of type <class 'uvloop.Loop'>
Traceback (most recent call last):
  File "/Users/borisl/Projects/vllm_actuator/model_registry/ray_version.py", line 62, in <module>
    asyncio.run(main())
  File "/opt/homebrew/Cellar/python@3.10/3.10.14/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/opt/homebrew/Cellar/python@3.10/3.10.14/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
    return future.result()
  File "/Users/borisl/Projects/vllm_actuator/model_registry/ray_version.py", line 56, in main
    lh, hf = await rg.list.remote()
ray.exceptions.ActorDiedError: The actor died because of an error raised in its creation task, ray::ModelRegistryClient.__init__() (pid=4084, ip=127.0.0.1, actor_id=663d256bf364c0d436685f3201000000, repr=<ray_version.ModelRegistryClient object at 0x108fcd420>)
  File "/opt/homebrew/Cellar/python@3.10/3.10.14/Frameworks/Python.framework/Versions/3.10/lib/python3.10/concurrent/futures/_base.py", line 458, in result
    return self.__get_result()
  File "/opt/homebrew/Cellar/python@3.10/3.10.14/Frameworks/Python.framework/Versions/3.10/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/Users/borisl/Projects/vllm_actuator/model_registry/ray_version.py", line 20, in __init__
    self.registry = ModelRegistry(
  File "/opt/homebrew/lib/python3.10/site-packages/model_registry/_client.py", line 53, in __init__
    nest_asyncio.apply()
  File "/opt/homebrew/lib/python3.10/site-packages/nest_asyncio.py", line 19, in apply
    _patch_loop(loop)
  File "/opt/homebrew/lib/python3.10/site-packages/nest_asyncio.py", line 193, in _patch_loop
    raise ValueError('Can\'t patch loop of type %s' % type(loop))
ValueError: Can't patch loop of type <class 'uvloop.Loop'>

Additional context
I am willing to provide a PR.

@blublinsky blublinsky added the bug Something isn't working label Feb 7, 2025
@tarilabs
Member

I am willing to provide PR

Thanks @blublinsky. Would you like to describe to the community the modification/improvement you want to propose here?

@blublinsky
Author

blublinsky commented Feb 14, 2025

In the current implementation, the Model Registry client uses nest_asyncio, which patches the event loop to allow nested (re-entrant) use.
As the error message states, the issue is that uvloop.Loop, used in Ray and many other frameworks, cannot be patched. An alternative approach, wrapping an async request to get a sync response, can be found here. I have tried it in the Model Registry client, and it works well in both standalone and Ray-based usage.
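To illustrate why the client needs some sync-over-async bridge in the first place, here is a minimal standard-library sketch. The names `fetch_value`, `sync_wrapper`, and `caller` are hypothetical and not part of the Model Registry client. A synchronous wrapper built on `asyncio.run` works when called standalone but raises `RuntimeError` when the caller is already inside a running event loop, which is the situation nest_asyncio works around by patching the loop.

```python
import asyncio


async def fetch_value() -> int:
    """Stand-in for an async API call (hypothetical)."""
    await asyncio.sleep(0)
    return 42


def sync_wrapper() -> int:
    """Naive sync facade: start an event loop just for this one call."""
    coro = fetch_value()
    try:
        return asyncio.run(coro)
    except RuntimeError:
        # asyncio.run refused to start; close the coroutine so Python
        # does not warn that it was never awaited.
        coro.close()
        raise


async def caller() -> str:
    """Calls the sync facade from inside an already running loop."""
    try:
        sync_wrapper()
    except RuntimeError as exc:
        return str(exc)
    return "no error"


# Standalone usage: no loop is running yet, so the wrapper succeeds.
standalone_result = sync_wrapper()

# Nested usage: a loop is already running, so asyncio.run raises.
nested_error = asyncio.run(caller())
```

In a Ray async actor the running loop is a uvloop loop, so the patching workaround is not available and a different bridging strategy is needed.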

@tarilabs
Member

In the current implementation, the Model Registry client uses nest_asyncio, which patches the event loop to allow nested (re-entrant) use.

yes, here:

import nest_asyncio
logger.debug("Setting up reentrant async event loop")
nest_asyncio.apply()

As the error message states, the issue is that uvloop.Loop, used in Ray and many other frameworks, cannot be patched.

Interesting, thanks for the context.

An alternative approach, wrapping an async request to get a sync response, can be found here. I have tried it in the Model Registry client, and it works well in both standalone and Ray-based usage.

At this point I'm also wondering whether there are other REST client generators for Python that directly produce sync REST calls 🤔, but your proposal SGTM too. I'm going to raise this at the next KF Model Registry biweekly (the next one is Monday, Feb 17th).

@blublinsky
Author

Thanks, @tarilabs. Please keep me posted on the progress and let me know when I can create a PR. We needed this functionality yesterday, and for now we are just patching the implementation locally, but we would like to have it as part of the project.

@tarilabs
Member

I.e., I'm trying to understand whether you are suggesting changing only _client.py so that task submissions (like this one, for example) are handled by a wrapper strategy/class, or whether the required modifications extend beyond those user-facing methods.

@tarilabs
Member

let me know when I can create a PR

This is always encouraged, and even more so if you already have a local patch!

@blublinsky
Author

blublinsky commented Feb 14, 2025

just the client.

"""Standard client for the model registry."""

from __future__ import annotations

import os
from collections.abc import Mapping
from pathlib import Path
from typing import Any, TypeVar, Union, get_args
from warnings import warn

from model_registry.core import ModelRegistryAPIClient
from model_registry.exceptions import StoreError
from model_registry.types import (
    ListOptions,
    ModelArtifact,
    ModelVersion,
    Pager,
    RegisteredModel,
    SupportedTypes,
)
from async_task_runner import AsyncTaskRunner


ModelTypes = Union[RegisteredModel, ModelVersion, ModelArtifact]
TModel = TypeVar("TModel", bound=ModelTypes)


class ModelRegistry:
    """Model registry client."""

    def __init__(
            self,
            server_address: str,
            port: int = 443,
            *,
            author: str,
            is_secure: bool = True,
            user_token: str | None = None,
            custom_ca: str | None = None,
    ):
        """Constructor.

        Args:
            server_address: Server address.
            port: Server port. Defaults to 443.

        Keyword Args:
            author: Name of the author.
            is_secure: Whether to use a secure connection. Defaults to True.
            user_token: The PEM-encoded user token as a string. Defaults to content of path on envvar KF_PIPELINES_SA_TOKEN_PATH.
            custom_ca: Path to the PEM-encoded root certificates as a string. Defaults to path on envvar CERT.
        """

        # TODO: get remaining args from env
        self._author = author

        if not user_token:
            # /var/run/secrets/kubernetes.io/serviceaccount/token
            sa_token = os.environ.get("KF_PIPELINES_SA_TOKEN_PATH")
            if sa_token:
                user_token = Path(sa_token).read_text()
            else:
                warn("User access token is missing", stacklevel=2)

        if is_secure:
            root_ca = None
            if not custom_ca:
                if cert := os.getenv("CERT"):
                    root_ca = cert
                    # client might have a default CA setup
            else:
                root_ca = custom_ca

            if not user_token:
                msg = "user token must be provided for secure connection"
                raise StoreError(msg)

            self._api = ModelRegistryAPIClient.secure_connection(
                server_address, port, user_token=user_token, custom_ca=root_ca
            )
        elif custom_ca:
            msg = "Custom CA provided without secure connection, conflicting options"
            raise StoreError(msg)
        else:
            self._api = ModelRegistryAPIClient.insecure_connection(
                server_address, port, user_token
            )
        self.runner = AsyncTaskRunner.get_instance()

    async def _register_model(self, name: str, **kwargs) -> RegisteredModel:
        if rm := await self._api.get_registered_model_by_params(name):
            return rm

        return await self._api.upsert_registered_model(
            RegisteredModel(name=name, **kwargs)
        )

    async def _register_new_version(
            self, rm: RegisteredModel, version: str, author: str, /, **kwargs
    ) -> ModelVersion:
        assert rm.id is not None, "Registered model must have an ID"
        if await self._api.get_model_version_by_params(rm.id, version):
            msg = f"Version {version} already exists"
            raise StoreError(msg)

        return await self._api.upsert_model_version(
            ModelVersion(name=version, author=author, **kwargs), rm.id
        )

    async def _register_model_artifact(
            self, mv: ModelVersion, name: str, uri: str, /, **kwargs
    ) -> ModelArtifact:
        assert mv.id is not None, "Model version must have an ID"
        return await self._api.upsert_model_version_artifact(
            ModelArtifact(name=name, uri=uri, **kwargs), mv.id
        )

    def register_model(
            self,
            name: str,
            uri: str,
            *,
            model_format_name: str,
            model_format_version: str,
            version: str,
            storage_key: str | None = None,
            storage_path: str | None = None,
            service_account_name: str | None = None,
            author: str | None = None,
            owner: str | None = None,
            description: str | None = None,
            metadata: Mapping[str, SupportedTypes] | None = None,
    ) -> RegisteredModel:
        """Register a model.

        This registers a model in the model registry. The model is not downloaded, and has to be stored prior to
        registration.

        Most models can be registered using their URI, along with optional connection-specific parameters, `storage_key`
        and `storage_path` or, simply a `service_account_name`.
        URI builder utilities are recommended when referring to specialized storage; for example `utils.s3_uri_from`
        helper when using S3 object storage data connections.

        Args:
            name: Name of the model.
            uri: URI of the model.

        Keyword Args:
            version: Version of the model. Has to be unique.
            model_format_name: Name of the model format.
            model_format_version: Version of the model format.
            description: Description of the model.
            author: Author of the model. Defaults to the client author.
            owner: Owner of the model. Defaults to the client author.
            storage_key: Storage key.
            storage_path: Storage path.
            service_account_name: Service account name.
            metadata: Additional version metadata. Defaults to values returned by `default_metadata()`.

        Returns:
            Registered model.
        """
        rm = self.runner.run(self._register_model(name, owner=owner or self._author))
        mv = self.runner.run(
            self._register_new_version(
                rm,
                version,
                author or self._author,
                description=description,
                custom_properties=metadata or {},
            )
        )
        self.runner.run(
            self._register_model_artifact(
                mv,
                name,
                uri,
                model_format_name=model_format_name,
                model_format_version=model_format_version,
                storage_key=storage_key,
                storage_path=storage_path,
                service_account_name=service_account_name,
            )
        )

        return rm

    def update(self, model: TModel) -> TModel:
        """Update a model."""
        if not model.id:
            msg = "Model must have an ID"
            raise StoreError(msg)
        if not isinstance(model, get_args(ModelTypes)):
            msg = f"Model must be one of {get_args(ModelTypes)}"
            raise StoreError(msg)
        if isinstance(model, RegisteredModel):
            return self.runner.run(self._api.upsert_registered_model(model))
        if isinstance(model, ModelVersion):
            return self.runner.run(self._api.upsert_model_version(model, None))
        return self.runner.run(self._api.upsert_model_artifact(model))

    def register_hf_model(
            self,
            repo: str,
            path: str,
            *,
            version: str,
            model_format_name: str,
            model_format_version: str,
            author: str | None = None,
            owner: str | None = None,
            model_name: str | None = None,
            description: str | None = None,
            git_ref: str = "main",
    ) -> RegisteredModel:
        """Register a Hugging Face model.

        This imports a model from Hugging Face hub and registers it in the model registry.
        Note that the model is not downloaded.

        Args:
            repo: Name of the repository from Hugging Face hub.
            path: URI of the model.

        Keyword Args:
            version: Version of the model. Has to be unique.
            model_format_name: Name of the model format.
            model_format_version: Version of the model format.
            author: Author of the model. Defaults to repo owner.
            owner: Owner of the model. Defaults to the client author.
            model_name: Name of the model. Defaults to the repo name.
            description: Description of the model.
            git_ref: Git reference to use. Defaults to `main`.

        Returns:
            Registered model.
        """
        try:
            from huggingface_hub import HfApi, hf_hub_url, utils
        except ImportError as e:
            msg = """package `huggingface-hub` is not installed.
            To import models from Hugging Face Hub, start by installing the `huggingface-hub` package, either directly or as an
            extra (available as `model-registry[hf]`), e.g.:
            ```sh
            !pip install --pre model-registry[hf]
            ```
            or
            ```sh
            !pip install huggingface-hub
            ```
            """
            raise StoreError(msg) from e

        api = HfApi()
        try:
            model_info = api.model_info(repo, revision=git_ref)
        except utils.RepositoryNotFoundError as e:
            msg = f"Repository {repo} does not exist"
            raise StoreError(msg) from e
        except utils.RevisionNotFoundError as e:
            # TODO: as all hf-hub client calls default to using main, should we provide a tip?
            msg = f"Revision {git_ref} does not exist"
            raise StoreError(msg) from e

        if not author:
            # model author can be None if the repo is in a "global" namespace (i.e. no / in repo).
            if model_info.author is None:
                model_author = "unknown"
                warn(
                    "Model author is unknown. This is likely because the model is in a global namespace.",
                    stacklevel=2,
                )
            else:
                model_author = model_info.author
        else:
            model_author = author
        source_uri = hf_hub_url(repo, path, revision=git_ref)
        metadata = {
            "repo": repo,
            "source_uri": source_uri,
            "model_origin": "huggingface_hub",
            "model_author": model_author,
        }
        # card_data is the new field, but let's use the old one for backwards compatibility.
        if card_data := model_info.cardData:
            metadata.update(
                {
                    k: v
                    for k, v in card_data.to_dict().items()
                    # TODO: (#151) preserve tags, possibly other complex metadata
                    if isinstance(v, get_args(SupportedTypes))
                }
            )
        return self.register_model(
            model_name or model_info.id,
            source_uri,
            author=author or model_author,
            owner=owner or self._author,
            version=version,
            model_format_name=model_format_name,
            model_format_version=model_format_version,
            description=description,
            storage_path=path,
            metadata=metadata,
        )

    def get_registered_model(self, name: str) -> RegisteredModel | None:
        """Get a registered model.

        Args:
            name: Name of the model.

        Returns:
            Registered model.
        """
        return self.runner.run(self._api.get_registered_model_by_params(name))

    def get_model_version(self, name: str, version: str) -> ModelVersion | None:
        """Get a model version.

        Args:
            name: Name of the model.
            version: Version of the model.

        Returns:
            Model version.

        Raises:
            StoreError: If the model does not exist.
        """
        if not (rm := self.get_registered_model(name)):
            msg = f"Model {name} does not exist"
            raise StoreError(msg)
        assert rm.id
        return self.runner.run(self._api.get_model_version_by_params(rm.id, version))

    def get_model_artifact(self, name: str, version: str) -> ModelArtifact | None:
        """Get a model artifact.

        Args:
            name: Name of the model.
            version: Version of the model.

        Returns:
            Model artifact.

        Raises:
            StoreError: If either the model or the version does not exist.
        """
        if not (mv := self.get_model_version(name, version)):
            msg = f"Version {version} does not exist"
            raise StoreError(msg)
        assert mv.id
        return self.runner.run(self._api.get_model_artifact_by_params(name, mv.id))

    def get_registered_models(self) -> Pager[RegisteredModel]:
        """Get a pager for registered models.

        Returns:
            Iterable pager for registered models.
        """

        def rm_list(options: ListOptions) -> list[RegisteredModel]:
            return self.runner.run(self._api.get_registered_models(options))

        return Pager[RegisteredModel](rm_list)

    def get_model_versions(self, name: str) -> Pager[ModelVersion]:
        """Get a pager for model versions.

        Args:
            name: Name of the model.

        Returns:
            Iterable pager for model versions.

        Raises:
            StoreError: If the model does not exist.
        """
        if not (rm := self.get_registered_model(name)):
            msg = f"Model {name} does not exist"
            raise StoreError(msg)

        def rm_versions(options: ListOptions) -> list[ModelVersion]:
            # type checkers can't restrict the type inside a nested function: https://mypy.readthedocs.io/en/stable/common_issues.html#narrowing-and-inner-functions
            assert rm.id
            return self.runner.run(self._api.get_model_versions(rm.id, options))

        return Pager[ModelVersion](rm_versions)

So basically it just replaces the async runner implementation.
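The `async_task_runner` module imported above is not shown in this thread, so the following is only a sketch of what such a runner might look like, not the implementation referenced earlier. The class name `AsyncTaskRunner` and the methods `get_instance()` and `run()` are taken from the client code above. Everything inside them is an assumption: a singleton that owns one event loop running in a daemon thread and submits coroutines to it with `asyncio.run_coroutine_threadsafe`. Because the caller's own loop is never patched or re-entered, its type (asyncio or uvloop) should not matter.

```python
import asyncio
import threading
from typing import Any, Coroutine, Optional, TypeVar

T = TypeVar("T")


class AsyncTaskRunner:
    """Hypothetical runner: executes coroutines on a private background loop."""

    _instance: Optional["AsyncTaskRunner"] = None
    _lock = threading.Lock()

    def __init__(self) -> None:
        # A dedicated loop, separate from any loop the caller may be running.
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(
            target=self._loop.run_forever,
            name="async-task-runner",
            daemon=True,
        )
        self._thread.start()

    @classmethod
    def get_instance(cls) -> "AsyncTaskRunner":
        """Return the process-wide runner, creating it on first use."""
        with cls._lock:
            if cls._instance is None:
                cls._instance = cls()
            return cls._instance

    def run(self, coro: Coroutine[Any, Any, T]) -> T:
        """Schedule the coroutine on the background loop and block for its result."""
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return future.result()


async def _double(x: int) -> int:
    """Stand-in for an async API call (hypothetical)."""
    await asyncio.sleep(0)
    return x * 2


runner = AsyncTaskRunner.get_instance()

# Plain synchronous call site.
sync_result = runner.run(_double(21))


async def _call_from_running_loop() -> int:
    # Works even though a loop is already running in this thread,
    # because the coroutine executes on the runner's own loop.
    return runner.run(_double(5))


nested_result = asyncio.run(_call_from_running_loop())
```

One trade-off of this design is that `run()` blocks the calling thread until the result is ready, so when it is called from inside an async function it also stalls that thread's event loop for the duration of the call. That matches the synchronous API the client already exposes, but it is worth keeping in mind for latency-sensitive actors.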

@tarilabs
Member

Indeed, feel free to raise it as a PR; that is always encouraged.

@blublinsky
Author

thanks @tarilabs

@blublinsky
Author

The PR is here: #802

@tarilabs
Member

much appreciated @blublinsky thank you
