diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 8a87bad48..1d6371db8 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -76,7 +76,7 @@ pytest -v src/codeflare_sdk ### Local e2e Testing -- Please follow the [e2e documentation](https://github.com/project-codeflare/codeflare-sdk/blob/main/docs/e2e.md) +- Please follow the [e2e documentation](https://github.com/project-codeflare/codeflare-sdk/blob/main/docs/sphinx/user-docs/e2e.rst) #### Code Coverage diff --git a/docs/sphinx/user-docs/ray-cluster-interaction.rst b/docs/sphinx/user-docs/ray-cluster-interaction.rst index 8e7929b4d..717f80678 100644 --- a/docs/sphinx/user-docs/ray-cluster-interaction.rst +++ b/docs/sphinx/user-docs/ray-cluster-interaction.rst @@ -66,6 +66,12 @@ cluster.up() | The ``cluster.up()`` function creates a Ray Cluster in the given namespace. +cluster.apply() +------------ + +| The ``cluster.apply()`` function applies a Ray Cluster in the given namespace. If the cluster already exists, it is updated. +| If it does not exist it is created. + cluster.down() -------------- diff --git a/src/codeflare_sdk/common/kueue/test_kueue.py b/src/codeflare_sdk/common/kueue/test_kueue.py index 0093058cd..bbc54e9e0 100644 --- a/src/codeflare_sdk/common/kueue/test_kueue.py +++ b/src/codeflare_sdk/common/kueue/test_kueue.py @@ -12,10 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. from ..utils.unit_test_support import ( - apply_template, get_local_queue, - createClusterConfig, + create_cluster_config, get_template_variables, + apply_template, ) from unittest.mock import patch from codeflare_sdk.ray.cluster.cluster import Cluster, ClusterConfiguration @@ -23,7 +23,7 @@ import os import filecmp from pathlib import Path -from .kueue import list_local_queues +from .kueue import list_local_queues, local_queue_exists, add_queue_label parent = Path(__file__).resolve().parents[4] # project directory aw_dir = os.path.expanduser("~/.codeflare/resources/") @@ -51,7 +51,7 @@ def test_cluster_creation_no_aw_local_queue(mocker): "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"), ) - config = createClusterConfig() + config = create_cluster_config() config.name = "unit-test-cluster-kueue" config.write_to_file = True config.local_queue = "local-queue-default" @@ -67,7 +67,7 @@ def test_cluster_creation_no_aw_local_queue(mocker): assert cluster_kueue == expected_rc # With resources loaded in memory, no Local Queue specified. - config = createClusterConfig() + config = create_cluster_config() config.name = "unit-test-cluster-kueue" config.write_to_file = False cluster = Cluster(config) @@ -84,7 +84,7 @@ def test_aw_creation_local_queue(mocker): "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"), ) - config = createClusterConfig() + config = create_cluster_config() config.name = "unit-test-aw-kueue" config.appwrapper = True config.write_to_file = True @@ -101,7 +101,7 @@ def test_aw_creation_local_queue(mocker): assert aw_kueue == expected_rc # With resources loaded in memory, no Local Queue specified. - config = createClusterConfig() + config = create_cluster_config() config.name = "unit-test-aw-kueue" config.appwrapper = True config.write_to_file = False @@ -120,7 +120,7 @@ def test_get_local_queue_exists_fail(mocker): "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"), ) - config = createClusterConfig() + config = create_cluster_config() config.name = "unit-test-aw-kueue" config.appwrapper = True config.write_to_file = True @@ -175,6 +175,123 @@ def test_list_local_queues(mocker): assert lqs == [] +def test_local_queue_exists_found(mocker): + # Mock Kubernetes client and list_namespaced_custom_object method + mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") + mock_api_instance = mocker.Mock() + mocker.patch("kubernetes.client.CustomObjectsApi", return_value=mock_api_instance) + mocker.patch("codeflare_sdk.ray.cluster.cluster.config_check") + + # Mock return value for list_namespaced_custom_object + mock_api_instance.list_namespaced_custom_object.return_value = { + "items": [ + {"metadata": {"name": "existing-queue"}}, + {"metadata": {"name": "another-queue"}}, + ] + } + + # Call the function + namespace = "test-namespace" + local_queue_name = "existing-queue" + result = local_queue_exists(namespace, local_queue_name) + + # Assertions + assert result is True + mock_api_instance.list_namespaced_custom_object.assert_called_once_with( + group="kueue.x-k8s.io", + version="v1beta1", + namespace=namespace, + plural="localqueues", + ) + + +def test_local_queue_exists_not_found(mocker): + # Mock Kubernetes client and list_namespaced_custom_object method + mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") + mock_api_instance = mocker.Mock() + mocker.patch("kubernetes.client.CustomObjectsApi", return_value=mock_api_instance) + mocker.patch("codeflare_sdk.ray.cluster.cluster.config_check") + + # Mock return value for list_namespaced_custom_object + mock_api_instance.list_namespaced_custom_object.return_value = { + "items": [ + {"metadata": {"name": "another-queue"}}, + {"metadata": {"name": "different-queue"}}, + ] + } + + # Call the function + namespace = "test-namespace" + local_queue_name = "non-existent-queue" + result = local_queue_exists(namespace, local_queue_name) + + # Assertions + assert result is False + mock_api_instance.list_namespaced_custom_object.assert_called_once_with( + group="kueue.x-k8s.io", + version="v1beta1", + namespace=namespace, + plural="localqueues", + ) + + +import pytest +from unittest import mock # If you're also using mocker from pytest-mock + + +def test_add_queue_label_with_valid_local_queue(mocker): + # Mock the kubernetes.client.CustomObjectsApi and its response + mock_api_instance = mocker.patch("kubernetes.client.CustomObjectsApi") + mock_api_instance.return_value.list_namespaced_custom_object.return_value = { + "items": [ + {"metadata": {"name": "valid-queue"}}, + ] + } + + # Mock other dependencies + mocker.patch("codeflare_sdk.common.kueue.local_queue_exists", return_value=True) + mocker.patch( + "codeflare_sdk.common.kueue.get_default_kueue_name", + return_value="default-queue", + ) + + # Define input item and parameters + item = {"metadata": {}} + namespace = "test-namespace" + local_queue = "valid-queue" + + # Call the function + add_queue_label(item, namespace, local_queue) + + # Assert that the label is added to the item + assert item["metadata"]["labels"] == {"kueue.x-k8s.io/queue-name": "valid-queue"} + + +def test_add_queue_label_with_invalid_local_queue(mocker): + # Mock the kubernetes.client.CustomObjectsApi and its response + mock_api_instance = mocker.patch("kubernetes.client.CustomObjectsApi") + mock_api_instance.return_value.list_namespaced_custom_object.return_value = { + "items": [ + {"metadata": {"name": "valid-queue"}}, + ] + } + + # Mock the local_queue_exists function to return False + mocker.patch("codeflare_sdk.common.kueue.local_queue_exists", return_value=False) + + # Define input item and parameters + item = {"metadata": {}} + namespace = "test-namespace" + local_queue = "invalid-queue" + + # Call the function and expect a ValueError + with pytest.raises( + ValueError, + match="local_queue provided does not exist or is not in this namespace", + ): + add_queue_label(item, namespace, local_queue) + + # Make sure to always keep this function last def test_cleanup(): os.remove(f"{aw_dir}unit-test-cluster-kueue.yaml") diff --git a/src/codeflare_sdk/common/utils/unit_test_support.py b/src/codeflare_sdk/common/utils/unit_test_support.py index 373283b82..28a303813 100644 --- a/src/codeflare_sdk/common/utils/unit_test_support.py +++ b/src/codeflare_sdk/common/utils/unit_test_support.py @@ -29,32 +29,34 @@ aw_dir = os.path.expanduser("~/.codeflare/resources/") -def createClusterConfig(): +def create_cluster_config(num_workers=2, write_to_file=False): config = ClusterConfiguration( name="unit-test-cluster", namespace="ns", - num_workers=2, + num_workers=num_workers, worker_cpu_requests=3, worker_cpu_limits=4, worker_memory_requests=5, worker_memory_limits=6, appwrapper=True, - write_to_file=False, + write_to_file=write_to_file, ) return config -def createClusterWithConfig(mocker): - mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") - mocker.patch( - "kubernetes.client.CustomObjectsApi.get_cluster_custom_object", - return_value={"spec": {"domain": "apps.cluster.awsroute.org"}}, - ) - cluster = Cluster(createClusterConfig()) +def create_cluster(mocker, num_workers=2, write_to_file=False): + cluster = Cluster(create_cluster_config(num_workers, write_to_file)) return cluster -def createClusterWrongType(): +def patch_cluster_with_dynamic_client(mocker, cluster, dynamic_client=None): + mocker.patch.object(cluster, "get_dynamic_client", return_value=dynamic_client) + mocker.patch.object(cluster, "down", return_value=None) + mocker.patch.object(cluster, "config_check", return_value=None) + # mocker.patch.object(cluster, "_throw_for_no_raycluster", return_value=None) + + +def create_cluster_wrong_type(): config = ClusterConfiguration( name="unit-test-cluster", namespace="ns", @@ -412,6 +414,48 @@ def mocked_ingress(port, cluster_name="unit-test-cluster", annotations: dict = N return mock_ingress +# Global dictionary to maintain state in the mock +cluster_state = {} + + +# The mock side_effect function for server_side_apply +def mock_server_side_apply(resource, body=None, name=None, namespace=None, **kwargs): + # Simulate the behavior of server_side_apply: + # Update a mock state that represents the cluster's current configuration. + # Stores the state in a global dictionary for simplicity. + + global cluster_state + + if not resource or not body or not name or not namespace: + raise ValueError("Missing required parameters for server_side_apply") + + # Extract worker count from the body if it exists + try: + worker_count = ( + body["spec"]["workerGroupSpecs"][0]["replicas"] + if "spec" in body and "workerGroupSpecs" in body["spec"] + else None + ) + except KeyError: + worker_count = None + + # Apply changes to the cluster_state mock + cluster_state[name] = { + "namespace": namespace, + "worker_count": worker_count, + "body": body, + } + + # Return a response that mimics the behavior of a successful apply + return { + "status": "success", + "applied": True, + "name": name, + "namespace": namespace, + "worker_count": worker_count, + } + + @patch.dict("os.environ", {"NB_PREFIX": "test-prefix"}) def create_cluster_all_config_params(mocker, cluster_name, is_appwrapper) -> Cluster: mocker.patch( diff --git a/src/codeflare_sdk/common/widgets/test_widgets.py b/src/codeflare_sdk/common/widgets/test_widgets.py index 12c238544..a7d3de923 100644 --- a/src/codeflare_sdk/common/widgets/test_widgets.py +++ b/src/codeflare_sdk/common/widgets/test_widgets.py @@ -15,7 +15,7 @@ import codeflare_sdk.common.widgets.widgets as cf_widgets import pandas as pd from unittest.mock import MagicMock, patch -from ..utils.unit_test_support import get_local_queue, createClusterConfig +from ..utils.unit_test_support import get_local_queue, create_cluster_config from codeflare_sdk.ray.cluster.cluster import Cluster from codeflare_sdk.ray.cluster.status import ( RayCluster, @@ -38,7 +38,7 @@ def test_cluster_up_down_buttons(mocker): "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"), ) - cluster = Cluster(createClusterConfig()) + cluster = Cluster(create_cluster_config()) with patch("ipywidgets.Button") as MockButton, patch( "ipywidgets.Checkbox" diff --git a/src/codeflare_sdk/ray/cluster/cluster.py b/src/codeflare_sdk/ray/cluster/cluster.py index a3f345547..b42efd87e 100644 --- a/src/codeflare_sdk/ray/cluster/cluster.py +++ b/src/codeflare_sdk/ray/cluster/cluster.py @@ -52,9 +52,15 @@ import requests from kubernetes import config +from kubernetes.dynamic import DynamicClient +from kubernetes import client as k8s_client +from kubernetes.client.rest import ApiException + from kubernetes.client.rest import ApiException import warnings +CF_SDK_FIELD_MANAGER = "codeflare-sdk" + class Cluster: """ @@ -84,6 +90,12 @@ def __init__(self, config: ClusterConfiguration): if is_notebook(): cluster_up_down_buttons(self) + def get_dynamic_client(self): # pragma: no cover + return DynamicClient(get_api_client()) + + def config_check(self): + return config_check() + @property def _client_headers(self): k8_client = get_api_client() @@ -95,9 +107,7 @@ def _client_headers(self): @property def _client_verify_tls(self): - if not _is_openshift_cluster or not self.config.verify_tls: - return False - return True + return _is_openshift_cluster and self.config.verify_tls @property def job_client(self): @@ -121,7 +131,6 @@ def create_resource(self): Called upon cluster object creation, creates an AppWrapper yaml based on the specifications of the ClusterConfiguration. """ - if self.config.namespace is None: self.config.namespace = get_current_namespace() if self.config.namespace is None: @@ -130,7 +139,6 @@ def create_resource(self): raise TypeError( f"Namespace {self.config.namespace} is of type {type(self.config.namespace)}. Check your Kubernetes Authentication." ) - return build_ray_cluster(self) # creates a new cluster with the provided or default spec @@ -139,10 +147,11 @@ def up(self): Applies the Cluster yaml, pushing the resource request onto the Kueue localqueue. """ - + print( + "WARNING: The up() function is planned for deprecation in favor of apply()." + ) # check if RayCluster CustomResourceDefinition exists if not throw RuntimeError self._throw_for_no_raycluster() - namespace = self.config.namespace try: @@ -176,6 +185,54 @@ def up(self): except Exception as e: # pragma: no cover return _kube_api_error_handling(e) + # Applies a new cluster with the provided or default spec + def apply(self, force=False): + """ + Applies the Cluster yaml using server-side apply. + If 'force' is set to True, conflicts will be forced. + """ + # check if RayCluster CustomResourceDefinition exists if not throw RuntimeError + self._throw_for_no_raycluster() + namespace = self.config.namespace + name = self.config.name + try: + self.config_check() + api_instance = client.CustomObjectsApi(get_api_client()) + crds = self.get_dynamic_client().resources + if self.config.appwrapper: + api_version = "workload.codeflare.dev/v1beta2" + api_instance = crds.get(api_version=api_version, kind="AppWrapper") + # defaulting body to resource_yaml + body = self.resource_yaml + if self.config.write_to_file: + # if write_to_file is True, load the file from AppWrapper yaml and update body + with open(self.resource_yaml) as f: + aw = yaml.load(f, Loader=yaml.FullLoader) + body = aw + api_instance.server_side_apply( + field_manager=CF_SDK_FIELD_MANAGER, + group="workload.codeflare.dev", + version="v1beta2", + namespace=namespace, + plural="appwrappers", + body=body, + force_conflicts=force, + ) + print( + f"AppWrapper: '{name}' configuration has successfully been applied" + ) + else: + api_version = "ray.io/v1" + api_instance = crds.get(api_version=api_version, kind="RayCluster") + self._component_resources_apply( + namespace=namespace, api_instance=api_instance + ) + print(f"Ray Cluster: '{name}' has successfully been applied") + except AttributeError as e: + raise RuntimeError(f"Failed to initialize DynamicClient: {e}") + except Exception as e: # pragma: no cover + return _kube_api_error_handling(e) + def _throw_for_no_raycluster(self): api_instance = client.CustomObjectsApi(get_api_client()) try: @@ -204,7 +261,7 @@ def down(self): resource_name = self.config.name self._throw_for_no_raycluster() try: - config_check() + self.config_check() api_instance = client.CustomObjectsApi(get_api_client()) if self.config.appwrapper: api_instance.delete_namespaced_custom_object( @@ -507,6 +564,16 @@ def _component_resources_up( else: _create_resources(self.resource_yaml, namespace, api_instance) + def _component_resources_apply( + self, namespace: str, api_instance: client.CustomObjectsApi + ): + if self.config.write_to_file: + with open(self.resource_yaml) as f: + ray_cluster = yaml.safe_load(f) + _apply_ray_cluster(ray_cluster, namespace, api_instance) + else: + _apply_ray_cluster(self.resource_yaml, namespace, api_instance) + def _component_resources_down( self, namespace: str, api_instance: client.CustomObjectsApi ): @@ -744,6 +811,20 @@ def _create_resources(yamls, namespace: str, api_instance: client.CustomObjectsA ) +def _apply_ray_cluster( + yamls, namespace: str, api_instance: client.CustomObjectsApi, force=False +): + api_instance.server_side_apply( + field_manager=CF_SDK_FIELD_MANAGER, + group="ray.io", + version="v1", + namespace=namespace, + plural="rayclusters", + body=yamls, + force_conflicts=force, # Allow forcing conflicts if needed + ) + + def _check_aw_exists(name: str, namespace: str) -> bool: try: config_check() diff --git a/src/codeflare_sdk/ray/cluster/test_cluster.py b/src/codeflare_sdk/ray/cluster/test_cluster.py index 5e83c82a8..298c416ed 100644 --- a/src/codeflare_sdk/ray/cluster/test_cluster.py +++ b/src/codeflare_sdk/ray/cluster/test_cluster.py @@ -19,16 +19,18 @@ list_all_queued, ) from codeflare_sdk.common.utils.unit_test_support import ( - createClusterWithConfig, + create_cluster, arg_check_del_effect, ingress_retrieval, arg_check_apply_effect, get_local_queue, - createClusterConfig, + create_cluster_config, get_ray_obj, get_obj_none, get_ray_obj_with_status, get_aw_obj_with_status, + patch_cluster_with_dynamic_client, + route_list_retrieval, ) from codeflare_sdk.ray.cluster.cluster import _is_openshift_cluster from pathlib import Path @@ -67,11 +69,189 @@ def test_cluster_up_down(mocker): "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"), ) - cluster = cluster = createClusterWithConfig(mocker) + cluster = create_cluster(mocker) cluster.up() cluster.down() +def test_cluster_apply_scale_up_scale_down(mocker): + mocker.patch("kubernetes.client.ApisApi.get_api_versions") + mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") + mock_dynamic_client = mocker.Mock() + mocker.patch( + "kubernetes.dynamic.DynamicClient.resources", new_callable=mocker.PropertyMock + ) + mocker.patch( + "codeflare_sdk.ray.cluster.cluster.Cluster.create_resource", + return_value="./tests/test_cluster_yamls/ray/default-ray-cluster.yaml", + ) + mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") + mocker.patch( + "kubernetes.client.CustomObjectsApi.get_cluster_custom_object", + return_value={"spec": {"domain": "apps.cluster.awsroute.org"}}, + ) + + # Initialize test + initial_num_workers = 1 + scaled_up_num_workers = 2 + + # Step 1: Create cluster with initial workers + cluster = create_cluster(mocker, initial_num_workers) + patch_cluster_with_dynamic_client(mocker, cluster, mock_dynamic_client) + mocker.patch( + "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", + return_value=get_obj_none("ray.io", "v1", "ns", "rayclusters"), + ) + cluster.apply() + + # Step 2: Scale up the cluster + cluster = create_cluster(mocker, scaled_up_num_workers) + patch_cluster_with_dynamic_client(mocker, cluster, mock_dynamic_client) + cluster.apply() + + # Step 3: Scale down the cluster + cluster = create_cluster(mocker, initial_num_workers) + patch_cluster_with_dynamic_client(mocker, cluster, mock_dynamic_client) + cluster.apply() + + # Tear down + cluster.down() + + +def test_cluster_apply_with_file(mocker): + mocker.patch("kubernetes.client.ApisApi.get_api_versions") + mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") + mock_dynamic_client = mocker.Mock() + mocker.patch("codeflare_sdk.ray.cluster.cluster.Cluster._throw_for_no_raycluster") + mocker.patch( + "kubernetes.dynamic.DynamicClient.resources", new_callable=mocker.PropertyMock + ) + mocker.patch( + "codeflare_sdk.ray.cluster.cluster.Cluster.create_resource", + return_value="./tests/test_cluster_yamls/ray/default-ray-cluster.yaml", + ) + mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") + mocker.patch( + "kubernetes.client.CustomObjectsApi.get_cluster_custom_object", + return_value={"spec": {"domain": "apps.cluster.awsroute.org"}}, + ) + + # Step 1: Create cluster with initial workers + cluster = create_cluster(mocker, 1, write_to_file=True) + patch_cluster_with_dynamic_client(mocker, cluster, mock_dynamic_client) + mocker.patch( + "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", + return_value=get_obj_none("ray.io", "v1", "ns", "rayclusters"), + ) + cluster.apply() + # Tear down + cluster.down() + + +def test_cluster_apply_with_appwrapper(mocker): + # Mock Kubernetes client and dynamic client methods + mocker.patch("kubernetes.client.ApisApi.get_api_versions") + mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") + mocker.patch( + "codeflare_sdk.ray.cluster.cluster._check_aw_exists", + return_value=True, + ) + mock_dynamic_client = mocker.Mock() + mocker.patch("codeflare_sdk.ray.cluster.cluster.Cluster._throw_for_no_raycluster") + mocker.patch( + "kubernetes.dynamic.DynamicClient.resources", new_callable=mocker.PropertyMock + ) + mocker.patch( + "codeflare_sdk.ray.cluster.cluster.Cluster.create_resource", + return_value="./tests/test_cluster_yamls/ray/default-ray-cluster.yaml", + ) + mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") + + # Create a cluster configuration with appwrapper set to False + cluster = create_cluster(mocker, 1, write_to_file=False) + patch_cluster_with_dynamic_client(mocker, cluster, mock_dynamic_client) + + # Mock listing RayCluster to simulate it doesn't exist + mocker.patch( + "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", + return_value=get_obj_none("ray.io", "v1", "ns", "rayclusters"), + ) + # Call the apply method + cluster.apply() + + # Assertions + print("Cluster applied without AppWrapper.") + + +def test_cluster_apply_without_appwrapper_write_to_file(mocker): + # Mock Kubernetes client and dynamic client methods + mocker.patch("kubernetes.client.ApisApi.get_api_versions") + mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") + mocker.patch( + "codeflare_sdk.ray.cluster.cluster._check_aw_exists", + return_value=True, + ) + mock_dynamic_client = mocker.Mock() + mocker.patch("codeflare_sdk.ray.cluster.cluster.Cluster._throw_for_no_raycluster") + mocker.patch( + "kubernetes.dynamic.DynamicClient.resources", new_callable=mocker.PropertyMock + ) + mocker.patch( + "codeflare_sdk.ray.cluster.cluster.Cluster.create_resource", + return_value="./tests/test_cluster_yamls/ray/default-ray-cluster.yaml", + ) + mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") + + # Create a cluster configuration with appwrapper set to False + cluster = create_cluster(mocker, 1, write_to_file=True) + patch_cluster_with_dynamic_client(mocker, cluster, mock_dynamic_client) + cluster.config.appwrapper = False + + # Mock listing RayCluster to simulate it doesn't exist + mocker.patch( + "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", + return_value=get_obj_none("ray.io", "v1", "ns", "rayclusters"), + ) + # Call the apply method + cluster.apply() + + # Assertions + print("Cluster applied without AppWrapper.") + + +def test_cluster_apply_without_appwrapper(mocker): + # Mock Kubernetes client and dynamic client methods + mocker.patch("kubernetes.client.ApisApi.get_api_versions") + mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") + mock_dynamic_client = mocker.Mock() + mocker.patch("codeflare_sdk.ray.cluster.cluster.Cluster._throw_for_no_raycluster") + mocker.patch( + "kubernetes.dynamic.DynamicClient.resources", new_callable=mocker.PropertyMock + ) + mocker.patch( + "codeflare_sdk.ray.cluster.cluster.Cluster.create_resource", + return_value="./tests/test_cluster_yamls/ray/default-ray-cluster.yaml", + ) + mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") + + # Create a cluster configuration with appwrapper set to False + cluster = create_cluster(mocker, 1, write_to_file=False) + cluster.config.appwrapper = None + patch_cluster_with_dynamic_client(mocker, cluster, mock_dynamic_client) + + # Mock listing RayCluster to simulate it doesn't exist + mocker.patch( + "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", + return_value=get_obj_none("ray.io", "v1", "ns", "rayclusters"), + ) + + # Call the apply method + cluster.apply() + + # Assertions + print("Cluster applied without AppWrapper.") + + def test_cluster_up_down_no_mcad(mocker): mocker.patch("codeflare_sdk.ray.cluster.cluster.Cluster._throw_for_no_raycluster") mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") @@ -98,7 +278,7 @@ def test_cluster_up_down_no_mcad(mocker): "kubernetes.client.CustomObjectsApi.list_cluster_custom_object", return_value={"items": []}, ) - config = createClusterConfig() + config = create_cluster_config() config.name = "unit-test-cluster-ray" config.appwrapper = False cluster = Cluster(config) @@ -117,7 +297,7 @@ def test_cluster_uris(mocker): "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"), ) - cluster = cluster = createClusterWithConfig(mocker) + cluster = create_cluster(mocker) mocker.patch( "kubernetes.client.NetworkingV1Api.list_namespaced_ingress", return_value=ingress_retrieval( @@ -147,6 +327,52 @@ def test_cluster_uris(mocker): == "Dashboard not available yet, have you run cluster.up()?" ) + mocker.patch( + "codeflare_sdk.ray.cluster.cluster._is_openshift_cluster", return_value=True + ) + mocker.patch( + "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", + return_value={ + "items": [ + { + "metadata": { + "name": "ray-dashboard-unit-test-cluster", + }, + "spec": { + "host": "ray-dashboard-unit-test-cluster-ns.apps.cluster.awsroute.org", + "tls": {}, # Indicating HTTPS + }, + } + ] + }, + ) + cluster = create_cluster(mocker) + assert ( + cluster.cluster_dashboard_uri() + == "http://ray-dashboard-unit-test-cluster-ns.apps.cluster.awsroute.org" + ) + mocker.patch( + "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", + return_value={ + "items": [ + { + "metadata": { + "name": "ray-dashboard-unit-test-cluster", + }, + "spec": { + "host": "ray-dashboard-unit-test-cluster-ns.apps.cluster.awsroute.org", + "tls": {"termination": "passthrough"}, # Indicating HTTPS + }, + } + ] + }, + ) + cluster = create_cluster(mocker) + assert ( + cluster.cluster_dashboard_uri() + == "https://ray-dashboard-unit-test-cluster-ns.apps.cluster.awsroute.org" + ) + def test_ray_job_wrapping(mocker): import ray @@ -159,7 +385,7 @@ def ray_addr(self, *args): "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"), ) - cluster = cluster = createClusterWithConfig(mocker) + cluster = create_cluster(mocker) mocker.patch( "ray.job_submission.JobSubmissionClient._check_connection_and_version_with_url", return_value="None", diff --git a/src/codeflare_sdk/ray/cluster/test_config.py b/src/codeflare_sdk/ray/cluster/test_config.py index 5302e0eb3..34cc4237b 100644 --- a/src/codeflare_sdk/ray/cluster/test_config.py +++ b/src/codeflare_sdk/ray/cluster/test_config.py @@ -14,8 +14,8 @@ from codeflare_sdk.common.utils.unit_test_support import ( apply_template, - createClusterWrongType, get_example_extended_storage_opts, + create_cluster_wrong_type, create_cluster_all_config_params, get_template_variables, ) @@ -59,6 +59,7 @@ def test_default_appwrapper_creation(mocker): assert cluster.resource_yaml == expected_aw +@pytest.mark.filterwarnings("ignore::UserWarning") def test_config_creation_all_parameters(mocker): from codeflare_sdk.ray.cluster.config import DEFAULT_RESOURCE_MAPPING @@ -110,6 +111,7 @@ def test_config_creation_all_parameters(mocker): ) +@pytest.mark.filterwarnings("ignore::UserWarning") def test_all_config_params_aw(mocker): create_cluster_all_config_params(mocker, "aw-all-params", True) assert filecmp.cmp( @@ -121,11 +123,12 @@ def test_all_config_params_aw(mocker): def test_config_creation_wrong_type(): with pytest.raises(TypeError) as error_info: - createClusterWrongType() + create_cluster_wrong_type() assert len(str(error_info.value).splitlines()) == 4 +@pytest.mark.filterwarnings("ignore::UserWarning") def test_cluster_config_deprecation_conversion(mocker): config = ClusterConfiguration( name="test", diff --git a/tests/e2e/cluster_apply_kind_test.py b/tests/e2e/cluster_apply_kind_test.py new file mode 100644 index 000000000..398bf73b1 --- /dev/null +++ b/tests/e2e/cluster_apply_kind_test.py @@ -0,0 +1,156 @@ +from codeflare_sdk import Cluster, ClusterConfiguration +import pytest +from kubernetes import client + +from support import ( + initialize_kubernetes_client, + create_namespace, + delete_namespace, + get_ray_cluster, +) + + +@pytest.mark.kind +class TestRayClusterApply: + def setup_method(self): + initialize_kubernetes_client(self) + + def teardown_method(self): + delete_namespace(self) + + def test_cluster_apply(self): + self.setup_method() + create_namespace(self) + + cluster_name = "test-cluster-apply" + namespace = self.namespace + + # Initial configuration with 1 worker + initial_config = ClusterConfiguration( + name=cluster_name, + namespace=namespace, + num_workers=1, + head_cpu_requests="500m", + head_cpu_limits="1", + head_memory_requests="1Gi", + head_memory_limits="2Gi", + worker_cpu_requests="500m", + worker_cpu_limits="1", + worker_memory_requests="1Gi", + worker_memory_limits="2Gi", + write_to_file=True, + verify_tls=False, + ) + + # Create the cluster + cluster = Cluster(initial_config) + cluster.apply() + + # Wait for the cluster to be ready + cluster.wait_ready() + status = cluster.status() + assert status["ready"], f"Cluster {cluster_name} is not ready: {status}" + + # Verify the cluster is created + ray_cluster = get_ray_cluster(cluster_name, namespace) + assert ray_cluster is not None, "Cluster was not created successfully" + assert ( + ray_cluster["spec"]["workerGroupSpecs"][0]["replicas"] == 1 + ), "Initial worker count does not match" + + # Update configuration with 3 workers + updated_config = ClusterConfiguration( + name=cluster_name, + namespace=namespace, + num_workers=2, + head_cpu_requests="500m", + head_cpu_limits="1", + head_memory_requests="1Gi", + head_memory_limits="2Gi", + worker_cpu_requests="500m", + worker_cpu_limits="1", + worker_memory_requests="1Gi", + worker_memory_limits="2Gi", + write_to_file=True, + verify_tls=False, + ) + + # Apply the updated configuration + cluster.config = updated_config + cluster.apply() + + # Wait for the updated cluster to be ready + cluster.wait_ready() + updated_status = cluster.status() + assert updated_status[ + "ready" + ], f"Cluster {cluster_name} is not ready after update: {updated_status}" + + # Verify the cluster is updated + updated_ray_cluster = get_ray_cluster(cluster_name, namespace) + assert ( + updated_ray_cluster["spec"]["workerGroupSpecs"][0]["replicas"] == 2 + ), "Worker count was not updated" + + # Clean up + cluster.down() + ray_cluster = get_ray_cluster(cluster_name, namespace) + assert ray_cluster is None, "Cluster was not deleted successfully" + + def test_apply_invalid_update(self): + self.setup_method() + create_namespace(self) + + cluster_name = "test-cluster-apply-invalid" + namespace = self.namespace + + # Initial configuration + initial_config = ClusterConfiguration( + name=cluster_name, + namespace=namespace, + num_workers=1, + head_cpu_requests="500m", + head_cpu_limits="1", + head_memory_requests="1Gi", + head_memory_limits="2Gi", + worker_cpu_requests="500m", + worker_cpu_limits="1", + worker_memory_requests="1Gi", + worker_memory_limits="2Gi", + write_to_file=True, + verify_tls=False, + ) + + # Create the cluster + cluster = Cluster(initial_config) + cluster.apply() + + # Wait for the cluster to be ready + cluster.wait_ready() + status = cluster.status() + assert status["ready"], f"Cluster {cluster_name} is not ready: {status}" + + # Update with an invalid configuration (e.g., immutable field change) + invalid_config = ClusterConfiguration( + name=cluster_name, + namespace=namespace, + num_workers=2, + head_cpu_requests="1", + head_cpu_limits="2", # Changing CPU limits (immutable) + head_memory_requests="1Gi", + head_memory_limits="2Gi", + worker_cpu_requests="500m", + worker_cpu_limits="1", + worker_memory_requests="1Gi", + worker_memory_limits="2Gi", + write_to_file=True, + verify_tls=False, + ) + + # Try to apply the invalid configuration and expect failure + cluster.config = invalid_config + with pytest.raises(RuntimeError, match="Immutable fields detected"): + cluster.apply() + + # Clean up + cluster.down() diff --git a/tests/e2e/support.py b/tests/e2e/support.py index 2ff33e911..5e4ddbdf2 100644 --- a/tests/e2e/support.py +++ b/tests/e2e/support.py @@ -11,6 +11,22 @@ ) +def get_ray_cluster(cluster_name, namespace): + api = client.CustomObjectsApi() + try: + return api.get_namespaced_custom_object( + group="ray.io", + version="v1", + namespace=namespace, + plural="rayclusters", + name=cluster_name, + ) + except client.exceptions.ApiException as e: + if e.status == 404: + return None + raise + + def get_ray_image(): default_ray_image = "quay.io/modh/ray@sha256:0d715f92570a2997381b7cafc0e224cfa25323f18b9545acfd23bc2b71576d06" return os.getenv("RAY_IMAGE", default_ray_image)