Skip to content

Commit

Permalink
Adds RayCluster.apply()
Browse files Browse the repository at this point in the history
- Adds  RayCluster.apply() implementation
- Adds e2e tests for apply
- Adds unit tests for apply
  • Loading branch information
akram committed Dec 11, 2024
1 parent be9763a commit 9d8295d
Show file tree
Hide file tree
Showing 9 changed files with 342 additions and 30 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ pytest -v src/codeflare_sdk

### Local e2e Testing

- Please follow the [e2e documentation](https://github.com/project-codeflare/codeflare-sdk/blob/main/docs/e2e.md)
- Please follow the [e2e documentation](https://github.com/project-codeflare/codeflare-sdk/blob/main/docs/sphinx/user-docs/e2e.rst)

#### Code Coverage

Expand Down
12 changes: 6 additions & 6 deletions src/codeflare_sdk/common/kueue/test_kueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ..utils.unit_test_support import get_local_queue, createClusterConfig
from ..utils.unit_test_support import get_local_queue, create_cluster_config
from unittest.mock import patch
from codeflare_sdk.ray.cluster.cluster import Cluster, ClusterConfiguration
import yaml
Expand Down Expand Up @@ -46,7 +46,7 @@ def test_cluster_creation_no_aw_local_queue(mocker):
"kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"),
)
config = createClusterConfig()
config = create_cluster_config()
config.name = "unit-test-cluster-kueue"
config.write_to_file = True
config.local_queue = "local-queue-default"
Expand All @@ -59,7 +59,7 @@ def test_cluster_creation_no_aw_local_queue(mocker):
)

# With resources loaded in memory, no Local Queue specified.
config = createClusterConfig()
config = create_cluster_config()
config.name = "unit-test-cluster-kueue"
config.write_to_file = False
cluster = Cluster(config)
Expand All @@ -79,7 +79,7 @@ def test_aw_creation_local_queue(mocker):
"kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"),
)
config = createClusterConfig()
config = create_cluster_config()
config.name = "unit-test-aw-kueue"
config.appwrapper = True
config.write_to_file = True
Expand All @@ -93,7 +93,7 @@ def test_aw_creation_local_queue(mocker):
)

# With resources loaded in memory, no Local Queue specified.
config = createClusterConfig()
config = create_cluster_config()
config.name = "unit-test-aw-kueue"
config.appwrapper = True
config.write_to_file = False
Expand All @@ -114,7 +114,7 @@ def test_get_local_queue_exists_fail(mocker):
"kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"),
)
config = createClusterConfig()
config = create_cluster_config()
config.name = "unit-test-aw-kueue"
config.appwrapper = True
config.write_to_file = True
Expand Down
63 changes: 53 additions & 10 deletions src/codeflare_sdk/common/utils/unit_test_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
aw_dir = os.path.expanduser("~/.codeflare/resources/")


