From 9d8295df837d90821831c426ae58e893728d1063 Mon Sep 17 00:00:00 2001 From: Akram Ben Aissi Date: Fri, 6 Dec 2024 06:57:39 +0100 Subject: [PATCH] Adds RayCluster.apply() - Adds RayCluster.apply() implementation - Adds e2e tests for apply - Adds unit tests for apply --- CONTRIBUTING.md | 2 +- src/codeflare_sdk/common/kueue/test_kueue.py | 12 +- .../common/utils/unit_test_support.py | 63 +++++-- .../common/widgets/test_widgets.py | 4 +- src/codeflare_sdk/ray/cluster/cluster.py | 60 ++++++- src/codeflare_sdk/ray/cluster/test_cluster.py | 52 +++++- src/codeflare_sdk/ray/cluster/test_config.py | 7 +- tests/e2e/cluster_apply_kind_test.py | 156 ++++++++++++++++++ tests/e2e/support.py | 16 ++ 9 files changed, 342 insertions(+), 30 deletions(-) create mode 100644 tests/e2e/cluster_apply_kind_test.py diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 884632da6..820ad9139 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -76,7 +76,7 @@ pytest -v src/codeflare_sdk ### Local e2e Testing -- Please follow the [e2e documentation](https://github.com/project-codeflare/codeflare-sdk/blob/main/docs/e2e.md) +- Please follow the [e2e documentation](https://github.com/project-codeflare/codeflare-sdk/blob/main/docs/sphinx/user-docs/e2e.rst) #### Code Coverage diff --git a/src/codeflare_sdk/common/kueue/test_kueue.py b/src/codeflare_sdk/common/kueue/test_kueue.py index 77095d4d9..19593dd17 100644 --- a/src/codeflare_sdk/common/kueue/test_kueue.py +++ b/src/codeflare_sdk/common/kueue/test_kueue.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-from ..utils.unit_test_support import get_local_queue, createClusterConfig +from ..utils.unit_test_support import get_local_queue, create_cluster_config from unittest.mock import patch from codeflare_sdk.ray.cluster.cluster import Cluster, ClusterConfiguration import yaml @@ -46,7 +46,7 @@ def test_cluster_creation_no_aw_local_queue(mocker): "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"), ) - config = createClusterConfig() + config = create_cluster_config() config.name = "unit-test-cluster-kueue" config.write_to_file = True config.local_queue = "local-queue-default" @@ -59,7 +59,7 @@ def test_cluster_creation_no_aw_local_queue(mocker): ) # With resources loaded in memory, no Local Queue specified. - config = createClusterConfig() + config = create_cluster_config() config.name = "unit-test-cluster-kueue" config.write_to_file = False cluster = Cluster(config) @@ -79,7 +79,7 @@ def test_aw_creation_local_queue(mocker): "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"), ) - config = createClusterConfig() + config = create_cluster_config() config.name = "unit-test-aw-kueue" config.appwrapper = True config.write_to_file = True @@ -93,7 +93,7 @@ def test_aw_creation_local_queue(mocker): ) # With resources loaded in memory, no Local Queue specified. 
- config = createClusterConfig() + config = create_cluster_config() config.name = "unit-test-aw-kueue" config.appwrapper = True config.write_to_file = False @@ -114,7 +114,7 @@ def test_get_local_queue_exists_fail(mocker): "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"), ) - config = createClusterConfig() + config = create_cluster_config() config.name = "unit-test-aw-kueue" config.appwrapper = True config.write_to_file = True diff --git a/src/codeflare_sdk/common/utils/unit_test_support.py b/src/codeflare_sdk/common/utils/unit_test_support.py index 8e034378f..256a76665 100644 --- a/src/codeflare_sdk/common/utils/unit_test_support.py +++ b/src/codeflare_sdk/common/utils/unit_test_support.py @@ -26,11 +26,11 @@ aw_dir = os.path.expanduser("~/.codeflare/resources/") -def createClusterConfig(): +def create_cluster_config(num_workers=2): config = ClusterConfiguration( name="unit-test-cluster", namespace="ns", - num_workers=2, + num_workers=num_workers, worker_cpu_requests=3, worker_cpu_limits=4, worker_memory_requests=5, @@ -41,17 +41,18 @@ def createClusterConfig(): return config -def createClusterWithConfig(mocker): - mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") - mocker.patch( - "kubernetes.client.CustomObjectsApi.get_cluster_custom_object", - return_value={"spec": {"domain": "apps.cluster.awsroute.org"}}, - ) - cluster = Cluster(createClusterConfig()) +def create_cluster(mocker, num_workers=2): + cluster = Cluster(create_cluster_config(num_workers)) return cluster -def createClusterWrongType(): +def patch_cluster_with_dynamic_client(mocker, cluster, dynamic_client=None): + mocker.patch.object(cluster, "get_dynamic_client", return_value=dynamic_client) + mocker.patch.object(cluster, "down", return_value=None) + mocker.patch.object(cluster, "config_check", return_value=None) + + +def create_cluster_wrong_type(): config = ClusterConfiguration( 
name="unit-test-cluster", namespace="ns", @@ -383,6 +384,48 @@ def mocked_ingress(port, cluster_name="unit-test-cluster", annotations: dict = N return mock_ingress +# Global dictionary to maintain state in the mock +cluster_state = {} + + +# The mock side_effect function for server_side_apply +def mock_server_side_apply(resource, body=None, name=None, namespace=None, **kwargs): + # Simulate the behavior of server_side_apply: + # Update a mock state that represents the cluster's current configuration. + # Stores the state in a global dictionary for simplicity. + + global cluster_state + + if not resource or not body or not name or not namespace: + raise ValueError("Missing required parameters for server_side_apply") + + # Extract worker count from the body if it exists + try: + worker_count = ( + body["spec"]["workerGroupSpecs"][0]["replicas"] + if "spec" in body and "workerGroupSpecs" in body["spec"] + else None + ) + except KeyError: + worker_count = None + + # Apply changes to the cluster_state mock + cluster_state[name] = { + "namespace": namespace, + "worker_count": worker_count, + "body": body, + } + + # Return a response that mimics the behavior of a successful apply + return { + "status": "success", + "applied": True, + "name": name, + "namespace": namespace, + "worker_count": worker_count, + } + + @patch.dict("os.environ", {"NB_PREFIX": "test-prefix"}) def create_cluster_all_config_params(mocker, cluster_name, is_appwrapper) -> Cluster: mocker.patch( diff --git a/src/codeflare_sdk/common/widgets/test_widgets.py b/src/codeflare_sdk/common/widgets/test_widgets.py index 12c238544..a7d3de923 100644 --- a/src/codeflare_sdk/common/widgets/test_widgets.py +++ b/src/codeflare_sdk/common/widgets/test_widgets.py @@ -15,7 +15,7 @@ import codeflare_sdk.common.widgets.widgets as cf_widgets import pandas as pd from unittest.mock import MagicMock, patch -from ..utils.unit_test_support import get_local_queue, createClusterConfig +from ..utils.unit_test_support import 
get_local_queue, create_cluster_config from codeflare_sdk.ray.cluster.cluster import Cluster from codeflare_sdk.ray.cluster.status import ( RayCluster, @@ -38,7 +38,7 @@ def test_cluster_up_down_buttons(mocker): "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"), ) - cluster = Cluster(createClusterConfig()) + cluster = Cluster(create_cluster_config()) with patch("ipywidgets.Button") as MockButton, patch( "ipywidgets.Checkbox" diff --git a/src/codeflare_sdk/ray/cluster/cluster.py b/src/codeflare_sdk/ray/cluster/cluster.py index a3f345547..d1c3a118d 100644 --- a/src/codeflare_sdk/ray/cluster/cluster.py +++ b/src/codeflare_sdk/ray/cluster/cluster.py @@ -52,6 +52,10 @@ import requests from kubernetes import config +from kubernetes.dynamic import DynamicClient +from kubernetes import client as k8s_client +from kubernetes.client.rest import ApiException + from kubernetes.client.rest import ApiException import warnings @@ -84,6 +88,14 @@ def __init__(self, config: ClusterConfiguration): if is_notebook(): cluster_up_down_buttons(self) + def get_dynamic_client(self): + """Return a dynamic client, optionally mocked in tests.""" + return DynamicClient(get_api_client()) + + def config_check(self): + """Run the module-level config_check(), optionally mocked in tests.""" + return config_check() + @property def _client_headers(self): k8_client = get_api_client() @@ -139,10 +151,8 @@ def up(self): Applies the Cluster yaml, pushing the resource request onto the Kueue localqueue. """ - # check if RayCluster CustomResourceDefinition exists if not throw RuntimeError self._throw_for_no_raycluster() - namespace = self.config.namespace try: @@ -176,6 +186,50 @@ def up(self): except Exception as e: # pragma: no cover return _kube_api_error_handling(e) + def apply(self, force=False): + """ + Applies the Cluster yaml using server-side apply. + If 'force' is set to True, conflicts will be forced. 
+ """ + # check if RayCluster CustomResourceDefinition exists if not throw RuntimeError + self._throw_for_no_raycluster() + namespace = self.config.namespace + # Ensure Kubernetes configuration is loaded + try: + self.config_check() + # Get the RayCluster custom resource definition + api = self.get_dynamic_client().resources.get( + api_version="ray.io/v1", kind="RayCluster" + ) + except Exception as e: + raise RuntimeError("Failed to get RayCluster resource: " + str(e)) + + # Read the YAML file and parse it into a dictionary + try: + with open(self.resource_yaml, "r") as f: + resource_body = yaml.safe_load(f) + except FileNotFoundError: + raise RuntimeError(f"Resource YAML file '{self.resource_yaml}' not found.") + except yaml.YAMLError as e: + raise ValueError(f"Failed to parse resource YAML: {e}") + + # Extract the resource name from the metadata + resource_name = resource_body.get("metadata", {}).get("name") + if not resource_name: + raise ValueError("The resource must have a 'metadata.name' field.") + try: + # Use server-side apply + resp = api.server_side_apply( + body=resource_body, + name=resource_name, + namespace=self.config.namespace, + field_manager="cluster-manager", + force_conflicts=force, # Allow forcing conflicts if needed + ) + print(f"Cluster '{self.config.name}' applied successfully.") + except ApiException as e: + raise RuntimeError(f"Failed to apply cluster: {e.reason}") + def _throw_for_no_raycluster(self): api_instance = client.CustomObjectsApi(get_api_client()) try: @@ -204,7 +258,7 @@ def down(self): resource_name = self.config.name self._throw_for_no_raycluster() try: - config_check() + self.config_check() api_instance = client.CustomObjectsApi(get_api_client()) if self.config.appwrapper: api_instance.delete_namespaced_custom_object( diff --git a/src/codeflare_sdk/ray/cluster/test_cluster.py b/src/codeflare_sdk/ray/cluster/test_cluster.py index 5e83c82a8..05e6faee4 100644 --- a/src/codeflare_sdk/ray/cluster/test_cluster.py +++ 
b/src/codeflare_sdk/ray/cluster/test_cluster.py @@ -19,16 +19,17 @@ list_all_queued, ) from codeflare_sdk.common.utils.unit_test_support import ( - createClusterWithConfig, + create_cluster, arg_check_del_effect, ingress_retrieval, arg_check_apply_effect, get_local_queue, - createClusterConfig, + create_cluster_config, get_ray_obj, get_obj_none, get_ray_obj_with_status, get_aw_obj_with_status, + patch_cluster_with_dynamic_client, ) from codeflare_sdk.ray.cluster.cluster import _is_openshift_cluster from pathlib import Path @@ -67,11 +68,50 @@ def test_cluster_up_down(mocker): "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"), ) - cluster = cluster = createClusterWithConfig(mocker) + cluster = create_cluster(mocker) cluster.up() cluster.down() +def test_cluster_apply_scale_up_scale_down(mocker): + mock_dynamic_client = mocker.Mock() + mocker.patch("codeflare_sdk.ray.cluster.cluster.Cluster._throw_for_no_raycluster") + mocker.patch( + "kubernetes.dynamic.DynamicClient.resources", new_callable=mocker.PropertyMock + ) + mocker.patch( + "codeflare_sdk.ray.cluster.cluster.Cluster.create_resource", + return_value="./tests/test_cluster_yamls/ray/default-ray-cluster.yaml", + ) + mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") + mocker.patch( + "kubernetes.client.CustomObjectsApi.get_cluster_custom_object", + return_value={"spec": {"domain": "apps.cluster.awsroute.org"}}, + ) + + # Initialize test + initial_num_workers = 1 + scaled_up_num_workers = 2 + + # Step 1: Create cluster with initial workers + cluster = create_cluster(mocker, initial_num_workers) + patch_cluster_with_dynamic_client(mocker, cluster, mock_dynamic_client) + cluster.apply() + + # Step 2: Scale up the cluster + cluster = create_cluster(mocker, scaled_up_num_workers) + patch_cluster_with_dynamic_client(mocker, cluster, mock_dynamic_client) + cluster.apply() + + # Step 3: Scale down the 
cluster + cluster = create_cluster(mocker, initial_num_workers) + patch_cluster_with_dynamic_client(mocker, cluster, mock_dynamic_client) + cluster.apply() + + # Tear down + cluster.down() + + def test_cluster_up_down_no_mcad(mocker): mocker.patch("codeflare_sdk.ray.cluster.cluster.Cluster._throw_for_no_raycluster") mocker.patch("kubernetes.config.load_kube_config", return_value="ignore") @@ -98,7 +138,7 @@ def test_cluster_up_down_no_mcad(mocker): "kubernetes.client.CustomObjectsApi.list_cluster_custom_object", return_value={"items": []}, ) - config = createClusterConfig() + config = create_cluster_config() config.name = "unit-test-cluster-ray" config.appwrapper = False cluster = Cluster(config) @@ -117,7 +157,7 @@ def test_cluster_uris(mocker): "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"), ) - cluster = cluster = createClusterWithConfig(mocker) + cluster = create_cluster(mocker) mocker.patch( "kubernetes.client.NetworkingV1Api.list_namespaced_ingress", return_value=ingress_retrieval( @@ -159,7 +199,7 @@ def ray_addr(self, *args): "kubernetes.client.CustomObjectsApi.list_namespaced_custom_object", return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"), ) - cluster = cluster = createClusterWithConfig(mocker) + cluster = create_cluster(mocker) mocker.patch( "ray.job_submission.JobSubmissionClient._check_connection_and_version_with_url", return_value="None", diff --git a/src/codeflare_sdk/ray/cluster/test_config.py b/src/codeflare_sdk/ray/cluster/test_config.py index 3416fc28c..2d3aa7796 100644 --- a/src/codeflare_sdk/ray/cluster/test_config.py +++ b/src/codeflare_sdk/ray/cluster/test_config.py @@ -13,7 +13,7 @@ # limitations under the License. 
from codeflare_sdk.common.utils.unit_test_support import ( - createClusterWrongType, + create_cluster_wrong_type, get_local_queue, create_cluster_all_config_params, ) @@ -55,6 +55,7 @@ def test_default_appwrapper_creation(mocker): assert cluster.resource_yaml == expected_aw +@pytest.mark.filterwarnings("ignore::UserWarning") def test_config_creation_all_parameters(mocker): from codeflare_sdk.ray.cluster.config import DEFAULT_RESOURCE_MAPPING @@ -98,6 +99,7 @@ def test_config_creation_all_parameters(mocker): ) +@pytest.mark.filterwarnings("ignore::UserWarning") def test_all_config_params_aw(mocker): create_cluster_all_config_params(mocker, "aw-all-params", True) assert filecmp.cmp( @@ -109,11 +111,12 @@ def test_all_config_params_aw(mocker): def test_config_creation_wrong_type(): with pytest.raises(TypeError) as error_info: - createClusterWrongType() + create_cluster_wrong_type() assert len(str(error_info.value).splitlines()) == 4 +@pytest.mark.filterwarnings("ignore::UserWarning") def test_cluster_config_deprecation_conversion(mocker): config = ClusterConfiguration( name="test", diff --git a/tests/e2e/cluster_apply_kind_test.py b/tests/e2e/cluster_apply_kind_test.py new file mode 100644 index 000000000..398bf73b1 --- /dev/null +++ b/tests/e2e/cluster_apply_kind_test.py @@ -0,0 +1,156 @@ +from codeflare_sdk import Cluster, ClusterConfiguration +import pytest +from kubernetes import client + +from support import ( + initialize_kubernetes_client, + create_namespace, + delete_namespace, + get_ray_cluster, +) + + +@pytest.mark.kind +class TestRayClusterApply: + def setup_method(self): + initialize_kubernetes_client(self) + + def teardown_method(self): + delete_namespace(self) + + def test_cluster_apply(self): + self.setup_method() + create_namespace(self) + + cluster_name = "test-cluster-apply" + namespace = self.namespace + + # Initial configuration with 1 worker + initial_config = ClusterConfiguration( + name=cluster_name, + namespace=namespace, + num_workers=1, + 
head_cpu_requests="500m", + head_cpu_limits="1", + head_memory_requests="1Gi", + head_memory_limits="2Gi", + worker_cpu_requests="500m", + worker_cpu_limits="1", + worker_memory_requests="1Gi", + worker_memory_limits="2Gi", + write_to_file=True, + verify_tls=False, + ) + + # Create the cluster + cluster = Cluster(initial_config) + cluster.apply() + + # Wait for the cluster to be ready + cluster.wait_ready() + status = cluster.status() + assert status["ready"], f"Cluster {cluster_name} is not ready: {status}" + + # Verify the cluster is created + ray_cluster = get_ray_cluster(cluster_name, namespace) + assert ray_cluster is not None, "Cluster was not created successfully" + assert ( + ray_cluster["spec"]["workerGroupSpecs"][0]["replicas"] == 1 + ), "Initial worker count does not match" + + # Update configuration with 2 workers + updated_config = ClusterConfiguration( + name=cluster_name, + namespace=namespace, + num_workers=2, + head_cpu_requests="500m", + head_cpu_limits="1", + head_memory_requests="1Gi", + head_memory_limits="2Gi", + worker_cpu_requests="500m", + worker_cpu_limits="1", + worker_memory_requests="1Gi", + worker_memory_limits="2Gi", + write_to_file=True, + verify_tls=False, + ) + + # Apply the updated configuration + cluster.config = updated_config + cluster.apply() + + # Wait for the updated cluster to be ready + cluster.wait_ready() + updated_status = cluster.status() + assert updated_status[ + "ready" + ], f"Cluster {cluster_name} is not ready after update: {updated_status}" + + # Verify the cluster is updated + updated_ray_cluster = get_ray_cluster(cluster_name, namespace) + assert ( + updated_ray_cluster["spec"]["workerGroupSpecs"][0]["replicas"] == 2 + ), "Worker count was not updated" + + # Clean up + cluster.down() + ray_cluster = get_ray_cluster(cluster_name, namespace) + assert ray_cluster is None, "Cluster was not deleted successfully" + + def test_apply_invalid_update(self): + self.setup_method() + create_namespace(self) + + cluster_name 
= "test-cluster-apply-invalid" + namespace = self.namespace + + # Initial configuration + initial_config = ClusterConfiguration( + name=cluster_name, + namespace=namespace, + num_workers=1, + head_cpu_requests="500m", + head_cpu_limits="1", + head_memory_requests="1Gi", + head_memory_limits="2Gi", + worker_cpu_requests="500m", + worker_cpu_limits="1", + worker_memory_requests="1Gi", + worker_memory_limits="2Gi", + write_to_file=True, + verify_tls=False, + ) + + # Create the cluster + cluster = Cluster(initial_config) + cluster.apply() + + # Wait for the cluster to be ready + cluster.wait_ready() + status = cluster.status() + assert status["ready"], f"Cluster {cluster_name} is not ready: {status}" + + # Update with an invalid configuration (e.g., immutable field change) + invalid_config = ClusterConfiguration( + name=cluster_name, + namespace=namespace, + num_workers=2, + head_cpu_requests="1", + head_cpu_limits="2", # Changing CPU limits (immutable) + head_memory_requests="1Gi", + head_memory_limits="2Gi", + worker_cpu_requests="500m", + worker_cpu_limits="1", + worker_memory_requests="1Gi", + worker_memory_limits="2Gi", + write_to_file=True, + verify_tls=False, + ) + + # Try to apply the invalid configuration and expect failure + cluster.config = invalid_config + with pytest.raises(RuntimeError, match="Immutable fields detected"): + cluster.apply() + + # Clean up + cluster.down() diff --git a/tests/e2e/support.py b/tests/e2e/support.py index 2ff33e911..5e4ddbdf2 100644 --- a/tests/e2e/support.py +++ b/tests/e2e/support.py @@ -11,6 +11,22 @@ ) +def get_ray_cluster(cluster_name, namespace): + api = client.CustomObjectsApi() + try: + return api.get_namespaced_custom_object( + group="ray.io", + version="v1", + namespace=namespace, + plural="rayclusters", + name=cluster_name, + ) + except client.exceptions.ApiException as e: + if e.status == 404: + return None + raise + + def get_ray_image(): default_ray_image = 
"quay.io/modh/ray@sha256:0d715f92570a2997381b7cafc0e224cfa25323f18b9545acfd23bc2b71576d06" return os.getenv("RAY_IMAGE", default_ray_image)