Skip to content

Commit ebb742f

Browse files
committed
[Observability] Emit metric ClustermgtdHeartbeat to signal clustermgtd heartbeat.
1 parent cf1b339 commit ebb742f

5 files changed

Lines changed: 296 additions & 0 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ This file is used to list changes made in each version of the aws-parallelcluste
88

99
**CHANGES**
1010
- Direct users to slurm_resume log to see EC2 error codes if no instances are launched.
11+
- Emit metric `ClustermgtdHeartbeat` to signal clustermgtd heartbeat.
1112

1213
3.14.1
1314
------
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
# Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License").
4+
# You may not use this file except in compliance with the License.
5+
# A copy of the License is located at
6+
#
7+
# http://aws.amazon.com/apache2.0/
8+
#
9+
# or in the "LICENSE.txt" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
10+
# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
13+
import logging
14+
from datetime import datetime, timezone
15+
from typing import Dict, List, Optional
16+
17+
import boto3
18+
from botocore.config import Config
19+
20+
logger = logging.getLogger(__name__)  # module-level logger used by CloudWatchMetricsPublisher.put_metric
21+
22+
METRICS_NAMESPACE = "ParallelCluster"  # passed as Namespace to put_metric_data for every metric
23+
METRICS_DIMENSION_CLUSTER_NAME = "ClusterName"  # Name of the dimension carrying the cluster name
24+
METRICS_DIMENSION_INSTANCE_ID = "InstanceId"  # Name of the dimension carrying the instance id
25+
26+
27+
class CloudWatchMetricsPublisher:
    """
    Publish custom metrics to CloudWatch.

    Every data point is sent to the METRICS_NAMESPACE namespace and carries the cluster name and the
    instance id supplied at construction time as dimensions.
    """

    def __init__(self, region: str, cluster_name: str, instance_id: str, boto3_config: Config):
        """
        Store the publisher settings; the CloudWatch client is not created here.

        Args:
            region: AWS region the CloudWatch client is created for
            cluster_name: Value of the ClusterName dimension attached to every metric
            instance_id: Value of the InstanceId dimension attached to every metric
            boto3_config: Botocore configuration handed to the CloudWatch client
        """
        # The client is built on first access of the cloudwatch_client property.
        self._cloudwatch_client = None
        self._boto3_config = boto3_config
        self._instance_id = instance_id
        self._cluster_name = cluster_name
        self._region = region

    @property
    def cloudwatch_client(self):
        """Return the CloudWatch client, creating and caching it on first access."""
        if self._cloudwatch_client is not None:
            return self._cloudwatch_client
        self._cloudwatch_client = boto3.client("cloudwatch", region_name=self._region, config=self._boto3_config)
        return self._cloudwatch_client

    def put_metric(
        self,
        metric_name: str,
        value: float,
        unit: str = "Count",
        additional_dimensions: Optional[List[Dict[str, str]]] = None,
    ):
        """
        Send a single data point to CloudWatch.

        The data point is stamped with the current UTC time and always carries the ClusterName and
        InstanceId dimensions, followed by any caller-supplied dimensions. Any exception raised while
        obtaining the client or publishing is logged at error level and not propagated.

        Args:
            metric_name: Name of the metric to publish
            value: Metric value
            unit: CloudWatch unit (default: "Count")
            additional_dimensions: Optional extra dimensions, each shaped {"Name": "...", "Value": "..."}
        """
        base_dimensions = [
            {"Name": METRICS_DIMENSION_CLUSTER_NAME, "Value": self._cluster_name},
            {"Name": METRICS_DIMENSION_INSTANCE_ID, "Value": self._instance_id},
        ]
        # A fresh list is built, so the caller's additional_dimensions is never mutated.
        dimensions = base_dimensions + list(additional_dimensions or [])

        try:
            # Resolve the client inside the try so a client-creation failure is logged like any other.
            client = self.cloudwatch_client
            datum = {
                "MetricName": metric_name,
                "Dimensions": dimensions,
                "Timestamp": datetime.now(tz=timezone.utc),
                "Value": value,
                "Unit": unit,
            }
            client.put_metric_data(Namespace=METRICS_NAMESPACE, MetricData=[datum])
            logger.debug("Published metric %s with value %s", metric_name, value)
        except Exception as err:
            logger.error("Failed to publish metric %s: %s", metric_name, err)