def createClusterConfig():
def create_cluster_config(num_workers=2):
config = ClusterConfiguration(
name="unit-test-cluster",
namespace="ns",
num_workers=2,
num_workers=num_workers,
worker_cpu_requests=3,
worker_cpu_limits=4,
worker_memory_requests=5,
Expand All @@ -41,17 +41,18 @@ def createClusterConfig():
return config


def createClusterWithConfig(mocker):
mocker.patch("kubernetes.config.load_kube_config", return_value="ignore")
mocker.patch(
"kubernetes.client.CustomObjectsApi.get_cluster_custom_object",
return_value={"spec": {"domain": "apps.cluster.awsroute.org"}},
)
cluster = Cluster(createClusterConfig())
def create_cluster(mocker, num_workers=2):
cluster = Cluster(create_cluster_config(num_workers))
return cluster


def createClusterWrongType():
def patch_cluster_with_dynamic_client(mocker, cluster, dynamic_client=None):
mocker.patch.object(cluster, "get_dynamic_client", return_value=dynamic_client)
mocker.patch.object(cluster, "down", return_value=None)
mocker.patch.object(cluster, "config_check", return_value=None)


def create_cluster_wrong_type():
config = ClusterConfiguration(
name="unit-test-cluster",
namespace="ns",
Expand Down Expand Up @@ -383,6 +384,48 @@ def mocked_ingress(port, cluster_name="unit-test-cluster", annotations: dict = N
return mock_ingress


# Global dictionary to maintain state in the mock
cluster_state = {}


# The mock side_effect function for server_side_apply
def mock_server_side_apply(resource, body=None, name=None, namespace=None, **kwargs):
# Simulate the behavior of server_side_apply:
# Update a mock state that represents the cluster's current configuration.
# Stores the state in a global dictionary for simplicity.

global cluster_state

if not resource or not body or not name or not namespace:
raise ValueError("Missing required parameters for server_side_apply")

# Extract worker count from the body if it exists
try:
worker_count = (
body["spec"]["workerGroupSpecs"][0]["replicas"]
if "spec" in body and "workerGroupSpecs" in body["spec"]
else None
)
except KeyError:
worker_count = None

# Apply changes to the cluster_state mock
cluster_state[name] = {
"namespace": namespace,
"worker_count": worker_count,
"body": body,
}

# Return a response that mimics the behavior of a successful apply
return {
"status": "success",
"applied": True,
"name": name,
"namespace": namespace,
"worker_count": worker_count,
}


@patch.dict("os.environ", {"NB_PREFIX": "test-prefix"})
def create_cluster_all_config_params(mocker, cluster_name, is_appwrapper) -> Cluster:
mocker.patch(
Expand Down
4 changes: 2 additions & 2 deletions src/codeflare_sdk/common/widgets/test_widgets.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import codeflare_sdk.common.widgets.widgets as cf_widgets
import pandas as pd
from unittest.mock import MagicMock, patch
from ..utils.unit_test_support import get_local_queue, createClusterConfig
from ..utils.unit_test_support import get_local_queue, create_cluster_config
from codeflare_sdk.ray.cluster.cluster import Cluster
from codeflare_sdk.ray.cluster.status import (
RayCluster,
Expand All @@ -38,7 +38,7 @@ def test_cluster_up_down_buttons(mocker):
"kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"),
)
cluster = Cluster(createClusterConfig())
cluster = Cluster(create_cluster_config())

with patch("ipywidgets.Button") as MockButton, patch(
"ipywidgets.Checkbox"
Expand Down
60 changes: 57 additions & 3 deletions src/codeflare_sdk/ray/cluster/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@
import requests

from kubernetes import config
from kubernetes.dynamic import DynamicClient
from kubernetes import client as k8s_client
from kubernetes.client.rest import ApiException

from kubernetes.client.rest import ApiException
import warnings

Expand Down Expand Up @@ -84,6 +88,14 @@ def __init__(self, config: ClusterConfiguration):
if is_notebook():
cluster_up_down_buttons(self)

def get_dynamic_client(self):
"""Return a dynamic client, optionally mocked in tests."""
return DynamicClient(get_api_client())

def config_check(self):
"""Return a dynamic client, optionally mocked in tests."""
return config_check()

@property
def _client_headers(self):
k8_client = get_api_client()
Expand Down Expand Up @@ -139,10 +151,8 @@ def up(self):
Applies the Cluster yaml, pushing the resource request onto
the Kueue localqueue.
"""

# check if RayCluster CustomResourceDefinition exists if not throw RuntimeError
self._throw_for_no_raycluster()

namespace = self.config.namespace

try:
Expand Down Expand Up @@ -176,6 +186,50 @@ def up(self):
except Exception as e: # pragma: no cover
return _kube_api_error_handling(e)

def apply(self, force=False):
"""
Applies the Cluster yaml using server-side apply.
If 'force' is set to True, conflicts will be forced.
"""
# check if RayCluster CustomResourceDefinition exists if not throw RuntimeError
self._throw_for_no_raycluster()
namespace = self.config.namespace
# Ensure Kubernetes configuration is loaded
try:
self.config_check()
# Get the RayCluster custom resource definition
api = self.get_dynamic_client().resources.get(
api_version="ray.io/v1", kind="RayCluster"
)
except Exception as e:
raise RuntimeError("Failed to get RayCluster resource: " + str(e))

# Read the YAML file and parse it into a dictionary
try:
with open(self.resource_yaml, "r") as f:
resource_body = yaml.safe_load(f)
except FileNotFoundError:
raise RuntimeError(f"Resource YAML file '{self.resource_yaml}' not found.")
except yaml.YAMLError as e:
raise ValueError(f"Failed to parse resource YAML: {e}")

# Extract the resource name from the metadata
resource_name = resource_body.get("metadata", {}).get("name")
if not resource_name:
raise ValueError("The resource must have a 'metadata.name' field.")
try:
# Use server-side apply
resp = api.server_side_apply(
body=resource_body,
name=resource_name,
namespace=self.config.namespace,
field_manager="cluster-manager",
force_conflicts=force, # Allow forcing conflicts if needed
)
print(f"Cluster '{self.config.name}' applied successfully.")
except ApiException as e:
raise RuntimeError(f"Failed to apply cluster: {e.reason}")

def _throw_for_no_raycluster(self):
api_instance = client.CustomObjectsApi(get_api_client())
try:
Expand Down Expand Up @@ -204,7 +258,7 @@ def down(self):
resource_name = self.config.name
self._throw_for_no_raycluster()
try:
config_check()
self.config_check()
api_instance = client.CustomObjectsApi(get_api_client())
if self.config.appwrapper:
api_instance.delete_namespaced_custom_object(
Expand Down
52 changes: 46 additions & 6 deletions src/codeflare_sdk/ray/cluster/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@
list_all_queued,
)
from codeflare_sdk.common.utils.unit_test_support import (
createClusterWithConfig,
create_cluster,
arg_check_del_effect,
ingress_retrieval,
arg_check_apply_effect,
get_local_queue,
createClusterConfig,
create_cluster_config,
get_ray_obj,
get_obj_none,
get_ray_obj_with_status,
get_aw_obj_with_status,
patch_cluster_with_dynamic_client,
)
from codeflare_sdk.ray.cluster.cluster import _is_openshift_cluster
from pathlib import Path
Expand Down Expand Up @@ -67,11 +68,50 @@ def test_cluster_up_down(mocker):
"kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"),
)
cluster = cluster = createClusterWithConfig(mocker)
cluster = create_cluster(mocker)
cluster.up()
cluster.down()


def test_cluster_apply_scale_up_scale_down(mocker):
mock_dynamic_client = mocker.Mock()
mocker.patch("codeflare_sdk.ray.cluster.cluster.Cluster._throw_for_no_raycluster")
mocker.patch(
"kubernetes.dynamic.DynamicClient.resources", new_callable=mocker.PropertyMock
)
mocker.patch(
"codeflare_sdk.ray.cluster.cluster.Cluster.create_resource",
return_value="./tests/test_cluster_yamls/ray/default-ray-cluster.yaml",
)
mocker.patch("kubernetes.config.load_kube_config", return_value="ignore")
mocker.patch(
"kubernetes.client.CustomObjectsApi.get_cluster_custom_object",
return_value={"spec": {"domain": "apps.cluster.awsroute.org"}},
)

# Initialize test
initial_num_workers = 1
scaled_up_num_workers = 2

# Step 1: Create cluster with initial workers
cluster = create_cluster(mocker, initial_num_workers)
patch_cluster_with_dynamic_client(mocker, cluster, mock_dynamic_client)
cluster.apply()

# Step 2: Scale up the cluster
cluster = create_cluster(mocker, scaled_up_num_workers)
patch_cluster_with_dynamic_client(mocker, cluster, mock_dynamic_client)
cluster.apply()

# Step 3: Scale down the cluster
cluster = create_cluster(mocker, initial_num_workers)
patch_cluster_with_dynamic_client(mocker, cluster, mock_dynamic_client)
cluster.apply()

# Tear down
cluster.down()


def test_cluster_up_down_no_mcad(mocker):
mocker.patch("codeflare_sdk.ray.cluster.cluster.Cluster._throw_for_no_raycluster")
mocker.patch("kubernetes.config.load_kube_config", return_value="ignore")
Expand All @@ -98,7 +138,7 @@ def test_cluster_up_down_no_mcad(mocker):
"kubernetes.client.CustomObjectsApi.list_cluster_custom_object",
return_value={"items": []},
)
config = createClusterConfig()
config = create_cluster_config()
config.name = "unit-test-cluster-ray"
config.appwrapper = False
cluster = Cluster(config)
Expand All @@ -117,7 +157,7 @@ def test_cluster_uris(mocker):
"kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"),
)
cluster = cluster = createClusterWithConfig(mocker)
cluster = create_cluster(mocker)
mocker.patch(
"kubernetes.client.NetworkingV1Api.list_namespaced_ingress",
return_value=ingress_retrieval(
Expand Down Expand Up @@ -159,7 +199,7 @@ def ray_addr(self, *args):
"kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"),
)
cluster = cluster = createClusterWithConfig(mocker)
cluster = create_cluster(mocker)
mocker.patch(
"ray.job_submission.JobSubmissionClient._check_connection_and_version_with_url",
return_value="None",
Expand Down
Loading

0 comments on commit 9d8295d

Please sign in to comment.