Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create ray cluster with ssa #778

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ pytest -v src/codeflare_sdk

### Local e2e Testing

- Please follow the [e2e documentation](https://github.com/project-codeflare/codeflare-sdk/blob/main/docs/e2e.md)
- Please follow the [e2e documentation](https://github.com/project-codeflare/codeflare-sdk/blob/main/docs/sphinx/user-docs/e2e.rst)

#### Code Coverage

Expand Down
6 changes: 6 additions & 0 deletions docs/sphinx/user-docs/ray-cluster-interaction.rst
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ cluster.up()

| The ``cluster.up()`` function creates a Ray Cluster in the given namespace.

cluster.apply()
------------

| The ``cluster.apply()`` function applies a Ray Cluster in the given namespace. If the cluster already exists, it is updated.
| If it does not exist it is created.

cluster.down()
--------------

Expand Down
133 changes: 125 additions & 8 deletions src/codeflare_sdk/common/kueue/test_kueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ..utils.unit_test_support import (
apply_template,
get_local_queue,
createClusterConfig,
create_cluster_config,
get_template_variables,
apply_template,
)
from unittest.mock import patch
from codeflare_sdk.ray.cluster.cluster import Cluster, ClusterConfiguration
import yaml
import os
import filecmp
from pathlib import Path
from .kueue import list_local_queues
from .kueue import list_local_queues, local_queue_exists, add_queue_label

parent = Path(__file__).resolve().parents[4] # project directory
aw_dir = os.path.expanduser("~/.codeflare/resources/")
Expand Down Expand Up @@ -51,7 +51,7 @@ def test_cluster_creation_no_aw_local_queue(mocker):
"kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"),
)
config = createClusterConfig()
config = create_cluster_config()
config.name = "unit-test-cluster-kueue"
config.write_to_file = True
config.local_queue = "local-queue-default"
Expand All @@ -67,7 +67,7 @@ def test_cluster_creation_no_aw_local_queue(mocker):
assert cluster_kueue == expected_rc

# With resources loaded in memory, no Local Queue specified.
config = createClusterConfig()
config = create_cluster_config()
config.name = "unit-test-cluster-kueue"
config.write_to_file = False
cluster = Cluster(config)
Expand All @@ -84,7 +84,7 @@ def test_aw_creation_local_queue(mocker):
"kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"),
)
config = createClusterConfig()
config = create_cluster_config()
config.name = "unit-test-aw-kueue"
config.appwrapper = True
config.write_to_file = True
Expand All @@ -101,7 +101,7 @@ def test_aw_creation_local_queue(mocker):
assert aw_kueue == expected_rc

# With resources loaded in memory, no Local Queue specified.
config = createClusterConfig()
config = create_cluster_config()
config.name = "unit-test-aw-kueue"
config.appwrapper = True
config.write_to_file = False
Expand All @@ -120,7 +120,7 @@ def test_get_local_queue_exists_fail(mocker):
"kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"),
)
config = createClusterConfig()
config = create_cluster_config()
config.name = "unit-test-aw-kueue"
config.appwrapper = True
config.write_to_file = True
Expand Down Expand Up @@ -175,6 +175,123 @@ def test_list_local_queues(mocker):
assert lqs == []


def test_local_queue_exists_found(mocker):
# Mock Kubernetes client and list_namespaced_custom_object method
mocker.patch("kubernetes.config.load_kube_config", return_value="ignore")
mock_api_instance = mocker.Mock()
mocker.patch("kubernetes.client.CustomObjectsApi", return_value=mock_api_instance)
mocker.patch("codeflare_sdk.ray.cluster.cluster.config_check")

# Mock return value for list_namespaced_custom_object
mock_api_instance.list_namespaced_custom_object.return_value = {
"items": [
{"metadata": {"name": "existing-queue"}},
{"metadata": {"name": "another-queue"}},
]
}

# Call the function
namespace = "test-namespace"
local_queue_name = "existing-queue"
result = local_queue_exists(namespace, local_queue_name)

# Assertions
assert result is True
mock_api_instance.list_namespaced_custom_object.assert_called_once_with(
group="kueue.x-k8s.io",
version="v1beta1",
namespace=namespace,
plural="localqueues",
)


def test_local_queue_exists_not_found(mocker):
# Mock Kubernetes client and list_namespaced_custom_object method
mocker.patch("kubernetes.config.load_kube_config", return_value="ignore")
mock_api_instance = mocker.Mock()
mocker.patch("kubernetes.client.CustomObjectsApi", return_value=mock_api_instance)
mocker.patch("codeflare_sdk.ray.cluster.cluster.config_check")

# Mock return value for list_namespaced_custom_object
mock_api_instance.list_namespaced_custom_object.return_value = {
"items": [
{"metadata": {"name": "another-queue"}},
{"metadata": {"name": "different-queue"}},
]
}

# Call the function
namespace = "test-namespace"
local_queue_name = "non-existent-queue"
result = local_queue_exists(namespace, local_queue_name)

# Assertions
assert result is False
mock_api_instance.list_namespaced_custom_object.assert_called_once_with(
group="kueue.x-k8s.io",
version="v1beta1",
namespace=namespace,
plural="localqueues",
)


import pytest
from unittest import mock # If you're also using mocker from pytest-mock


def test_add_queue_label_with_valid_local_queue(mocker):
# Mock the kubernetes.client.CustomObjectsApi and its response
mock_api_instance = mocker.patch("kubernetes.client.CustomObjectsApi")
mock_api_instance.return_value.list_namespaced_custom_object.return_value = {
"items": [
{"metadata": {"name": "valid-queue"}},
]
}