src/slurm_plugin/clustermgtd.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
from common.utils import check_command_output, read_json, sleep_remaining_loop_time, time_is_up, wait_remaining_time
4242
from retrying import retry
4343
from slurm_plugin.capacity_block_manager import CapacityBlockManager
44+
from slurm_plugin.cloudwatch_utils import CloudWatchMetricsPublisher
4445
from slurm_plugin.cluster_event_publisher import ClusterEventPublisher
4546
from slurm_plugin.common import TIMESTAMP_FORMAT, ScalingStrategy, log_exception, print_with_count
4647
from slurm_plugin.console_logger import ConsoleLogger
@@ -60,6 +61,7 @@
6061
LOOP_TIME = 60
6162
CONSOLE_OUTPUT_WAIT_TIME = 5 * 60
6263
MAXIMUM_TASK_BACKLOG = 100
64+
CW_METRICS_HEARTBEAT = "ClustermgtdHeartbeat"
6365
log = logging.getLogger(__name__)
6466
compute_logger = log.getChild("console_output")
6567
event_logger = log.getChild("events")
@@ -401,6 +403,7 @@ def __init__(self, config):
401403
self._event_publisher = None
402404
self._partition_nodelist_mapping_instance = None
403405
self._capacity_block_manager = None
406+
self._metrics_publisher = None
404407
self.set_config(config)
405408

406409
def set_config(self, config: ClustermgtdConfig):
@@ -426,6 +429,7 @@ def set_config(self, config: ClustermgtdConfig):
426429
self._instance_manager = self._initialize_instance_manager(config)
427430
self._console_logger = self._initialize_console_logger(config)
428431
self._capacity_block_manager = self._initialize_capacity_block_manager(config)
432+
self._metrics_publisher = self._initialize_metrics_publisher(config)
429433

430434
def shutdown(self):
431435
if self._task_executor:
@@ -480,6 +484,16 @@ def _initialize_capacity_block_manager(config):
480484
region=config.region, fleet_config=config.fleet_config, boto3_config=config.boto3_config
481485
)
482486

487+
@staticmethod
def _initialize_metrics_publisher(config):
    """
    Build the CloudWatch metrics publisher from the given configuration.

    Reads region, cluster_name, head_node_instance_id and boto3_config from config and forwards them
    as keyword arguments to CloudWatchMetricsPublisher.
    """
    publisher_settings = {
        "region": config.region,
        "cluster_name": config.cluster_name,
        "instance_id": config.head_node_instance_id,
        "boto3_config": config.boto3_config,
    }
    return CloudWatchMetricsPublisher(**publisher_settings)
496+
483497
def _update_compute_fleet_status(self, status):
484498
log.info("Updating compute fleet status from %s to %s", self._compute_fleet_status, status)
485499
self._compute_fleet_status_manager.update_status(status)
@@ -574,6 +588,9 @@ def manage_cluster(self):
574588
# Write clustermgtd heartbeat to file
575589
self._write_timestamp_to_file()
576590

