Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ This file is used to list changes made in each version of the aws-parallelcluste

**CHANGES**
- Direct users to slurm_resume log to see EC2 error codes if no instances are launched.
- Emit metric `ClustermgtdHeartbeat` to signal clustermgtd heartbeat.
- Emit clustermgtd heartbeat as a structured event to CloudWatch Logs for metric filter-based monitoring.

3.14.1
------
Expand Down
93 changes: 0 additions & 93 deletions src/slurm_plugin/cloudwatch_utils.py

This file was deleted.

35 changes: 35 additions & 0 deletions src/slurm_plugin/cluster_event_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@
"message": "Count of EC2 instances backing the cluster compute fleet",
"event_type": "cluster-instance-count",
}
CLUSTERMGTD_HEARTBEAT = {
"message": "Clustermgtd heartbeat",
"event_type": "clustermgtd-heartbeat",
}


class ClusterEventPublisher:
Expand Down Expand Up @@ -474,6 +478,37 @@ def publish_compute_node_events(self, compute_nodes: List[SlurmNode], cluster_in
event_supplier=({"detail": self._describe_node(node)} for node in compute_nodes),
)

# Example event generated by this function:
# {
# "datetime": "2023-04-03T18:10:13.089+00:00",
# "version": 0,
# "scheduler": "slurm",
# "cluster-name": "integ-tests-9hbu2op3liukbqqz-develop",
# "node-role": "HeadNode",
# "component": "clustermgtd",
# "level": "INFO",
# "instance-id": "i-04886f8b56e5967ee",
# "event-type": "clustermgtd-heartbeat",
# "message": "Clustermgtd heartbeat",
# "detail": {
# "heartbeat-timestamp": "2023-04-03T18:10:13.089+00:00"
# }
# }
# NOTE(review): raise_on_error=False presumably makes a publishing failure get logged rather than
# propagated to the caller, so the heartbeat event stays best-effort — confirm against log_exception.
@log_exception(logger, "publish_heartbeat_event", catch_exception=Exception, raise_on_error=False)
def publish_heartbeat_event(self, heartbeat_timestamp: datetime):
"""
Publish a clustermgtd heartbeat event.

The event is published at INFO level with the message and event type defined in CLUSTERMGTD_HEARTBEAT.
Its detail contains a single "heartbeat-timestamp" entry: the given timestamp in ISO 8601 format with
millisecond precision, or None when no timestamp is provided.

:param heartbeat_timestamp: time of the heartbeat being reported; a falsy value is reported as None
"""
# Timestamp passed to publish_event for the event itself; the heartbeat time is reported separately in the detail.
timestamp = ClusterEventPublisher.timestamp()
self.publish_event(
logging.INFO,
**CLUSTERMGTD_HEARTBEAT,
timestamp=timestamp,
detail={
"heartbeat-timestamp": (
# Guard against a missing timestamp so that isoformat is never called on None.
heartbeat_timestamp.isoformat(timespec="milliseconds") if heartbeat_timestamp else None
),
},
)

# Slurm Resume Events
# Note: This uses the same schema as `publish_unhealthy_static_node_events` from clustermgtd
# Example event generated by this function:
Expand Down
22 changes: 4 additions & 18 deletions src/slurm_plugin/clustermgtd.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
from common.utils import check_command_output, read_json, sleep_remaining_loop_time, time_is_up, wait_remaining_time
from retrying import retry
from slurm_plugin.capacity_block_manager import CapacityBlockManager
from slurm_plugin.cloudwatch_utils import CloudWatchMetricsPublisher
from slurm_plugin.cluster_event_publisher import ClusterEventPublisher
from slurm_plugin.common import TIMESTAMP_FORMAT, ScalingStrategy, log_exception, print_with_count
from slurm_plugin.console_logger import ConsoleLogger
Expand All @@ -61,7 +60,6 @@
LOOP_TIME = 60
CONSOLE_OUTPUT_WAIT_TIME = 5 * 60
MAXIMUM_TASK_BACKLOG = 100
CW_METRICS_HEARTBEAT = "ClustermgtdHeartbeat"
log = logging.getLogger(__name__)
compute_logger = log.getChild("console_output")
event_logger = log.getChild("events")
Expand Down Expand Up @@ -403,7 +401,6 @@ def __init__(self, config):
self._event_publisher = None
self._partition_nodelist_mapping_instance = None
self._capacity_block_manager = None
self._metrics_publisher = None
self.set_config(config)

def set_config(self, config: ClustermgtdConfig):
Expand All @@ -429,7 +426,6 @@ def set_config(self, config: ClustermgtdConfig):
self._instance_manager = self._initialize_instance_manager(config)
self._console_logger = self._initialize_console_logger(config)
self._capacity_block_manager = self._initialize_capacity_block_manager(config)
self._metrics_publisher = self._initialize_metrics_publisher(config)

def shutdown(self):
if self._task_executor:
Expand Down Expand Up @@ -484,16 +480,6 @@ def _initialize_capacity_block_manager(config):
region=config.region, fleet_config=config.fleet_config, boto3_config=config.boto3_config
)

@staticmethod
def _initialize_metrics_publisher(config):
"""Initialize CloudWatch metrics publisher."""
return CloudWatchMetricsPublisher(
region=config.region,
cluster_name=config.cluster_name,
instance_id=config.head_node_instance_id,
boto3_config=config.boto3_config,
)

def _update_compute_fleet_status(self, status):
log.info("Updating compute fleet status from %s to %s", self._compute_fleet_status, status)
self._compute_fleet_status_manager.update_status(status)
Expand Down Expand Up @@ -588,16 +574,16 @@ def manage_cluster(self):
# Write clustermgtd heartbeat to file
self._write_timestamp_to_file()

# Publish heartbeat metric to CloudWatch
self._metrics_publisher.put_metric(metric_name=CW_METRICS_HEARTBEAT, value=1)

def _write_timestamp_to_file(self):
"""Write timestamp into shared file so compute nodes can determine if head node is online."""
# Make clustermgtd heartbeat readable to all users
heartbeat_time = datetime.now(tz=timezone.utc)
with open(os.open(self._config.heartbeat_file_path, os.O_WRONLY | os.O_CREAT, 0o644), "w") as timestamp_file:
# Note: heartbeat must be written with datetime.strftime to convert localized datetime into str
# datetime.strptime will not work with str(datetime)
timestamp_file.write(datetime.now(tz=timezone.utc).strftime(TIMESTAMP_FORMAT))
timestamp_file.write(heartbeat_time.strftime(TIMESTAMP_FORMAT))
# Publish heartbeat event to events log
self._event_publisher.publish_heartbeat_event(heartbeat_time)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we wrap this in a try/except so that a failure here cannot break the clustermgtd logic?
We can handle this case by case, but in general we should wrap all non-critical code in a try/except.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with this need. This is done in

@log_exception(logger, "publish_heartbeat_event", catch_exception=Exception, raise_on_error=False)
def publish_heartbeat_event(self, heartbeat_timestamp: datetime):
.

Every function of the event publisher uses the same mechanism so that it does not fail on exceptions.


@staticmethod
@retry(stop_max_attempt_number=2, wait_fixed=1000)
Expand Down
175 changes: 0 additions & 175 deletions tests/slurm_plugin/test_cloudwatch_utils.py

This file was deleted.

Loading