# Mock other dependencies
mocker.patch("codeflare_sdk.common.kueue.local_queue_exists", return_value=True)
mocker.patch(
"codeflare_sdk.common.kueue.get_default_kueue_name",
return_value="default-queue",
)

# Define input item and parameters
item = {"metadata": {}}
namespace = "test-namespace"
local_queue = "valid-queue"

# Call the function
add_queue_label(item, namespace, local_queue)

# Assert that the label is added to the item
assert item["metadata"]["labels"] == {"kueue.x-k8s.io/queue-name": "valid-queue"}


def test_add_queue_label_with_invalid_local_queue(mocker):
# Mock the kubernetes.client.CustomObjectsApi and its response
mock_api_instance = mocker.patch("kubernetes.client.CustomObjectsApi")
mock_api_instance.return_value.list_namespaced_custom_object.return_value = {
"items": [
{"metadata": {"name": "valid-queue"}},
]
}

# Mock the local_queue_exists function to return False
mocker.patch("codeflare_sdk.common.kueue.local_queue_exists", return_value=False)

# Define input item and parameters
item = {"metadata": {}}
namespace = "test-namespace"
local_queue = "invalid-queue"

# Call the function and expect a ValueError
with pytest.raises(
ValueError,
match="local_queue provided does not exist or is not in this namespace",
):
add_queue_label(item, namespace, local_queue)


# Make sure to always keep this function last
def test_cleanup():
os.remove(f"{aw_dir}unit-test-cluster-kueue.yaml")
Expand Down
66 changes: 55 additions & 11 deletions src/codeflare_sdk/common/utils/unit_test_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,32 +29,34 @@
aw_dir = os.path.expanduser("~/.codeflare/resources/")


def createClusterConfig():
def create_cluster_config(num_workers=2, write_to_file=False):
config = ClusterConfiguration(
name="unit-test-cluster",
namespace="ns",
num_workers=2,
num_workers=num_workers,
worker_cpu_requests=3,
worker_cpu_limits=4,
worker_memory_requests=5,
worker_memory_limits=6,
appwrapper=True,
write_to_file=False,
write_to_file=write_to_file,
)
return config


def createClusterWithConfig(mocker):
mocker.patch("kubernetes.config.load_kube_config", return_value="ignore")
mocker.patch(
"kubernetes.client.CustomObjectsApi.get_cluster_custom_object",
return_value={"spec": {"domain": "apps.cluster.awsroute.org"}},
)
cluster = Cluster(createClusterConfig())
def create_cluster(mocker, num_workers=2, write_to_file=False):
cluster = Cluster(create_cluster_config(num_workers, write_to_file))
return cluster


def createClusterWrongType():
def patch_cluster_with_dynamic_client(mocker, cluster, dynamic_client=None):
mocker.patch.object(cluster, "get_dynamic_client", return_value=dynamic_client)
mocker.patch.object(cluster, "down", return_value=None)
mocker.patch.object(cluster, "config_check", return_value=None)
# mocker.patch.object(cluster, "_throw_for_no_raycluster", return_value=None)


def create_cluster_wrong_type():
config = ClusterConfiguration(
name="unit-test-cluster",
namespace="ns",
Expand Down Expand Up @@ -412,6 +414,48 @@ def mocked_ingress(port, cluster_name="unit-test-cluster", annotations: dict = N
return mock_ingress


# Global dictionary to maintain state in the mock
cluster_state = {}


# The mock side_effect function for server_side_apply
def mock_server_side_apply(resource, body=None, name=None, namespace=None, **kwargs):
# Simulate the behavior of server_side_apply:
# Update a mock state that represents the cluster's current configuration.
# Stores the state in a global dictionary for simplicity.

global cluster_state

if not resource or not body or not name or not namespace:
raise ValueError("Missing required parameters for server_side_apply")

# Extract worker count from the body if it exists
try:
worker_count = (
body["spec"]["workerGroupSpecs"][0]["replicas"]
if "spec" in body and "workerGroupSpecs" in body["spec"]
else None
)
except KeyError:
worker_count = None

# Apply changes to the cluster_state mock
cluster_state[name] = {
"namespace": namespace,
"worker_count": worker_count,
"body": body,
}

# Return a response that mimics the behavior of a successful apply
return {
"status": "success",
"applied": True,
"name": name,
"namespace": namespace,
"worker_count": worker_count,
}


@patch.dict("os.environ", {"NB_PREFIX": "test-prefix"})
def create_cluster_all_config_params(mocker, cluster_name, is_appwrapper) -> Cluster:
mocker.patch(
Expand Down
4 changes: 2 additions & 2 deletions src/codeflare_sdk/common/widgets/test_widgets.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import codeflare_sdk.common.widgets.widgets as cf_widgets
import pandas as pd
from unittest.mock import MagicMock, patch
from ..utils.unit_test_support import get_local_queue, createClusterConfig
from ..utils.unit_test_support import get_local_queue, create_cluster_config
from codeflare_sdk.ray.cluster.cluster import Cluster
from codeflare_sdk.ray.cluster.status import (
RayCluster,
Expand All @@ -38,7 +38,7 @@ def test_cluster_up_down_buttons(mocker):
"kubernetes.client.CustomObjectsApi.list_namespaced_custom_object",
return_value=get_local_queue("kueue.x-k8s.io", "v1beta1", "ns", "localqueues"),
)
cluster = Cluster(createClusterConfig())
cluster = Cluster(create_cluster_config())

with patch("ipywidgets.Button") as MockButton, patch(
"ipywidgets.Checkbox"
Expand Down
Loading
Loading