591+
# Publish heartbeat metric to CloudWatch
592+
self._metrics_publisher.put_metric(metric_name=CW_METRICS_HEARTBEAT, value=1)
593+
577594
def _write_timestamp_to_file(self):
578595
"""Write timestamp into shared file so compute nodes can determine if head node is online."""
579596
# Make clustermgtd heartbeat readable to all users
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
# Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with
4+
# the License. A copy of the License is located at
5+
#
6+
# http://aws.amazon.com/apache2.0/
7+
#
8+
# or in the "LICENSE.txt" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
9+
# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and
10+
# limitations under the License.
11+
12+
import logging
13+
from datetime import datetime, timezone
14+
from unittest.mock import MagicMock
15+
16+
import pytest
17+
from assertpy import assert_that
18+
from botocore.config import Config
19+
from botocore.exceptions import ClientError
20+
from slurm_plugin.cloudwatch_utils import METRICS_NAMESPACE, CloudWatchMetricsPublisher
21+
22+
23+
class TestCloudWatchMetricsPublisher:
    """Tests for CloudWatchMetricsPublisher class."""

    @pytest.fixture
    def boto3_config(self):
        """Return the botocore Config handed to the publisher under test."""
        return Config(retries={"max_attempts": 1, "mode": "standard"})

    @pytest.fixture
    def metrics_publisher(self, boto3_config):
        """Return a publisher built with fixed region, cluster name and instance id."""
        return CloudWatchMetricsPublisher(
            region="us-east-1",
            cluster_name="test-cluster",
            instance_id="i-1234567890abcdef0",
            boto3_config=boto3_config,
        )

    def test_init(self, metrics_publisher, boto3_config):
        """Test CloudWatchMetricsPublisher initialization."""
        assert_that(metrics_publisher._region).is_equal_to("us-east-1")
        assert_that(metrics_publisher._cluster_name).is_equal_to("test-cluster")
        assert_that(metrics_publisher._boto3_config).is_equal_to(boto3_config)
        assert_that(metrics_publisher._instance_id).is_equal_to("i-1234567890abcdef0")
        assert_that(metrics_publisher._cloudwatch_client).is_none()

    def test_cloudwatch_client_lazy_initialization(self, metrics_publisher, mocker):
        """Test that CloudWatch client is lazily initialized."""
        mock_client = MagicMock()
        mock_boto3 = mocker.patch("slurm_plugin.cloudwatch_utils.boto3")
        mock_boto3.client.return_value = mock_client

        # First access should create the client
        client = metrics_publisher.cloudwatch_client
        assert_that(client).is_equal_to(mock_client)
        mock_boto3.client.assert_called_once_with(
            "cloudwatch",
            region_name="us-east-1",
            config=metrics_publisher._boto3_config,
        )

        # Second access should return the cached client
        mock_boto3.client.reset_mock()
        client2 = metrics_publisher.cloudwatch_client
        assert_that(client2).is_equal_to(mock_client)
        mock_boto3.client.assert_not_called()

    @pytest.mark.parametrize(
        "metric_name, value, unit, additional_dimensions, expected_dimensions",
        [
            pytest.param(
                "TestMetric",
                42,
                "Count",
                None,
                [
                    {"Name": "ClusterName", "Value": "test-cluster"},
                    {"Name": "InstanceId", "Value": "i-1234567890abcdef0"},
                ],
                id="basic",
            ),
            pytest.param(
                "HeadNodeDaemonHeartbeat",
                1,
                "Count",
                [{"Name": "DaemonName", "Value": "clustermgtd"}],
                [
                    {"Name": "ClusterName", "Value": "test-cluster"},
                    {"Name": "InstanceId", "Value": "i-1234567890abcdef0"},
                    {"Name": "DaemonName", "Value": "clustermgtd"},
                ],
                id="with_additional_dimension",
            ),
            pytest.param(
                "LatencyMetric",
                150.5,
                "Milliseconds",
                None,
                [
                    {"Name": "ClusterName", "Value": "test-cluster"},
                    {"Name": "InstanceId", "Value": "i-1234567890abcdef0"},
                ],
                id="with_custom_unit",
            ),
            pytest.param(
                "CustomMetric",
                100,
                "Count",
                [
                    {"Name": "DaemonName", "Value": "clustermgtd"},
                    {"Name": "NodeType", "Value": "HeadNode"},
                ],
                [
                    {"Name": "ClusterName", "Value": "test-cluster"},
                    {"Name": "InstanceId", "Value": "i-1234567890abcdef0"},
                    {"Name": "DaemonName", "Value": "clustermgtd"},
                    {"Name": "NodeType", "Value": "HeadNode"},
                ],
                id="with_multiple_additional_dimensions",
            ),
        ],
    )
    def test_put_metric(
        self,
        metrics_publisher,
        mocker,
        metric_name: str,
        value: float,
        unit: str,
        additional_dimensions: list,
        expected_dimensions: list,
    ):
        """Test put_metric with various parameter combinations."""
        mock_client = MagicMock()
        metrics_publisher._cloudwatch_client = mock_client
        mock_datetime = mocker.patch("slurm_plugin.cloudwatch_utils.datetime")
        fixed_time = datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc)
        mock_datetime.now.return_value = fixed_time

        metrics_publisher.put_metric(
            metric_name=metric_name,
            value=value,
            unit=unit,
            additional_dimensions=additional_dimensions,
        )

        # The timestamp must be requested as timezone-aware UTC; without this check a regression to a
        # naive datetime.now() would go unnoticed because the mock returns fixed_time regardless.
        mock_datetime.now.assert_called_once_with(tz=timezone.utc)
        mock_client.put_metric_data.assert_called_once_with(
            Namespace=METRICS_NAMESPACE,
            MetricData=[
                {
                    "MetricName": metric_name,
                    "Dimensions": expected_dimensions,
                    "Timestamp": fixed_time,
                    "Value": value,
                    "Unit": unit,
                }
            ],
        )

    def test_put_metric_handles_exception(self, metrics_publisher, caplog):
        """Test that put_metric handles exceptions gracefully."""
        mock_client = MagicMock()
        mock_client.put_metric_data.side_effect = ClientError(
            {"Error": {"Code": "WHATEVER_CODE", "Message": "WHATEVER_MESSAGE"}},
            "PutMetricData",
        )
        metrics_publisher._cloudwatch_client = mock_client

        with caplog.at_level(logging.WARNING):
            # Should not raise exception
            metrics_publisher.put_metric(metric_name="WHATEVER_METRIC_NAME", value=1)

        # Confirm the failing call was actually reached, so the log assertion below is meaningful.
        mock_client.put_metric_data.assert_called_once()
        assert_that(caplog.text).matches(
            r"Failed to publish metric WHATEVER_METRIC_NAME:.*WHATEVER_CODE.*WHATEVER_MESSAGE"
        )

