From fb18bbf99911ca754c7d2c641e211a0f37c1cb8c Mon Sep 17 00:00:00 2001 From: Kevin Bates Date: Wed, 11 May 2022 11:04:01 -0700 Subject: [PATCH 1/7] Track kernels by tenant id If tenant id is specified in the kernel start request's body via tenant_id, or as a query parameter (?tenant_id) to GET /api/kernels, it will be used to associate kernels with a tenant. If no tenant ID is given in the request body or as a query parameter the UNIVERSAL_TENANT_ID will be used so that all managed kernels use the same logic. --- .../contributors/system-architecture.md | 20 +++++ docs/source/users/kernel-envs.md | 9 ++ enterprise_gateway/services/api/swagger.yaml | 15 +++- .../services/kernels/handlers.py | 77 ++++++++++------- .../services/kernels/remotemanager.py | 39 ++++++++- enterprise_gateway/tests/test_handlers.py | 85 +++++++++++++++++++ 6 files changed, 213 insertions(+), 32 deletions(-) diff --git a/docs/source/contributors/system-architecture.md b/docs/source/contributors/system-architecture.md index 85c6147a5..bb13078f7 100644 --- a/docs/source/contributors/system-architecture.md +++ b/docs/source/contributors/system-architecture.md @@ -491,3 +491,23 @@ Theoretically speaking, enabling a kernel for use in other frameworks amounts to ```{seealso} This topic is covered in the [Developers Guide](../developers/index.rst). ``` + +## Multiple Tenant Support + +Enterprise Gateway offers viably minimal support multi-tenant environments by tracking the managed kernels by their tenant _ID_. This is accomplished on the client request when starting a kernel by adding a UUID-formatted string to the kernel start request's body with an associated key of `tenant_id`. + +```JSON +{"tenant_id":"f730794d-d175-40fa-b819-2a67d5308210"} +``` + +Likewise, when calling the `/api/kernels` endpoint to get the list of active kernels, tenant-aware applications should add a `tenant_id` query parameter in order to get appropriate managed kernel information. 
+ +``` +GET /api/kernels?tenant_id=f730794d-d175-40fa-b819-2a67d5308210 +``` + +Kernel start or list requests that do not include a `tenant_id` will have their kernels associated with the `UNIVERSAL_TENANT_ID` which merely acts a catch-all and allows common code usage relative to existing clients. + +Enterprise Gateway will add the environment variable `KERNEL_TENANT_ID` to the kernel's environment so that this value is available to the kernel's launch logic and the kernel itself. It should be noted that if the original request also included a `KERNEL_TENANT_ID` in the body's `env` stanza, it will be overwritten with the value corresponding to `tenant_id` (or `UNIVERSAL_TENANT_ID` if `tenant_id` was not provided). + +Kernel specifications and other resources do not currently adhere to tenant-based _partitioning_. diff --git a/docs/source/users/kernel-envs.md b/docs/source/users/kernel-envs.md index e49817d94..c7bcc6d8f 100644 --- a/docs/source/users/kernel-envs.md +++ b/docs/source/users/kernel-envs.md @@ -88,6 +88,15 @@ There are several supported `KERNEL_` variables that the Enterprise Gateway serv identified by EG_KERNEL_CLUSTER_ROLE. If not provided, it will be derived from EG_DEFAULT_KERNEL_SERVICE_ACCOUNT_NAME. + KERNEL_TENANT_ID= + Indicates the tenant ID (UUID string) corresponding to the kernel. This value + is derived from the optional `tenant_id` provided by the client application and + is meant to represent the entity or organization in which the client application + is associated. If `tenant_id` is not provided on the kernel start request, then + `KERNEL_TENANT_ID` will hold a value of ``"27182818-2845-9045-2353-602874713527"` + (the `UNIVERSAL_TENANT_ID`), which provides for backwards compatible support for + older applications. + KERNEL_UID= or 1000 Containers only. This value represents the user id in which the container will run. 
The default value is 1000 representing the jovyan user - which is how all kernel images diff --git a/enterprise_gateway/services/api/swagger.yaml b/enterprise_gateway/services/api/swagger.yaml index 5e5ea1c05..da4a9cb1a 100644 --- a/enterprise_gateway/services/api/swagger.yaml +++ b/enterprise_gateway/services/api/swagger.yaml @@ -112,6 +112,13 @@ paths: summary: List the JSON data for all currently running kernels tags: - kernels + parameters: + - name: tenant_id + in: query + description: When present, the list of running kernels will apply to the given tenant ID. + required: false + type: string + format: uuid responses: 200: description: List of running kernels @@ -137,11 +144,17 @@ paths: name: type: string description: Kernel spec name (defaults to default kernel spec for server) + tenant_id: + type: string + format: uuid + description: | + The (optional) UUID of the tenant making the request. The list of active + (running) kernels will be filtered by this value. env: type: object description: | A dictionary of environment variables and values to include in the - kernel process - subject to whitelisting. + kernel process - subject to filtering. 
additionalProperties: type: string responses: diff --git a/enterprise_gateway/services/kernels/handlers.py b/enterprise_gateway/services/kernels/handlers.py index 35a07ec8a..b0aa28302 100644 --- a/enterprise_gateway/services/kernels/handlers.py +++ b/enterprise_gateway/services/kernels/handlers.py @@ -8,9 +8,11 @@ import jupyter_server.services.kernels.handlers as jupyter_server_handlers import tornado from jupyter_client.jsonutil import date_default +from jupyter_server.utils import ensure_async from tornado import web from ...mixins import CORSMixin, JSONErrorsMixin, TokenAuthorizationMixin +from ..kernels.remotemanager import UNIVERSAL_TENANT_ID class MainKernelHandler( @@ -45,35 +47,41 @@ async def post(self): if len(kernels) >= max_kernels: raise tornado.web.HTTPError(403, "Resource Limit") - # Try to get env vars from the request body + tenant_id = UNIVERSAL_TENANT_ID + env = {} + # Try to get tenant_id and env vars from the request body model = self.get_json_body() - if model is not None and "env" in model: - if not isinstance(model["env"], dict): - raise tornado.web.HTTPError(400) - # Start with the PATH from the current env. Do not provide the entire environment - # which might contain server secrets that should not be passed to kernels. - env = {"PATH": os.getenv("PATH", "")} - # Whitelist environment variables from current process environment - env.update( - { - key: value - for key, value in os.environ.items() - if key in self.env_process_whitelist - } - ) - # Whitelist KERNEL_* args and those allowed by configuration from client. If all - # envs are requested, just use the keys from the payload. 
- env_whitelist = self.env_whitelist - if env_whitelist == ["*"]: - env_whitelist = model["env"].keys() - env.update( - { - key: value - for key, value in model["env"].items() - if key.startswith("KERNEL_") or key in env_whitelist - } - ) + if model is not None: + tenant_id = model.get("tenant_id", UNIVERSAL_TENANT_ID) + if "env" in model: + if not isinstance(model["env"], dict): + raise tornado.web.HTTPError(400) + # Start with the PATH from the current env. Do not provide the entire environment + # which might contain server secrets that should not be passed to kernels. + env = {"PATH": os.getenv("PATH", "")} + # Whitelist environment variables from current process environment + env.update( + { + key: value + for key, value in os.environ.items() + if key in self.env_process_whitelist + } + ) + # Whitelist KERNEL_* args and those allowed by configuration from client. If all + # envs are requested, just use the keys from the payload. + env_whitelist = self.env_whitelist + if env_whitelist == ["*"]: + env_whitelist = model["env"].keys() + env.update( + { + key: value + for key, value in model["env"].items() + if key.startswith("KERNEL_") or key in env_whitelist + } + ) + # Set KERNEL_TENANT_ID. 
If already present, we override with the value in the body + env["KERNEL_TENANT_ID"] = tenant_id # If kernel_headers are configured, fetch each of those and include in start request kernel_headers = {} missing_headers = [] @@ -97,7 +105,10 @@ async def post(self): # so do a temporary partial (ugh) orig_start = self.kernel_manager.start_kernel self.kernel_manager.start_kernel = partial( - self.kernel_manager.start_kernel, env=env, kernel_headers=kernel_headers + self.kernel_manager.start_kernel, + env=env, + kernel_headers=kernel_headers, + tenant_id=tenant_id, ) try: await super().post() @@ -117,10 +128,16 @@ async def get(self): tornado.web.HTTPError 403 Forbidden if kernel listing is disabled """ - if not self.settings.get("eg_list_kernels"): + + tenant_id_filter = self.request.query_arguments.get("tenant_id") or UNIVERSAL_TENANT_ID + if isinstance(tenant_id_filter, list): + tenant_id_filter = tenant_id_filter[0].decode("utf-8") + if not self.settings.get("eg_list_kernels") and tenant_id_filter == UNIVERSAL_TENANT_ID: raise tornado.web.HTTPError(403, "Forbidden") else: - await super().get() + km = self.kernel_manager + kernels = await ensure_async(km.list_kernels(tenant_id=tenant_id_filter)) + self.finish(json.dumps(kernels, default=date_default)) def options(self, **kwargs): """Method for properly handling CORS pre-flight""" diff --git a/enterprise_gateway/services/kernels/remotemanager.py b/enterprise_gateway/services/kernels/remotemanager.py index 5a3e38dfe..2408df0ed 100644 --- a/enterprise_gateway/services/kernels/remotemanager.py +++ b/enterprise_gateway/services/kernels/remotemanager.py @@ -6,6 +6,7 @@ import re import signal import uuid +from typing import Dict, List, Optional from jupyter_client.ioloop.manager import AsyncIOLoopKernelManager from jupyter_server.services.kernels.kernelmanager import AsyncMappingKernelManager @@ -18,6 +19,8 @@ from ..processproxies.processproxy import LocalProcessProxy, RemoteProcessProxy from 
..sessions.kernelsessionmanager import KernelSessionManager +UNIVERSAL_TENANT_ID: str = "27182818-2845-9045-2353-602874713527" + def import_item(name): """Import and return ``bar`` given the string ``foo.bar``. @@ -148,6 +151,9 @@ class RemoteMappingKernelManager(AsyncMappingKernelManager): pending_requests = TrackPendingRequests() # Used to enforce max-kernel limits + _tenant_to_kernels: Dict[str, List[str]] = {} # Maps a tenant_id to its kernel ids + _kernel_to_tenant: Dict[str, str] = {} # Maps a kernel_id to its tenant_id + def _kernel_manager_class_default(self): return "enterprise_gateway.services.kernels.remotemanager.RemoteKernelManager" @@ -177,18 +183,37 @@ async def start_kernel(self, *args, **kwargs): kernel_name=kwargs["kernel_name"], username=username ) ) - # Check max kernel limits self._enforce_kernel_limits(username) RemoteMappingKernelManager.pending_requests.increment(username) + tenant_id = kwargs.pop( + "tenant_id", UNIVERSAL_TENANT_ID + ) # Remove tenant_id from forwarded kwargs try: kernel_id = await super().start_kernel(*args, **kwargs) finally: RemoteMappingKernelManager.pending_requests.decrement(username) self.parent.kernel_session_manager.create_session(kernel_id, **kwargs) + + # Map tenant_id to kernel, and vice versa + kernels_for_tenant = self._tenant_to_kernels.get(tenant_id, []) + kernels_for_tenant.append(kernel_id) + self._tenant_to_kernels[tenant_id] = kernels_for_tenant + self._kernel_to_tenant[kernel_id] = tenant_id + return kernel_id + async def shutdown_kernel(self, kernel_id, now=False, restart=False): + await super().shutdown_kernel(kernel_id, now=now, restart=restart) + # Remove this kernel's association from its tenant + tenant_id = self._kernel_to_tenant.get(kernel_id) + if tenant_id: + kernels = self._tenant_to_kernels.get(tenant_id) + if kernel_id in kernels: + kernels.remove(kernel_id) + self._kernel_to_tenant.pop(kernel_id) + def _enforce_kernel_limits(self, username: str) -> None: """ If MaxKernels or 
MaxKernelsPerUser are configured, enforce the respective values. @@ -241,6 +266,18 @@ def remove_kernel(self, kernel_id): super().remove_kernel(kernel_id) self.parent.kernel_session_manager.delete_session(kernel_id) + def list_kernels(self, tenant_id: Optional[str] = UNIVERSAL_TENANT_ID): + """Returns a list of kernel_id's of kernels running.""" + kernels = [] + kernel_ids = self._tenant_to_kernels.get(tenant_id) or [] + for kernel_id in kernel_ids: + try: + model = self.kernel_model(kernel_id) + kernels.append(model) + except (web.HTTPError, KeyError): + pass # Probably due to a (now) non-existent kernel, continue building the list + return kernels + def start_kernel_from_session( self, kernel_id, kernel_name, connection_info, process_info, launch_args ): diff --git a/enterprise_gateway/tests/test_handlers.py b/enterprise_gateway/tests/test_handlers.py index 9342b37a5..75b585b92 100644 --- a/enterprise_gateway/tests/test_handlers.py +++ b/enterprise_gateway/tests/test_handlers.py @@ -11,6 +11,7 @@ from tornado.testing import gen_test from tornado.websocket import websocket_connect +from ..services.kernels.remotemanager import UNIVERSAL_TENANT_ID from .test_gatewayapp import RESOURCES, TestGatewayAppBase @@ -344,6 +345,90 @@ def test_get_kernels(self): self.assertEqual(len(kernels), 1) self.assertEqual(kernels[0]["id"], kernel["id"]) + @gen_test + def test_get_tenant_kernels(self): + """Server should respond with running kernel information.""" + my_tenant_id = "6e22b538-cbe2-42b6-8598-e5c58be997b1" + other_tenant_id = "1529760d-945c-48fa-a45f-15064883e3bb" + response = yield self.http_client.fetch( + self.get_url(f"/api/kernels?tenant_id={my_tenant_id}") + ) + self.assertEqual(response.code, 200) + kernels = json_decode(response.body) + self.assertEqual(len(kernels), 0) + + # Launch a kernel for this tenant + response = yield self.http_client.fetch( + self.get_url("/api/kernels"), method="POST", body=f'{{"tenant_id":"{my_tenant_id}"}}' + ) + 
self.assertEqual(response.code, 201) + my_kernel = json_decode(response.body) + + # Launch a kernel for a different tenant + response = yield self.http_client.fetch( + self.get_url("/api/kernels"), method="POST", body=f'{{"tenant_id":"{other_tenant_id}"}}' + ) + self.assertEqual(response.code, 201) + other_kernel = json_decode(response.body) + + # Launch another kernel for this tenant + response = yield self.http_client.fetch( + self.get_url("/api/kernels"), method="POST", body=f'{{"tenant_id":"{my_tenant_id}"}}' + ) + self.assertEqual(response.code, 201) + my_kernel2 = json_decode(response.body) + + # Check the list again for our tenant + response = yield self.http_client.fetch( + self.get_url(f"/api/kernels?tenant_id={my_tenant_id}") + ) + self.assertEqual(response.code, 200) + my_kernels = json_decode(response.body) + self.assertEqual(len(my_kernels), 2) + self.assertTrue(my_kernels[0]["id"] in [my_kernel["id"], my_kernel2["id"]]) + self.assertTrue(my_kernels[1]["id"] in [my_kernel["id"], my_kernel2["id"]]) + + # Check the list for the other tenant + response = yield self.http_client.fetch( + self.get_url(f"/api/kernels?tenant_id={other_tenant_id}") + ) + self.assertEqual(response.code, 200) + other_kernels = json_decode(response.body) + self.assertEqual(len(other_kernels), 1) + self.assertEqual(other_kernels[0]["id"], other_kernel["id"]) + + # Delete the other tenant's kernel and ensure its not listed + response = yield self.http_client.fetch( + self.get_url(f"/api/kernels/{other_kernel['id']}"), method="DELETE" + ) + self.assertEqual(response.code, 204) + response = yield self.http_client.fetch( + self.get_url(f"/api/kernels?tenant_id={other_tenant_id}") + ) + self.assertEqual(response.code, 200) + other_kernels = json_decode(response.body) + self.assertEqual(len(other_kernels), 0) + + # Delete one of my tenant's kernels and ensure only the other is still listed + response = yield self.http_client.fetch( + self.get_url(f"/api/kernels/{ my_kernel['id']}"), 
method="DELETE" + ) + self.assertEqual(response.code, 204) + response = yield self.http_client.fetch( + self.get_url(f"/api/kernels?tenant_id={my_tenant_id}") + ) + self.assertEqual(response.code, 200) + my_kernels = json_decode(response.body) + self.assertEqual(len(my_kernels), 1) + self.assertEqual(my_kernels[0]["id"], my_kernel2["id"]) + + # Check the list using the default (Universal) tenant_id. Since we didn't allow kernel lists, + # we expect a 403 + response = yield self.http_client.fetch( + self.get_url(f"/api/kernels?tenant_id={UNIVERSAL_TENANT_ID}"), raise_error=False + ) + self.assertEqual(response.code, 403) + @gen_test def test_kernel_comm(self): """Default kernel should launch and accept commands.""" From ab466e0ffd041cddc7ec509e03c3649ece63e141 Mon Sep 17 00:00:00 2001 From: Kevin Bates Date: Wed, 25 May 2022 15:26:05 -0700 Subject: [PATCH 2/7] Refactor to use class, include tenant-id in session persistence --- .../services/kernels/handlers.py | 1 - .../services/kernels/remotemanager.py | 102 ++++++++++++------ .../services/sessions/kernelsessionmanager.py | 2 + 3 files changed, 74 insertions(+), 31 deletions(-) diff --git a/enterprise_gateway/services/kernels/handlers.py b/enterprise_gateway/services/kernels/handlers.py index b0aa28302..178925e3f 100644 --- a/enterprise_gateway/services/kernels/handlers.py +++ b/enterprise_gateway/services/kernels/handlers.py @@ -47,7 +47,6 @@ async def post(self): if len(kernels) >= max_kernels: raise tornado.web.HTTPError(403, "Resource Limit") - tenant_id = UNIVERSAL_TENANT_ID env = {} # Try to get tenant_id and env vars from the request body model = self.get_json_body() diff --git a/enterprise_gateway/services/kernels/remotemanager.py b/enterprise_gateway/services/kernels/remotemanager.py index 2408df0ed..bb55bc837 100644 --- a/enterprise_gateway/services/kernels/remotemanager.py +++ b/enterprise_gateway/services/kernels/remotemanager.py @@ -121,14 +121,19 @@ def new_kernel_id(**kwargs): class 
TrackPendingRequests: """ - Simple class to track (increment/decrement) pending kernel start requests, both total and per user. + Simple class to track (increment/decrement) pending kernel start requests, both total and per user. + This tracking is necessary due to an inherent race condition that occurs now that kernel startup is asynchronous. As a result, multiple/simultaneous requests must be considered, in addition all existing kernel sessions. + + This class instance is essentially a singleton in that its single instance is contained within + the single instance of RemoteMappingKernelManager. """ - _pending_requests_all = 0 - _pending_requests_user = {} + def __init__(self): + self._pending_requests_all = 0 + self._pending_requests_user = {} def increment(self, username: str) -> None: self._pending_requests_all += 1 @@ -144,15 +149,61 @@ def get_counts(self, username): return self._pending_requests_all, int(self._pending_requests_user.get(username, 0)) +class TenantToKernels: + """ + Simple class to manage the association of a kernel to its tenant and vice versa. + + This is necessary to accommodate installation wishing to use Enterprise Gateway as a multi-tenant + kernel service and allows things like "list kernels" to only return the kernel information + associated with the requesting tenant. + + Requests that do not specify a tenant id will use the UNIVERSAL_TENANT_ID so that all + logic goes through the same path. + + This class instance is essentially a singleton in that its single instance is contained within + the single instance of RemoteMappingKernelManager. 
+ """ + + def __init__(self): + self._tenant_to_kernels: Dict[str, List[str]] = {} # Maps a tenant_id to its kernel ids + self._kernel_to_tenant: Dict[str, str] = {} # Maps a kernel_id to its tenant_id + + def add(self, tenant_id: str, kernel_id: str) -> None: + """Adds a mapping between the given tenant_id and kernel_id.""" + + kernels_for_tenant = self._tenant_to_kernels.get(tenant_id, []) + # We could already have the kernel_id if this call is made on + # behalf of refreshing a persisted session. + if kernel_id not in kernels_for_tenant: + kernels_for_tenant.append(kernel_id) + self._tenant_to_kernels[tenant_id] = kernels_for_tenant + self._kernel_to_tenant[kernel_id] = tenant_id + + def remove_kernel_id(self, kernel_id: str) -> None: + """Removes kernel's association from its tenant.""" + tenant_id = self._kernel_to_tenant.get(kernel_id) + if tenant_id: + kernels = self._tenant_to_kernels.get(tenant_id) + if kernel_id in kernels: + kernels.remove(kernel_id) + self._kernel_to_tenant.pop(kernel_id) + + def get_kernel_ids(self, tenant_id: str) -> List: + """Returns a list of kernel ids associated with this tenant.""" + return self._tenant_to_kernels.get(tenant_id) or [] + + def get_tenant_id(self, kernel_id: str) -> str: + """Returns the tenant_id corresponding to this kernel_id.""" + return self._kernel_to_tenant.get(kernel_id) or UNIVERSAL_TENANT_ID + + class RemoteMappingKernelManager(AsyncMappingKernelManager): """ Extends the AsyncMappingKernelManager with support for managing remote kernels via the process-proxy. 
""" pending_requests = TrackPendingRequests() # Used to enforce max-kernel limits - - _tenant_to_kernels: Dict[str, List[str]] = {} # Maps a tenant_id to its kernel ids - _kernel_to_tenant: Dict[str, str] = {} # Maps a kernel_id to its tenant_id + tenant_kernels = TenantToKernels() # Manage kernel's association to its tenant def _kernel_manager_class_default(self): return "enterprise_gateway.services.kernels.remotemanager.RemoteKernelManager" @@ -195,24 +246,12 @@ async def start_kernel(self, *args, **kwargs): finally: RemoteMappingKernelManager.pending_requests.decrement(username) self.parent.kernel_session_manager.create_session(kernel_id, **kwargs) - - # Map tenant_id to kernel, and vice versa - kernels_for_tenant = self._tenant_to_kernels.get(tenant_id, []) - kernels_for_tenant.append(kernel_id) - self._tenant_to_kernels[tenant_id] = kernels_for_tenant - self._kernel_to_tenant[kernel_id] = tenant_id - + self.tenant_kernels.add(tenant_id, kernel_id) return kernel_id async def shutdown_kernel(self, kernel_id, now=False, restart=False): await super().shutdown_kernel(kernel_id, now=now, restart=restart) - # Remove this kernel's association from its tenant - tenant_id = self._kernel_to_tenant.get(kernel_id) - if tenant_id: - kernels = self._tenant_to_kernels.get(tenant_id) - if kernel_id in kernels: - kernels.remove(kernel_id) - self._kernel_to_tenant.pop(kernel_id) + self.tenant_kernels.remove_kernel_id(kernel_id) def _enforce_kernel_limits(self, username: str) -> None: """ @@ -223,7 +262,6 @@ def _enforce_kernel_limits(self, username: str) -> None: pending_all, pending_user = RemoteMappingKernelManager.pending_requests.get_counts( username ) - # Enforce overall limit... 
if self.parent.max_kernels is not None: active_and_pending = len(self.list_kernels()) + pending_all @@ -269,7 +307,7 @@ def remove_kernel(self, kernel_id): def list_kernels(self, tenant_id: Optional[str] = UNIVERSAL_TENANT_ID): """Returns a list of kernel_id's of kernels running.""" kernels = [] - kernel_ids = self._tenant_to_kernels.get(tenant_id) or [] + kernel_ids = self.tenant_kernels.get_kernel_ids(tenant_id) for kernel_id in kernel_ids: try: model = self.kernel_model(kernel_id) @@ -279,7 +317,7 @@ def list_kernels(self, tenant_id: Optional[str] = UNIVERSAL_TENANT_ID): return kernels def start_kernel_from_session( - self, kernel_id, kernel_name, connection_info, process_info, launch_args + self, kernel_id, kernel_name, connection_info, process_info, launch_args, tenant_id ): """ Starts a kernel from a persisted kernel session. @@ -303,6 +341,8 @@ def start_kernel_from_session( from persistent storage launch_args : dict The arguments used for the initial launch of the kernel + tenant_id : str + The tenant_id corresponding to this kernel Returns ------- True if kernel could be located and started, False otherwise. @@ -352,6 +392,8 @@ def start_kernel_from_session( func = getattr(self, "initialize_culler", None) if func: func() + # Make sure we've recorded the tenant/kernel association + self.tenant_kernels.add(tenant_id, kernel_id) return True def new_kernel_id(self, **kwargs): @@ -381,8 +423,8 @@ def __init__(self, **kwargs): self.restarting = False # need to track whether we're in a restart situation or not # If this instance supports port caching, then disable cache_ports since we don't need this - # for remote kernels and it breaks the ability to support port ranges for local kernels (which - # is viewed as more imporant for EG). + # for remote kernels, and it breaks the ability to support port ranges for local kernels (which + # is viewed as more important for EG). 
# Note: This check MUST remain in this method since cache_ports is used immediately # following construction. if hasattr(self, "cache_ports"): @@ -476,7 +518,7 @@ def format_kernel_cmd(self, extra_arguments=None): if self.kernel_id: ns["kernel_id"] = self.kernel_id - pat = re.compile(r"\{([A-Za-z0-9_]+)\}") + pat = re.compile(r"{([A-Za-z0-9_]+)}") def from_ns(match): """Get the key out of ns if it's there, otherwise no change.""" @@ -519,7 +561,7 @@ def request_shutdown(self, restart=False): super().request_shutdown(restart) # If we're using a remote proxy, we need to send the launcher indication that we're - # shutting down so it can exit its listener thread, if its using one. + # shutting down so it can exit its listener thread, if it's using one. if isinstance(self.process_proxy, RemoteProcessProxy): self.process_proxy.shutdown_listener() @@ -620,7 +662,7 @@ def cleanup(self, connection_file=True): # remains here for pre-6.2.0 jupyter_client installations. # Note we must use `process_proxy` here rather than `kernel`, although they're the same value. - # The reason is because if the kernel shutdown sequence has triggered its "forced kill" logic + # The reason is that if the kernel shutdown sequence has triggered its "forced kill" logic # then that method (jupyter_client/manager.py/_kill_kernel()) will set `self.kernel` to None, # which then prevents process proxy cleanup. if self.process_proxy: @@ -637,7 +679,7 @@ def cleanup_resources(self, restart=False): # will not be called until jupyter_client 6.2.0 has been released. # Note we must use `process_proxy` here rather than `kernel`, although they're the same value. - # The reason is because if the kernel shutdown sequence has triggered its "forced kill" logic + # The reason is that if the kernel shutdown sequence has triggered its "forced kill" logic # then that method (jupyter_client/manager.py/_kill_kernel()) will set `self.kernel` to None, # which then prevents process proxy cleanup. 
if self.process_proxy: @@ -670,7 +712,7 @@ def write_connection_file(self): def _get_process_proxy(self): """ - Reads the associated kernelspec and to see if has a process proxy stanza. + Reads the associated kernelspec and to see if it has a process proxy stanza. If one exists, it instantiates an instance. If a process proxy is not specified in the kernelspec, a LocalProcessProxy stanza is fabricated and instantiated. diff --git a/enterprise_gateway/services/sessions/kernelsessionmanager.py b/enterprise_gateway/services/sessions/kernelsessionmanager.py index 31ebd5823..419ce4f30 100644 --- a/enterprise_gateway/services/sessions/kernelsessionmanager.py +++ b/enterprise_gateway/services/sessions/kernelsessionmanager.py @@ -93,6 +93,7 @@ def create_session(self, kernel_id, **kwargs): # Compose the kernel_session entry kernel_session = dict() kernel_session["kernel_id"] = kernel_id + kernel_session["tenant_id"] = self.kernel_manager.tenant_kernels.get_tenant_id(kernel_id) kernel_session["username"] = KernelSessionManager.get_kernel_username(**kwargs) kernel_session["kernel_name"] = km.kernel_name @@ -181,6 +182,7 @@ def _start_session(self, kernel_session): connection_info=kernel_session["connection_info"], process_info=kernel_session["process_info"], launch_args=kernel_session["launch_args"], + tenant_id=kernel_session["tenant_id"], ) if not kernel_started: return False From fd5b270cefd65233ddd543352bde3f18d4860b7b Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 26 May 2022 01:46:37 +0000 Subject: [PATCH 3/7] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- enterprise_gateway/services/kernels/remotemanager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/enterprise_gateway/services/kernels/remotemanager.py b/enterprise_gateway/services/kernels/remotemanager.py index d14043db4..cd5a8fcf1 100644 --- 
a/enterprise_gateway/services/kernels/remotemanager.py +++ b/enterprise_gateway/services/kernels/remotemanager.py @@ -6,8 +6,8 @@ import re import signal import time -from typing import Dict, List, Optional import uuid +from typing import Dict, List, Optional from jupyter_client.ioloop.manager import AsyncIOLoopKernelManager from jupyter_server.services.kernels.kernelmanager import AsyncMappingKernelManager From 13120e439d02a399d754614151219c60d15b0500 Mon Sep 17 00:00:00 2001 From: Kevin Bates Date: Fri, 10 Jun 2022 10:22:12 -0700 Subject: [PATCH 4/7] Convey tenant_id via env and other review-based changes --- .../contributors/system-architecture.md | 8 +-- .../services/kernels/handlers.py | 19 ++--- .../services/kernels/remotemanager.py | 72 ++++--------------- .../services/sessions/kernelsessionmanager.py | 2 +- 4 files changed, 27 insertions(+), 74 deletions(-) diff --git a/docs/source/contributors/system-architecture.md b/docs/source/contributors/system-architecture.md index bb13078f7..ec6fb9b9d 100644 --- a/docs/source/contributors/system-architecture.md +++ b/docs/source/contributors/system-architecture.md @@ -494,10 +494,10 @@ This topic is covered in the [Developers Guide](../developers/index.rst). ## Multiple Tenant Support -Enterprise Gateway offers viably minimal support multi-tenant environments by tracking the managed kernels by their tenant _ID_. This is accomplished on the client request when starting a kernel by adding a UUID-formatted string to the kernel start request's body with an associated key of `tenant_id`. +Enterprise Gateway offers viably minimal support for multi-tenant environments by tracking the managed kernels by their tenant _ID_. This is accomplished on the client request when starting a kernel by adding a UUID-formatted string to the kernel start request's `env` stanza with an associated key of `JUPYTER_GATEWAY_TENANT_ID`. 
```JSON -{"tenant_id":"f730794d-d175-40fa-b819-2a67d5308210"} +{"env": {"JUPYTER_GATEWAY_TENANT_ID": "f730794d-d175-40fa-b819-2a67d5308210"}} ``` Likewise, when calling the `/api/kernels` endpoint to get the list of active kernels, tenant-aware applications should add a `tenant_id` query parameter in order to get appropriate managed kernel information. @@ -506,8 +506,8 @@ Likewise, when calling the `/api/kernels` endpoint to get the list of active ker GET /api/kernels?tenant_id=f730794d-d175-40fa-b819-2a67d5308210 ``` -Kernel start or list requests that do not include a `tenant_id` will have their kernels associated with the `UNIVERSAL_TENANT_ID` which merely acts a catch-all and allows common code usage relative to existing clients. +Kernel start requests that do not include a `tenant_id` will have their kernels associated with the `UNIVERSAL_TENANT_ID` which merely acts as a catch-all and allows common code usage relative to existing clients. -Enterprise Gateway will add the environment variable `KERNEL_TENANT_ID` to the kernel's environment so that this value is available to the kernel's launch logic and the kernel itself. It should be noted that if the original request also included a `KERNEL_TENANT_ID` in the body's `env` stanza, it will be overwritten with the value corresponding to `tenant_id` (or `UNIVERSAL_TENANT_ID` if `tenant_id` was not provided). +Enterprise Gateway will add the environment variable `KERNEL_TENANT_ID` to the kernel's _launch_ environment so that this value is available to the kernel's launch framework. This value is currently not available to the kernel itself. It should be noted that if the original request also included a `KERNEL_TENANT_ID` in the body's `env` stanza, it will be overwritten with the value corresponding to `tenant_id` (or `UNIVERSAL_TENANT_ID` if `tenant_id` was not provided). Kernel specifications and other resources do not currently adhere to tenant-based _partitioning_. 
diff --git a/enterprise_gateway/services/kernels/handlers.py b/enterprise_gateway/services/kernels/handlers.py index 178925e3f..ab2681a56 100644 --- a/enterprise_gateway/services/kernels/handlers.py +++ b/enterprise_gateway/services/kernels/handlers.py @@ -51,7 +51,7 @@ async def post(self): # Try to get tenant_id and env vars from the request body model = self.get_json_body() if model is not None: - tenant_id = model.get("tenant_id", UNIVERSAL_TENANT_ID) + tenant_id = UNIVERSAL_TENANT_ID if "env" in model: if not isinstance(model["env"], dict): raise tornado.web.HTTPError(400) @@ -78,6 +78,8 @@ async def post(self): if key.startswith("KERNEL_") or key in env_whitelist } ) + # Pull client-provided tenant_id from env, defaulting to UNIVERSAL_TENANT_ID. + tenant_id = model["env"].get('JUPYTER_GATEWAY_TENANT_ID', tenant_id) # Set KERNEL_TENANT_ID. If already present, we override with the value in the body env["KERNEL_TENANT_ID"] = tenant_id @@ -107,7 +109,6 @@ async def post(self): self.kernel_manager.start_kernel, env=env, kernel_headers=kernel_headers, - tenant_id=tenant_id, ) try: await super().post() @@ -127,16 +128,16 @@ async def get(self): tornado.web.HTTPError 403 Forbidden if kernel listing is disabled """ + if not self.settings.get("eg_list_kernels"): + raise tornado.web.HTTPError(403, "Forbidden") - tenant_id_filter = self.request.query_arguments.get("tenant_id") or UNIVERSAL_TENANT_ID + tenant_id_filter = self.request.query_arguments.get("tenant_id") if isinstance(tenant_id_filter, list): tenant_id_filter = tenant_id_filter[0].decode("utf-8") - if not self.settings.get("eg_list_kernels") and tenant_id_filter == UNIVERSAL_TENANT_ID: - raise tornado.web.HTTPError(403, "Forbidden") - else: - km = self.kernel_manager - kernels = await ensure_async(km.list_kernels(tenant_id=tenant_id_filter)) - self.finish(json.dumps(kernels, default=date_default)) + + km = self.kernel_manager + kernels = await ensure_async(km.list_kernels(tenant_id=tenant_id_filter)) + 
self.finish(json.dumps(kernels, default=date_default)) def options(self, **kwargs): """Method for properly handling CORS pre-flight""" diff --git a/enterprise_gateway/services/kernels/remotemanager.py b/enterprise_gateway/services/kernels/remotemanager.py index d14043db4..37b2df53c 100644 --- a/enterprise_gateway/services/kernels/remotemanager.py +++ b/enterprise_gateway/services/kernels/remotemanager.py @@ -153,61 +153,12 @@ def get_counts(self, username): return self._pending_requests_all, int(self._pending_requests_user.get(username, 0)) -class TenantToKernels: - """ - Simple class to manage the association of a kernel to its tenant and vice versa. - - This is necessary to accommodate installation wishing to use Enterprise Gateway as a multi-tenant - kernel service and allows things like "list kernels" to only return the kernel information - associated with the requesting tenant. - - Requests that do not specify a tenant id will use the UNIVERSAL_TENANT_ID so that all - logic goes through the same path. - - This class instance is essentially a singleton in that its single instance is contained within - the single instance of RemoteMappingKernelManager. - """ - - def __init__(self): - self._tenant_to_kernels: Dict[str, List[str]] = {} # Maps a tenant_id to its kernel ids - self._kernel_to_tenant: Dict[str, str] = {} # Maps a kernel_id to its tenant_id - - def add(self, tenant_id: str, kernel_id: str) -> None: - """Adds a mapping between the given tenant_id and kernel_id.""" - - kernels_for_tenant = self._tenant_to_kernels.get(tenant_id, []) - # We could already have the kernel_id if this call is made on - # behalf of refreshing a persisted session. 
- if kernel_id not in kernels_for_tenant: - kernels_for_tenant.append(kernel_id) - self._tenant_to_kernels[tenant_id] = kernels_for_tenant - self._kernel_to_tenant[kernel_id] = tenant_id - - def remove_kernel_id(self, kernel_id: str) -> None: - """Removes kernel's association from its tenant.""" - tenant_id = self._kernel_to_tenant.get(kernel_id) - if tenant_id: - kernels = self._tenant_to_kernels.get(tenant_id) - if kernel_id in kernels: - kernels.remove(kernel_id) - self._kernel_to_tenant.pop(kernel_id) - - def get_kernel_ids(self, tenant_id: str) -> List: - """Returns a list of kernel ids associated with this tenant.""" - return self._tenant_to_kernels.get(tenant_id) or [] - - def get_tenant_id(self, kernel_id: str) -> str: - """Returns the tenant_id corresponding to this kernel_id.""" - return self._kernel_to_tenant.get(kernel_id) or UNIVERSAL_TENANT_ID - - class RemoteMappingKernelManager(AsyncMappingKernelManager): """ Extends the AsyncMappingKernelManager with support for managing remote kernels via the process-proxy. 
""" pending_requests = TrackPendingRequests() # Used to enforce max-kernel limits - tenant_kernels = TenantToKernels() # Manage kernel's association to its tenant def _kernel_manager_class_default(self): return "enterprise_gateway.services.kernels.remotemanager.RemoteKernelManager" @@ -242,15 +193,11 @@ async def start_kernel(self, *args, **kwargs): self._enforce_kernel_limits(username) RemoteMappingKernelManager.pending_requests.increment(username) - tenant_id = kwargs.pop( - "tenant_id", UNIVERSAL_TENANT_ID - ) # Remove tenant_id from forwarded kwargs try: kernel_id = await super().start_kernel(*args, **kwargs) finally: RemoteMappingKernelManager.pending_requests.decrement(username) self.parent.kernel_session_manager.create_session(kernel_id, **kwargs) - self.tenant_kernels.add(tenant_id, kernel_id) return kernel_id async def restart_kernel(self, kernel_id): @@ -274,7 +221,6 @@ async def shutdown_kernel(self, kernel_id, now=False, restart=False): except KeyError as ke: # this is hit for multiple shutdown request. 
self.log.exception(f"Exception while shutting down kernel: '{kernel_id}': {ke}") raise web.HTTPError(404, "Kernel does not exist: %s" % kernel_id) - self.tenant_kernels.remove_kernel_id(kernel_id) async def wait_for_restart_finish(self, kernel_id, action="shutdown"): kernel = self.get_kernel(kernel_id) @@ -346,12 +292,14 @@ def remove_kernel(self, kernel_id): super().remove_kernel(kernel_id) self.parent.kernel_session_manager.delete_session(kernel_id) - def list_kernels(self, tenant_id: Optional[str] = UNIVERSAL_TENANT_ID): - """Returns a list of kernel_id's of kernels running.""" + def list_kernels(self, tenant_id: Optional[str] = None): + """Returns a list of kernel_id's of kernels running, filtering on tenant_id if provided.""" kernels = [] - kernel_ids = self.tenant_kernels.get_kernel_ids(tenant_id) - for kernel_id in kernel_ids: + for kernel_id, kernel_manager in self._kernels.items(): try: + # If requested, discard non-matching tenant_id kernels + if tenant_id and tenant_id != kernel_manager.tenant_id: + continue model = self.kernel_model(kernel_id) kernels.append(model) except (web.HTTPError, KeyError): @@ -403,6 +351,8 @@ def start_kernel_from_session( kernel_name=kernel_name, **constructor_kwargs, ) + # Set the persisted tenant_id + km.tenant_id = tenant_id # Load connection info into member vars - no need to write out connection file km.load_connection_info(connection_info) @@ -434,8 +384,6 @@ def start_kernel_from_session( func = getattr(self, "initialize_culler", None) if func: func() - # Make sure we've recorded the tenant/kernel association - self.tenant_kernels.add(tenant_id, kernel_id) return True def new_kernel_id(self, **kwargs): @@ -461,6 +409,7 @@ def __init__(self, **kwargs): self.public_key = None self.sigint_value = None self.kernel_id = None + self.tenant_id = None self.user_overrides = {} self.kernel_launch_timeout = default_kernel_launch_timeout self.restarting = False # need to track whether we're in a restart situation or not @@ -534,6 
+483,9 @@ def _capture_user_overrides(self, **kwargs): of the kernelspec env stanza that would have otherwise overridden the user-provided values. """ env = kwargs.get("env", {}) + # Although this particular env is not "user-provided", we still handle its capture here. + self.tenant_id = env.get("KERNEL_TENANT_ID", UNIVERSAL_TENANT_ID) + # If KERNEL_LAUNCH_TIMEOUT is passed in the payload, override it. self.kernel_launch_timeout = float( env.get("KERNEL_LAUNCH_TIMEOUT", default_kernel_launch_timeout) diff --git a/enterprise_gateway/services/sessions/kernelsessionmanager.py b/enterprise_gateway/services/sessions/kernelsessionmanager.py index 419ce4f30..2c95ec723 100644 --- a/enterprise_gateway/services/sessions/kernelsessionmanager.py +++ b/enterprise_gateway/services/sessions/kernelsessionmanager.py @@ -93,7 +93,7 @@ def create_session(self, kernel_id, **kwargs): # Compose the kernel_session entry kernel_session = dict() kernel_session["kernel_id"] = kernel_id - kernel_session["tenant_id"] = self.kernel_manager.tenant_kernels.get_tenant_id(kernel_id) + kernel_session["tenant_id"] = km.tenant_id kernel_session["username"] = KernelSessionManager.get_kernel_username(**kwargs) kernel_session["kernel_name"] = km.kernel_name From 28c7bd1296e2b5e53dee63c2c95ed999624de7c8 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 10 Jun 2022 17:27:56 +0000 Subject: [PATCH 5/7] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- enterprise_gateway/services/kernels/handlers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/enterprise_gateway/services/kernels/handlers.py b/enterprise_gateway/services/kernels/handlers.py index ab2681a56..a167b4edc 100644 --- a/enterprise_gateway/services/kernels/handlers.py +++ b/enterprise_gateway/services/kernels/handlers.py @@ -79,7 +79,7 @@ async def post(self): } ) # Pull client-provided tenant_id from env, 
defaulting to UNIVERSAL_TENANT_ID. - tenant_id = model["env"].get('JUPYTER_GATEWAY_TENANT_ID', tenant_id) + tenant_id = model["env"].get("JUPYTER_GATEWAY_TENANT_ID", tenant_id) # Set KERNEL_TENANT_ID. If already present, we override with the value in the body env["KERNEL_TENANT_ID"] = tenant_id From 3a6d1f786b56526b56e0e9041558b9609d3fe0fa Mon Sep 17 00:00:00 2001 From: Kevin Bates Date: Fri, 10 Jun 2022 10:28:53 -0700 Subject: [PATCH 6/7] apply linting updates --- enterprise_gateway/services/kernels/handlers.py | 2 +- enterprise_gateway/services/kernels/remotemanager.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/enterprise_gateway/services/kernels/handlers.py b/enterprise_gateway/services/kernels/handlers.py index ab2681a56..a167b4edc 100644 --- a/enterprise_gateway/services/kernels/handlers.py +++ b/enterprise_gateway/services/kernels/handlers.py @@ -79,7 +79,7 @@ async def post(self): } ) # Pull client-provided tenant_id from env, defaulting to UNIVERSAL_TENANT_ID. - tenant_id = model["env"].get('JUPYTER_GATEWAY_TENANT_ID', tenant_id) + tenant_id = model["env"].get("JUPYTER_GATEWAY_TENANT_ID", tenant_id) # Set KERNEL_TENANT_ID. 
If already present, we override with the value in the body env["KERNEL_TENANT_ID"] = tenant_id diff --git a/enterprise_gateway/services/kernels/remotemanager.py b/enterprise_gateway/services/kernels/remotemanager.py index 93ab19540..356bb1569 100644 --- a/enterprise_gateway/services/kernels/remotemanager.py +++ b/enterprise_gateway/services/kernels/remotemanager.py @@ -7,7 +7,7 @@ import signal import time import uuid -from typing import Dict, List, Optional +from typing import Optional from jupyter_client.ioloop.manager import AsyncIOLoopKernelManager from jupyter_server.services.kernels.kernelmanager import AsyncMappingKernelManager From 87cfc8c0c0f168b734b6a9b72d7ea24dbe0b3e52 Mon Sep 17 00:00:00 2001 From: Kevin Bates Date: Fri, 10 Jun 2022 12:11:26 -0700 Subject: [PATCH 7/7] Fix test --- enterprise_gateway/tests/test_handlers.py | 29 +++++++++++++---------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/enterprise_gateway/tests/test_handlers.py b/enterprise_gateway/tests/test_handlers.py index 75b585b92..c49b7e38b 100644 --- a/enterprise_gateway/tests/test_handlers.py +++ b/enterprise_gateway/tests/test_handlers.py @@ -11,7 +11,6 @@ from tornado.testing import gen_test from tornado.websocket import websocket_connect -from ..services.kernels.remotemanager import UNIVERSAL_TENANT_ID from .test_gatewayapp import RESOURCES, TestGatewayAppBase @@ -348,6 +347,7 @@ def test_get_kernels(self): @gen_test def test_get_tenant_kernels(self): """Server should respond with running kernel information.""" + self.app.web_app.settings["eg_list_kernels"] = True my_tenant_id = "6e22b538-cbe2-42b6-8598-e5c58be997b1" other_tenant_id = "1529760d-945c-48fa-a45f-15064883e3bb" response = yield self.http_client.fetch( @@ -359,21 +359,27 @@ def test_get_tenant_kernels(self): # Launch a kernel for this tenant response = yield self.http_client.fetch( - self.get_url("/api/kernels"), method="POST", body=f'{{"tenant_id":"{my_tenant_id}"}}' + self.get_url("/api/kernels"), + 
method="POST", + body=f'{{"env": {{"JUPYTER_GATEWAY_TENANT_ID":"{my_tenant_id}"}}}}', ) self.assertEqual(response.code, 201) my_kernel = json_decode(response.body) # Launch a kernel for a different tenant response = yield self.http_client.fetch( - self.get_url("/api/kernels"), method="POST", body=f'{{"tenant_id":"{other_tenant_id}"}}' + self.get_url("/api/kernels"), + method="POST", + body=f'{{"env": {{"JUPYTER_GATEWAY_TENANT_ID":"{other_tenant_id}"}}}}', ) self.assertEqual(response.code, 201) other_kernel = json_decode(response.body) - # Launch another kernel for this tenant + # Launch another kernel for original tenant response = yield self.http_client.fetch( - self.get_url("/api/kernels"), method="POST", body=f'{{"tenant_id":"{my_tenant_id}"}}' + self.get_url("/api/kernels"), + method="POST", + body=f'{{"env": {{"JUPYTER_GATEWAY_TENANT_ID":"{my_tenant_id}"}}}}', ) self.assertEqual(response.code, 201) my_kernel2 = json_decode(response.body) @@ -397,6 +403,12 @@ def test_get_tenant_kernels(self): self.assertEqual(len(other_kernels), 1) self.assertEqual(other_kernels[0]["id"], other_kernel["id"]) + # Check the list w/o a parameter - all expected + response = yield self.http_client.fetch(self.get_url("/api/kernels")) + self.assertEqual(response.code, 200) + other_kernels = json_decode(response.body) + self.assertEqual(len(other_kernels), 3) + # Delete the other tenant's kernel and ensure its not listed response = yield self.http_client.fetch( self.get_url(f"/api/kernels/{other_kernel['id']}"), method="DELETE" @@ -422,13 +434,6 @@ def test_get_tenant_kernels(self): self.assertEqual(len(my_kernels), 1) self.assertEqual(my_kernels[0]["id"], my_kernel2["id"]) - # Check the list using the default (Universal) tenant_id. 
Since we didn't allow kernel lists, - # we expect a 403 - response = yield self.http_client.fetch( - self.get_url(f"/api/kernels?tenant_id={UNIVERSAL_TENANT_ID}"), raise_error=False - ) - self.assertEqual(response.code, 403) - @gen_test def test_kernel_comm(self): """Default kernel should launch and accept commands."""