diff --git a/torchx/schedulers/kubernetes_mcad_scheduler.py b/torchx/schedulers/kubernetes_mcad_scheduler.py
index 818aa4f9a..9fac1410f 100644
--- a/torchx/schedulers/kubernetes_mcad_scheduler.py
+++ b/torchx/schedulers/kubernetes_mcad_scheduler.py
@@ -13,10 +13,9 @@
 Prerequisites
 ==============
 
-TorchX kubernetes scheduler depends on AppWrapper + MCAD.
-
-Install MCAD
+The TorchX Kubernetes_MCAD scheduler depends on AppWrapper + MCAD.
+Install MCAD:
 See deploying Multi-Cluster-Application-Dispatcher guide
 https://github.com/project-codeflare/multi-cluster-app-dispatcher/blob/main/doc/deploy/deployment.md
 
@@ -168,6 +167,7 @@ def role_to_pod(
     role: Role,
     service_account: Optional[str],
     image_secret: Optional[str],
+    coscheduler_name: Optional[str],
 ) -> "V1Pod":
     from kubernetes.client.models import (  # noqa: F811 redefinition of unused
         V1Container,
@@ -338,6 +338,7 @@ def role_to_pod(
             service_account_name=service_account,
             volumes=volumes,
             node_selector=node_selector,
+            scheduler_name=coscheduler_name,
         ),
         metadata=V1ObjectMeta(
             name=name,
@@ -352,6 +353,31 @@ def role_to_pod(
     )
 
 
+def create_pod_group(role: Role, namespace: str, app_id: str) -> "Dict[str, Any]":
+    pod_group_name = app_id + "-" + cleanup_str(role.name) + "-pg"
+
+    pod_group: Dict[str, Any] = {
+        "apiVersion": "scheduling.sigs.k8s.io/v1alpha1",
+        "kind": "PodGroup",
+        "metadata": {
+            "name": pod_group_name,
+            "namespace": namespace,
+            "labels": {
+                "appwrapper.mcad.ibm.com": app_id,
+            },
+        },
+        "spec": {
+            "minMember": role.num_replicas,
+        },
+    }
+
+    genericitem_pod_group: Dict[str, Any] = {
+        "replicas": 1,
+        "generictemplate": pod_group,
+    }
+    return genericitem_pod_group
+
+
 def mcad_svc(svc_name: str, namespace: str, service_port: str) -> "V1Service":
     from kubernetes.client.models import (  # noqa: F401, F811
         V1Container,
@@ -436,6 +462,7 @@ def app_to_resource(
     namespace: str,
     service_account: Optional[str],
     image_secret: Optional[str],
+    coscheduler_name: Optional[str],
     priority: Optional[int] = None,
 ) -> Dict[str, Any]:
     """
@@ -448,6 +475,12 @@ def app_to_resource(
     genericitems = []
 
     unique_app_id = cleanup_str(make_unique(app.name))
+
+    if coscheduler_name is not None:
+        for role_idx, role in enumerate(app.roles):
+            genericitem_pod_group = create_pod_group(role, namespace, unique_app_id)
+            genericitems.append(genericitem_pod_group)
+
     for role_idx, role in enumerate(app.roles):
         for replica_id in range(role.num_replicas):
             values = macros.Values(
@@ -473,8 +506,18 @@ def app_to_resource(
                 replica_role,
                 service_account,
                 image_secret,
+                coscheduler_name,
+            )
+            pod.metadata.labels.update(
+                pod_labels(
+                    app=app,
+                    role_idx=role_idx,
+                    role=role,
+                    replica_id=replica_id,
+                    coscheduler_name=coscheduler_name,
+                    app_id=unique_app_id,
+                )
             )
-            pod.metadata.labels.update(pod_labels(app, role_idx, role, replica_id))
 
             genericitem: Dict[str, Any] = {
                 "replicas": 1,
@@ -676,6 +719,7 @@ class KubernetesMCADOpts(TypedDict, total=False):
     service_account: Optional[str]
     priority: Optional[int]
     image_secret: Optional[str]
+    coscheduler_name: Optional[str]
 
 
 class KubernetesMCADScheduler(DockerWorkspaceMixin, Scheduler[KubernetesMCADOpts]):
@@ -699,6 +743,14 @@ class KubernetesMCADScheduler(DockerWorkspaceMixin, Scheduler[KubernetesMCADOpts
         $ torchx run --scheduler kubernetes_mcad --scheduler_args namespace=default,image_repo= utils.echo --image alpine:latest --msg hello
         ...
 
+    The TorchX-MCAD scheduler can be used with a secondary scheduler on Kubernetes.
+    To enable this, the user must provide the name of the coscheduler.
+    With this feature, a PodGroup is defined for each TorchX role, and the coscheduler
+    handles secondary scheduling on the Kubernetes cluster. For additional resources, see:
+    1. PodGroups and Coscheduling: https://github.com/kubernetes-sigs/scheduler-plugins/tree/release-1.24/pkg/coscheduling
+    2. Installing secondary schedulers: https://github.com/kubernetes-sigs/scheduler-plugins/blob/release-1.24/doc/install.md
+    3. PodGroup CRD: https://github.com/kubernetes-sigs/scheduler-plugins/blob/release-1.24/config/crd/bases/scheduling.sigs.k8s.io_podgroups.yaml
+
     **Config Options**
 
     .. runopts::
@@ -861,9 +913,20 @@ def _submit_dryrun(
         namespace = cfg.get("namespace")
         assert isinstance(namespace, str), "namespace must be a str"
 
+        coscheduler_name = cfg.get("coscheduler_name")
+        assert coscheduler_name is None or isinstance(
+            coscheduler_name, str
+        ), "coscheduler_name must be a string"
+
         resource = app_to_resource(
-            app, namespace, service_account, image_secret, priority
+            app=app,
+            namespace=namespace,
+            service_account=service_account,
+            image_secret=image_secret,
+            coscheduler_name=coscheduler_name,
+            priority=priority,
         )
+
         req = KubernetesMCADJob(
             resource=resource,
             images_to_push=images_to_push,
@@ -917,6 +980,11 @@ def run_opts(self) -> runopts:
             type_=str,
             help="The name of the Kubernetes/OpenShift secret set up for private images",
         )
+        opts.add(
+            "coscheduler_name",
+            type_=str,
+            help="Option to run TorchX-MCAD with a coscheduler. The user must provide the coscheduler name.",
+        )
         return opts
 
     def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
@@ -1070,12 +1138,21 @@ def create_scheduler(session_name: str, **kwargs: Any) -> KubernetesMCADSchedule
 
 # TODO update to Kubernetes standard labels (https://kubernetes.io/docs/concepts/overview/working-with-objects/common-labels/)
 def pod_labels(
-    app: AppDef, role_idx: int, role: Role, replica_id: int
+    app: AppDef,
+    role_idx: int,
+    role: Role,
+    replica_id: int,
+    coscheduler_name: Optional[str],
+    app_id: str,
 ) -> Dict[str, str]:
-    return {
+    labels = {
         LABEL_VERSION: torchx.__version__,
         LABEL_APP_NAME: app.name,
         LABEL_ROLE_INDEX: str(role_idx),
         LABEL_ROLE_NAME: role.name,
         LABEL_REPLICA_ID: str(replica_id),
     }
+    if coscheduler_name is not None:
+        pod_group = app_id + "-" + cleanup_str(role.name) + "-pg"
+        labels.update({"pod-group.scheduling.sigs.k8s.io": pod_group})
+    return labels
diff --git a/torchx/schedulers/test/kubernetes_mcad_scheduler_test.py b/torchx/schedulers/test/kubernetes_mcad_scheduler_test.py
index a3b8b036a..f42bfc938 100644
--- a/torchx/schedulers/test/kubernetes_mcad_scheduler_test.py
+++ b/torchx/schedulers/test/kubernetes_mcad_scheduler_test.py
@@ -22,6 +22,7 @@
 from torchx.schedulers.kubernetes_mcad_scheduler import (
     app_to_resource,
     cleanup_str,
+    create_pod_group,
     create_scheduler,
     get_appwrapper_status,
     get_port_for_service,
@@ -170,7 +171,12 @@ def test_app_to_resource_resolved_macros(self) -> None:
         ) as make_unique_ctx:
             make_unique_ctx.return_value = unique_app_name
             resource = app_to_resource(
-                app, "default", service_account=None, image_secret=None, priority=0
+                app,
+                "default",
+                service_account=None,
+                image_secret=None,
+                coscheduler_name=None,
+                priority=0,
             )
             actual_cmd = (
                 resource["spec"]["resources"]["GenericItems"][0]["generictemplate"]
@@ -191,7 +197,12 @@ def test_app_to_resource_resolved_macros(self) -> None:
     def test_retry_policy_not_set(self) -> None:
         app = _test_app()
         resource = app_to_resource(
-            app, "default", service_account=None, image_secret=None, priority=0
+            app,
+            "default",
+            service_account=None,
+            image_secret=None,
+            coscheduler_name=None,
+            priority=0,
         )
         item0 = resource["spec"]["resources"]["GenericItems"][0]
         self.assertListEqual(
@@ -204,7 +215,12 @@ def test_retry_policy_not_set(self) -> None:
         for role in app.roles:
             role.max_retries = 0
         resource = app_to_resource(
-            app, "default", service_account=None, image_secret=None, priority=0
+            app,
+            "default",
+            service_account=None,
+            image_secret=None,
+            coscheduler_name=None,
+            priority=0,
         )
         item0 = resource["spec"]["resources"]["GenericItems"][0]
         self.assertFalse("policies" in item0)
@@ -230,6 +246,7 @@ def test_role_to_pod(self) -> None:
         app = _test_app()
         unique_app_name = "app-name"
         image_secret = "secret-name"
+        coscheduler_name = "test-co-scheduler-name"
         pod = role_to_pod(
             "app-name-0",
             unique_app_name,
@@ -237,6 +254,7 @@ def test_role_to_pod(self) -> None:
             app.roles[0],
             service_account="srvacc",
             image_secret=image_secret,
+            coscheduler_name=coscheduler_name,
         )
 
         imagesecret = V1LocalObjectReference(name=image_secret)
@@ -311,6 +329,7 @@ def test_role_to_pod(self) -> None:
                     ),
                 ),
             ],
+            scheduler_name=coscheduler_name,
             node_selector={},
         ),
         metadata=V1ObjectMeta(
@@ -328,6 +347,36 @@ def test_role_to_pod(self) -> None:
             want,
         )
 
+    def test_create_pod_group(self) -> None:
+        app = _test_app()
+        unique_app_name = "app-name"
+        namespace = "default"
+        podgroup = create_pod_group(app.roles[0], namespace, unique_app_name)
+
+        pod_group_name = unique_app_name + "-" + cleanup_str(app.roles[0].name) + "-pg"
+        expected_pod_group: Dict[str, Any] = {
+            "apiVersion": "scheduling.sigs.k8s.io/v1alpha1",
+            "kind": "PodGroup",
+            "metadata": {
+                "name": pod_group_name,
+                "namespace": namespace,
+                "labels": {
+                    "appwrapper.mcad.ibm.com": unique_app_name,
+                },
+            },
+            "spec": {
+                "minMember": 1,
+            },
+        }
+        want: Dict[str, Any] = {
+            "replicas": 1,
+            "generictemplate": expected_pod_group,
+        }
+        self.assertEqual(
+            podgroup,
+            want,
+        )
+
     def test_create_mcad_service(self) -> None:
         from kubernetes.client.models import (  # noqa: F401 imported but unused
             V1Container,
@@ -435,7 +484,13 @@ def test_get_tasks_status_description(self) -> None:
     def test_submit_dryrun(self) -> None:
         scheduler = create_scheduler("test")
         app = _test_app()
-        cfg = KubernetesMCADOpts({"priority": 0, "namespace": "test_namespace"})
+        cfg = KubernetesMCADOpts(
+            {
+                "priority": 0,
+                "namespace": "test_namespace",
+                "coscheduler_name": "test_coscheduler",
+            }
+        )
         with patch(
             "torchx.schedulers.kubernetes_mcad_scheduler.make_unique"
         ) as make_unique_ctx:
@@ -454,6 +509,17 @@ def test_submit_dryrun(self) -> None:
   priority: 0
   resources:
     GenericItems:
+    - generictemplate:
+        apiVersion: scheduling.sigs.k8s.io/v1alpha1
+        kind: PodGroup
+        metadata:
+          labels:
+            appwrapper.mcad.ibm.com: app-name
+          name: app-name-trainerfoo-pg
+          namespace: test_namespace
+        spec:
+          minMember: 1
+      replicas: 1
     - generictemplate:
         apiVersion: v1
         kind: Pod
@@ -461,6 +527,7 @@ def test_submit_dryrun(self) -> None:
           annotations:
             sidecar.istio.io/inject: 'false'
           labels:
+            pod-group.scheduling.sigs.k8s.io: app-name-trainerfoo-pg
             torchx.pytorch.org/app-name: test
             torchx.pytorch.org/replica-id: '0'
             torchx.pytorch.org/role-index: '0'
@@ -511,6 +578,7 @@ def test_submit_dryrun(self) -> None:
           - {{}}
           nodeSelector: {{}}
           restartPolicy: Never
+          schedulerName: test_coscheduler
           subdomain: app-name
           volumes:
           - emptyDir:
@@ -553,7 +621,6 @@ def test_submit_dryrun(self) -> None:
     def test_get_role_information(
         self, get_namespaced_custom_object: MagicMock
     ) -> None:
-
         get_namespaced_custom_object.return_value = _test_mcad_generic_item()
 
         spec = get_namespaced_custom_object.return_value["spec"]
@@ -591,7 +658,6 @@ def test_get_role_information(
     def test_get_role_information_no_generic_items(
         self, get_namespaced_custom_object: MagicMock
     ) -> None:
-
         test_generic_item = _test_mcad_generic_item()
         test_generic_item["spec"]["resources"]["GenericItems"][0] = {}
         get_namespaced_custom_object.return_value = test_generic_item
@@ -610,7 +676,6 @@ def test_get_role_information_no_generic_items(
     def test_get_role_information_no_metadata(
         self, get_namespaced_custom_object: MagicMock
     ) -> None:
-
         test_generic_item = _test_mcad_generic_item()
         test_generic_item["spec"]["resources"]["GenericItems"][0]["generictemplate"] = {
             "spec": {
@@ -663,7 +728,6 @@ def test_get_role_information_no_metadata(
     def test_get_role_information_no_label(
         self, get_namespaced_custom_object: MagicMock
     ) -> None:
-
         test_generic_item = _test_mcad_generic_item()
         test_generic_item["spec"]["resources"]["GenericItems"][0]["generictemplate"][
             "metadata"
@@ -683,7 +747,6 @@ def test_get_role_information_no_label(
     def test_get_role_information_no_role_name(
         self, get_namespaced_custom_object: MagicMock
     ) -> None:
-
         test_generic_item = _test_mcad_generic_item()
         test_generic_item["spec"]["resources"]["GenericItems"][0]["generictemplate"][
             "metadata"
@@ -703,7 +766,6 @@ def test_get_role_information_no_role_name(
     def test_get_role_information_no_specs(
         self, get_namespaced_custom_object: MagicMock
     ) -> None:
-
         test_generic_item = _test_mcad_generic_item()
         test_generic_item["spec"]["resources"]["GenericItems"][0] = {
             "generictemplate": {
@@ -748,7 +810,6 @@ def test_get_role_information_no_specs(
     def test_get_role_information_no_image_name(
         self, get_namespaced_custom_object: MagicMock
     ) -> None:
-
         test_generic_item = _test_mcad_generic_item()
         test_generic_item["spec"]["resources"]["GenericItems"][0]["generictemplate"][
             "spec"
@@ -812,7 +873,6 @@ def test_get_role_information_no_image_name(
     def test_get_role_information_no_resources(
         self, get_namespaced_custom_object: MagicMock
     ) -> None:
-
         test_generic_item = _test_mcad_generic_item()
         test_generic_item["spec"]["resources"]["GenericItems"][0]["generictemplate"][
             "spec"
@@ -869,7 +929,6 @@ def test_get_role_information_no_resources(
     def test_get_role_information_no_cpu(
         self, get_namespaced_custom_object: MagicMock
     ) -> None:
-
         test_generic_item = _test_mcad_generic_item()
         test_generic_item["spec"]["resources"]["GenericItems"][0]["generictemplate"][
             "spec"
@@ -937,7 +996,6 @@ def test_get_role_information_no_cpu(
     def test_get_role_information_no_memory(
         self, get_namespaced_custom_object: MagicMock
     ) -> None:
-
         test_generic_item = _test_mcad_generic_item()
         test_generic_item["spec"]["resources"]["GenericItems"][0]["generictemplate"][
             "spec"
@@ -1005,7 +1063,6 @@ def test_get_role_information_no_memory(
     def test_get_role_information_no_ports(
         self, get_namespaced_custom_object: MagicMock
     ) -> None:
-
         test_generic_item = _test_mcad_generic_item()
         test_generic_item["spec"]["resources"]["GenericItems"][0]["generictemplate"][
             "spec"
@@ -1073,7 +1130,6 @@ def test_get_role_information_no_ports(
     def test_get_role_information_no_volume_mounts(
         self, get_namespaced_custom_object: MagicMock
     ) -> None:
-
         test_generic_item = _test_mcad_generic_item()
         test_generic_item["spec"]["resources"]["GenericItems"][0]["generictemplate"][
             "spec"
@@ -1160,6 +1216,7 @@ def test_volume_mounts(self) -> None:
             role,
             service_account="",
             image_secret="",
+            coscheduler_name="",
         )
self.assertEqual( pod.spec.volumes, @@ -1216,6 +1273,7 @@ def test_device_mounts(self) -> None: role, service_account="", image_secret="", + coscheduler_name="", ) self.assertEqual( pod.spec.volumes[1:], @@ -1273,6 +1331,7 @@ def test_resource_devices(self) -> None: role, service_account="", image_secret="", + coscheduler_name="", ) self.assertEqual( pod.spec.containers[0].resources.limits, @@ -1307,6 +1366,7 @@ def test_instance_type(self) -> None: role, service_account="", image_secret="", + coscheduler_name="", ) self.assertEqual( pod.spec.node_selector, @@ -1465,7 +1525,6 @@ def test_submit_job_name_conflict( @patch("kubernetes.client.CustomObjectsApi.get_namespaced_custom_object") def test_describe(self, get_namespaced_custom_object: MagicMock) -> None: - get_namespaced_custom_object.return_value = { "status": { "state": "Running", @@ -1529,7 +1588,6 @@ def test_describe(self, get_namespaced_custom_object: MagicMock) -> None: def test_describe_pending_dispatch( self, get_namespaced_custom_object: MagicMock ) -> None: - get_namespaced_custom_object.return_value = { "status": { "state": "Pending", @@ -1660,6 +1718,7 @@ def test_runopts(self) -> None: "service_account", "priority", "image_secret", + "coscheduler_name", }, )
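
Example (not part of the patch): a minimal sketch of how the new pieces fit together. `create_pod_group()` emits one PodGroup generic item per role, and `pod_labels()` stamps each replica pod with the matching `pod-group.scheduling.sigs.k8s.io` label so the named coscheduler can gang-schedule the replicas. It assumes this patch is applied; the coscheduler name and app id are illustrative values only.

```python
# Sketch: PodGroup generic item + matching pod label (assumes this patch).
from torchx.schedulers.kubernetes_mcad_scheduler import create_pod_group, pod_labels
from torchx.specs import AppDef, Role

app = AppDef(
    name="test",
    roles=[Role(name="trainer", image="alpine:latest", num_replicas=2)],
)

# One PodGroup per role; minMember equals the role's replica count, so the
# coscheduler only places the pods once all replicas can be scheduled.
pg_item = create_pod_group(app.roles[0], namespace="default", app_id="app-name")
assert pg_item["generictemplate"]["metadata"]["name"] == "app-name-trainer-pg"
assert pg_item["generictemplate"]["spec"]["minMember"] == 2

# Each replica pod carries the label that points back at that PodGroup.
labels = pod_labels(
    app=app,
    role_idx=0,
    role=app.roles[0],
    replica_id=0,
    coscheduler_name="scheduler-plugins-scheduler",  # illustrative name
    app_id="app-name",
)
assert labels["pod-group.scheduling.sigs.k8s.io"] == "app-name-trainer-pg"
```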
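A hypothetical end-to-end dryrun sketch showing the new `coscheduler_name` option flowing through `KubernetesMCADOpts` into the AppWrapper resource. It assumes the `kubernetes` client package is installed; the session name, app definition, and coscheduler name are all illustrative.

```python
# Hypothetical dryrun: coscheduler_name adds PodGroup generic items and sets
# schedulerName on every pod template in the generated AppWrapper.
from torchx.schedulers.kubernetes_mcad_scheduler import (
    KubernetesMCADOpts,
    create_scheduler,
)
from torchx.specs import AppDef, Resource, Role

app = AppDef(
    name="test",
    roles=[
        Role(
            name="trainer",
            image="alpine:latest",
            entrypoint="echo",
            args=["hello"],
            num_replicas=2,
            resource=Resource(cpu=1, gpu=0, memMB=512),
        )
    ],
)
cfg = KubernetesMCADOpts(
    {
        "priority": 0,
        "namespace": "default",
        "coscheduler_name": "scheduler-plugins-scheduler",  # illustrative name
    }
)
scheduler = create_scheduler("example")
info = scheduler.submit_dryrun(app, cfg)
# GenericItems should start with one PodGroup per role, followed by the pods.
print(info.request.resource)
```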