tests/slurm_plugin/test_clustermgtd.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1643,7 +1643,14 @@ def test_manage_cluster(
16431643
fleet_config={},
16441644
)
16451645
mocker.patch("time.sleep")
1646+
cloudwatch_metrics_publisher_mock = mocker.patch("slurm_plugin.clustermgtd.CloudWatchMetricsPublisher")
16461647
cluster_manager = ClusterManager(mock_sync_config)
1648+
cloudwatch_metrics_publisher_mock.assert_called_once_with(
1649+
region="us-east-2",
1650+
cluster_name="hit-test",
1651+
instance_id="i-instance-id",
1652+
boto3_config=mock_sync_config.boto3_config,
1653+
)
16471654
cluster_manager._current_time = "current_time"
16481655
cluster_manager._static_nodes_in_replacement = {}
16491656
# Set up function mocks
@@ -1670,6 +1677,7 @@ def test_manage_cluster(
16701677
get_ec2_instances_mock = mocker.patch.object(
16711678
ClusterManager, "_get_ec2_instances", autospec=True, return_value=mock_cluster_instances
16721679
)
1680+
metrics_publisher_mock = cloudwatch_metrics_publisher_mock.return_value
16731681
get_node_info_with_retry_mock = mocker.patch.object(
16741682
ClusterManager,
16751683
"_get_node_info_with_retry",
@@ -1683,6 +1691,7 @@ def test_manage_cluster(
16831691
# Assert function calls
16841692
initialize_instance_manager_mock.assert_called_once()
16851693
write_timestamp_to_file_mock.assert_called_once()
1694+
metrics_publisher_mock.put_metric.assert_called_once_with(metric_name="ClustermgtdHeartbeat", value=1)
16861695
compute_fleet_status_manager_mock.get_status.assert_called_once()
16871696
if disable_cluster_management:
16881697
perform_health_check_actions_mock.assert_not_called()
@@ -2255,6 +2264,7 @@ def test_manage_cluster_boto3(
22552264
boto3_stubber("ec2", mocked_boto3_request)
22562265
mocker.patch("slurm_plugin.clustermgtd.datetime").now.return_value = datetime(2020, 1, 2, 0, 0, 0)
22572266
mocker.patch("slurm_plugin.clustermgtd.read_json", side_effect=[FLEET_CONFIG, LAUNCH_OVERRIDES, LAUNCH_OVERRIDES])
2267+
mocker.patch("slurm_plugin.clustermgtd.CloudWatchMetricsPublisher")
22582268
sync_config = ClustermgtdConfig(test_datadir / config_file)
22592269
sync_config.launch_overrides = {"dynamic": {"c5.xlarge": {"InstanceType": "t2.micro"}}}
22602270
cluster_manager = ClusterManager(sync_config)

0 commit comments

Comments
 (0)