[CLI] Improve ray status for placement groups (ray-project#18289)
rkooo567 authored Sep 14, 2021
1 parent 344f2d9 commit 31e1638
Showing 5 changed files with 143 additions and 41 deletions.
101 changes: 75 additions & 26 deletions python/ray/autoscaler/_private/util.py
@@ -418,59 +418,81 @@ def parse_placement_group_resource_str(
placement_group_resource_str: str) -> Tuple[str, Optional[str]]:
"""Parse placement group resource in the form of following 3 cases:
{resource_name}_group_{bundle_id}_{group_name};
-> This case is ignored as it is duplicated to the case below.
{resource_name}_group_{group_name};
{resource_name}
Returns:
Tuple of (resource_name, placement_group_name). placement_group_name
could be None if it's not a placement group resource.
Tuple of (resource_name, placement_group_name, is_countable_resource).
placement_group_name could be None if it's not a placement group
resource. is_countable_resource is True if the resource
doesn't contain a bundle index. Resources with a bundle index
shouldn't be counted because they duplicate the information already
carried by the wildcard resources (resource names without a bundle index).
"""
result = PLACEMENT_GROUP_RESOURCE_BUNDLED_PATTERN.match(
placement_group_resource_str)
if result:
return (result.group(1), result.group(3))
return (result.group(1), result.group(3), False)
result = PLACEMENT_GROUP_RESOURCE_PATTERN.match(
placement_group_resource_str)
if result:
return (result.group(1), result.group(2))
return (placement_group_resource_str, None)
return (result.group(1), result.group(2), True)
return (placement_group_resource_str, None, True)
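
For reference, a minimal sketch of how the updated parser maps each of the three documented forms. The group hash is the sample value used in the tests below, and the exact matching is done by the module's pre-existing PLACEMENT_GROUP_RESOURCE_BUNDLED_PATTERN and PLACEMENT_GROUP_RESOURCE_PATTERN constants:

from ray.autoscaler._private.util import parse_placement_group_resource_str

pg_hash = "4a82a217aadd8326a3a49f02700ac5c2"  # sample placement group name

# Bundle-indexed resource: the group name is returned, but it is not
# countable because the wildcard form below carries the same information.
parse_placement_group_resource_str(f"CPU_group_0_{pg_hash}")
# -> ("CPU", "4a82a217aadd8326a3a49f02700ac5c2", False)

# Wildcard placement group resource (no bundle index): countable.
parse_placement_group_resource_str(f"CPU_group_{pg_hash}")
# -> ("CPU", "4a82a217aadd8326a3a49f02700ac5c2", True)

# Plain resource: not a placement group resource.
parse_placement_group_resource_str("CPU")
# -> ("CPU", None, True)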


def get_usage_report(lm_summary: LoadMetricsSummary) -> str:
# first collect resources used in placement groups
placement_group_resource_usage = collections.defaultdict(float)
placement_group_resource_usage = {}
placement_group_resource_total = collections.defaultdict(float)
for resource, (used, total) in lm_summary.usage.items():
(pg_resource_name,
pg_name) = parse_placement_group_resource_str(resource)
(pg_resource_name, pg_name,
is_countable) = parse_placement_group_resource_str(resource)
if pg_name:
placement_group_resource_usage[pg_resource_name] += used
if pg_resource_name not in placement_group_resource_usage:
placement_group_resource_usage[pg_resource_name] = 0
if is_countable:
placement_group_resource_usage[pg_resource_name] += used
placement_group_resource_total[pg_resource_name] += total
continue

usage_lines = []
for resource, (used, total) in sorted(lm_summary.usage.items()):
if "node:" in resource:
continue # Skip the auto-added per-node "node:<ip>" resource.

(_, pg_name) = parse_placement_group_resource_str(resource)
(_, pg_name, _) = parse_placement_group_resource_str(resource)
if pg_name:
continue # Skip resource used by placement groups

used_in_pg = placement_group_resource_usage[resource]

line = f" {used}/{total} {resource}"
if used_in_pg != 0:
line = line + f" ({used_in_pg} reserved in placement groups)"
pg_used = 0
pg_total = 0
used_in_pg = resource in placement_group_resource_usage
if used_in_pg:
pg_used = placement_group_resource_usage[resource]
pg_total = placement_group_resource_total[resource]
# `used` includes pg_total because placement groups reserve
# their resources as soon as they are created.
# To get the real resource usage, subtract the pg-reserved
# resources from `used` and add back the resources actually
# used inside the placement groups.
used = used - pg_total + pg_used

if resource in ["memory", "object_store_memory"]:
to_GiB = 1 / 2**30
used *= to_GiB
total *= to_GiB
used_in_pg *= to_GiB
line = f" {used:.2f}/{total:.3f} GiB {resource}"
if used_in_pg != 0:
line = line + f" ({used_in_pg:.2f} GiB reserved" \
+ " in placement groups)"
usage_lines.append(line)
line = (f" {(used * to_GiB):.2f}/"
f"{(total * to_GiB):.3f} GiB {resource}")
if used_in_pg:
line = line + (f" ({(pg_used * to_GiB):.2f} used of "
f"{(pg_total * to_GiB):.2f} GiB " +
"reserved in placement groups)")
usage_lines.append(line)
else:
line = f" {used}/{total} {resource}"
if used_in_pg:
line += (f" ({pg_used} used of "
f"{pg_total} reserved in placement groups)")
usage_lines.append(line)
usage_report = "\n".join(usage_lines)
return usage_report
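
As a worked example of this adjustment, using the numbers from test_info_string_no_node_type further down in this diff:

# Wildcard "CPU" entry from the load metrics and the countable
# "CPU_group_<hash>" entry for the placement group:
used, total = 530.0, 544.0
pg_used, pg_total = 1.0, 2.0

# Creating the placement group already counts its reservation as "used",
# so swap the reservation out for what is actually used inside the group.
real_used = used - pg_total + pg_used  # 530.0 - 2.0 + 1.0 = 529.0

line = (f" {real_used}/{total} CPU"
        f" ({pg_used} used of {pg_total} reserved in placement groups)")
# -> " 529.0/544.0 CPU (1.0 used of 2.0 reserved in placement groups)"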

@@ -488,8 +510,8 @@ def filter_placement_group_from_bundle(bundle: ResourceBundle):
using_placement_group = False
result_bundle = dict()
for pg_resource_str, resource_count in bundle.items():
(resource_name,
pg_name) = parse_placement_group_resource_str(pg_resource_str)
(resource_name, pg_name,
_) = parse_placement_group_resource_str(pg_resource_str)
result_bundle[resource_name] = resource_count
if pg_name:
using_placement_group = True
@@ -600,6 +622,32 @@ def format_info_string(lm_summary, autoscaler_summary, time=None):
return formatted_output


def format_no_node_type_string(node_type: dict):
placement_group_resource_usage = {}
regular_resource_usage = collections.defaultdict(float)
for resource, total in node_type.items():
(pg_resource_name, pg_name,
is_countable) = parse_placement_group_resource_str(resource)
if pg_name:
if not is_countable:
continue
if pg_resource_name not in placement_group_resource_usage:
placement_group_resource_usage[pg_resource_name] = 0
placement_group_resource_usage[pg_resource_name] += total
else:
regular_resource_usage[resource] += total

output_lines = [""]
for resource, total in regular_resource_usage.items():
output_line = f"{resource}: {total}"
if resource in placement_group_resource_usage:
pg_resource = placement_group_resource_usage[resource]
output_line += f" ({pg_resource} reserved in placement groups)"
output_lines.append(output_line)

return "\n ".join(output_lines)


def format_info_string_no_node_types(lm_summary, time=None):
if time is None:
time = datetime.now()
@@ -608,7 +656,8 @@ def format_info_string_no_node_types(lm_summary, time=None):

node_lines = []
for node_type, count in lm_summary.node_types:
line = f" {count} node(s) with resources: {node_type}"
line = (f" {count} node(s) with resources:"
f"{format_no_node_type_string(node_type)}")
node_lines.append(line)
node_report = "\n".join(node_lines)

3 changes: 1 addition & 2 deletions python/ray/tests/test_cli.py
@@ -204,9 +204,8 @@ def _check_output_via_pattern(name, result):
expected_lines = _load_output_pattern(name)

if result.exception is not None:
print(result.output)
raise result.exception from None

print(result.output)
expected = r" *\n".join(expected_lines) + "\n?"
if re.fullmatch(expected, result.output) is None:
_debug_check_line_by_line(result, expected_lines)
6 changes: 5 additions & 1 deletion python/ray/tests/test_cli_patterns/test_ray_status.txt
@@ -1,7 +1,11 @@
======== Cluster status: .+
Node status
------------------------------------------------------------
1 node\(s\) with resources: .+
1 node\(s\) with resources:
.+
.+
.+
.+

Resources
------------------------------------------------------------
39 changes: 39 additions & 0 deletions python/ray/tests/test_placement_group.py
@@ -1980,5 +1980,44 @@ def is_usage_updated():
assert demand_output["demand"] == "(no resource demands)"


def test_placement_group_status(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node(num_cpus=4)
ray.init(address=cluster.address)

@ray.remote(num_cpus=1)
class A:
def ready(self):
pass

pg = ray.util.placement_group([{"CPU": 1}])
ray.get(pg.ready())

# Wait until the usage is updated, which is
# when the demand is also updated.
def is_usage_updated():
demand_output = get_ray_status_output(cluster.address)
return demand_output["usage"] != ""

wait_for_condition(is_usage_updated)
demand_output = get_ray_status_output(cluster.address)
cpu_usage = demand_output["usage"].split("\n")[0]
expected = "0.0/4.0 CPU (0.0 used of 1.0 reserved in placement groups)"
assert cpu_usage == expected

# 2 CPU + 1 PG CPU == 3.0/4.0 CPU (1 used by pg)
actors = [A.remote() for _ in range(2)]
actors_in_pg = [A.options(placement_group=pg).remote() for _ in range(1)]

ray.get([actor.ready.remote() for actor in actors])
ray.get([actor.ready.remote() for actor in actors_in_pg])
# Wait long enough for the usage to be propagated to the GCS.
time.sleep(5)
demand_output = get_ray_status_output(cluster.address)
cpu_usage = demand_output["usage"].split("\n")[0]
expected = "3.0/4.0 CPU (1.0 used of 1.0 reserved in placement groups)"
assert cpu_usage == expected
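
A sketch of the arithmetic behind the two assertions, applying the same used - pg_total + pg_used adjustment from get_usage_report; that the raw wildcard usage already includes the bundle reservation follows from the comment in util.py above:

# Before any actors: only the pg reservation shows up in the raw usage.
#   raw_used = 1.0, pg_used = 0.0, pg_total = 1.0
#   1.0 - 1.0 + 0.0 = 0.0
#   -> "0.0/4.0 CPU (0.0 used of 1.0 reserved in placement groups)"
# After 2 plain actors plus 1 actor scheduled inside the pg:
#   raw_used = 2.0 + 1.0 = 3.0, pg_used = 1.0, pg_total = 1.0
#   3.0 - 1.0 + 1.0 = 3.0
#   -> "3.0/4.0 CPU (1.0 used of 1.0 reserved in placement groups)"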


if __name__ == "__main__":
sys.exit(pytest.main(["-sv", __file__]))
35 changes: 23 additions & 12 deletions python/ray/tests/test_resource_demand_scheduler.py
@@ -2391,7 +2391,7 @@ def test_info_string():
lm_summary = LoadMetricsSummary(
head_ip="0.0.0.0",
usage={
"CPU": (530, 544),
"CPU": (530.0, 544.0),
"GPU": (2, 2),
"AcceleratorType:V100": (0, 2),
"memory": (2 * 2**30, 2**33),
@@ -2439,7 +2439,7 @@ def test_info_string():
Usage:
0/2 AcceleratorType:V100
530/544 CPU
530.0/544.0 CPU
2/2 GPU
2.00/8.000 GiB memory
3.14/16.000 GiB object_store_memory
@@ -2461,7 +2461,7 @@ def test_info_string_failed_node_cap():
lm_summary = LoadMetricsSummary(
head_ip="0.0.0.0",
usage={
"CPU": (530, 544),
"CPU": (530.0, 544.0),
"GPU": (2, 2),
"AcceleratorType:V100": (0, 2),
"memory": (2 * 2**30, 2**33),
@@ -2532,7 +2532,7 @@ def test_info_string_failed_node_cap():
Usage:
0/2 AcceleratorType:V100
530/544 CPU (2.0 reserved in placement groups)
530.0/544.0 CPU (2.0 used of 2.0 reserved in placement groups)
2/2 GPU
2.00/8.000 GiB memory
3.14/16.000 GiB object_store_memory
@@ -2556,13 +2556,16 @@ def test_info_string_no_node_type():
lm_summary = LoadMetricsSummary(
head_ip="0.0.0.0",
usage={
"CPU": (530, 544),
"CPU": (530.0, 544.0),
"GPU": (2, 2),
"AcceleratorType:V100": (0, 2),
"memory": (2 * 2**30, 2**33),
"memory": (6 * 2**30, 2**33),
"object_store_memory": (3.14 * 2**30, 2**34),
"CPU_group_4a82a217aadd8326a3a49f02700ac5c2": (2.0, 2.0),
"memory_group_4a82a217aadd8326a3a49f02700ac5c2": (2**32, 2.0)
"CPU_group_4a82a217aadd8326a3a49f02700ac5c2": (1.0, 2.0),
"CPU_group_1_4a82a217aadd8326a3a49f02700ac5c2": (0.0, 1.0),
"CPU_group_2_4a82a217aadd8326a3a49f02700ac5c2": (1.0, 1.0),
"memory_group_4a82a217aadd8326a3a49f02700ac5c2": (2**32, 2**32),
"memory_group_0_4a82a217aadd8326a3a49f02700ac5c2": (2**32, 2**32)
},
resource_demand=[({
"GPU": 0.5,
@@ -2585,22 +2588,30 @@ def test_info_string_no_node_type():
"CPU": 16
}, 100)],
node_types=[({
"CPU": 16
"CPU": 16,
"CPU_group_4a82a217aadd8326a3a49f02700ac5c2": 2.0,
"CPU_group_1_4a82a217aadd8326a3a49f02700ac5c2": 1.0,
"CPU_group_2_4a82a217aadd8326a3a49f02700ac5c2": 1.0,
"memory": 2**33,
"memory_group_4a82a217aadd8326a3a49f02700ac5c2": 4 * 2**30,
"memory_group_0_4a82a217aadd8326a3a49f02700ac5c2": 4 * 2**30,
}, 1)])

expected = """
======== Cluster status: 2020-12-28 01:02:03 ========
Node status
-----------------------------------------------------
1 node(s) with resources: {'CPU': 16}
1 node(s) with resources:
CPU: 16.0 (2.0 reserved in placement groups)
memory: 8589934592.0 (4294967296 reserved in placement groups)
Resources
-----------------------------------------------------
Usage:
0/2 AcceleratorType:V100
530/544 CPU (2.0 reserved in placement groups)
529.0/544.0 CPU (1.0 used of 2.0 reserved in placement groups)
2/2 GPU
2.00/8.000 GiB memory (4.00 GiB reserved in placement groups)
6.00/8.000 GiB memory (4.00 used of 4.00 GiB reserved in placement groups)
3.14/16.000 GiB object_store_memory
Demands:
