11import asyncio
22import coolname
3+ import datetime
34import itertools
45import logging
56import os
67import pandas as pd
78import polars as pl
89import ray
9- from collections .abc import Iterable
10+ import re
11+ import requests
12+ import time
13+ from collections import defaultdict
14+ from dataclasses import dataclass
1015from packaging .version import Version
1116from ray .serve import shutdown
1217from ray .serve .handle import DeploymentHandle
13- from typing import Dict , List , Optional
18+ from typing import Dict , Iterable , List , Optional
1419
1520from .schema import schema as default_schema
1621
@@ -89,6 +94,7 @@ def __init__(
8994 namespace : str ,
9095 path : Optional [str ] = None ,
9196 enable_perspective_dashboard : bool = False ,
97+ scrape_prometheus_metrics : bool = False ,
9298 ):
9399 """An async Ray Actor Class to track task level metadata.
94100
@@ -166,6 +172,98 @@ def __init__(
166172 "error_message" : "str" ,
167173 },
168174 )
175+ if scrape_prometheus_metrics :
176+ self .__scraping_job = self .scrape_prometheus_metrics ()
177+
178+ def scrape_prometheus_metrics (self ):
179+ """
180+ Provide a heper method to parse perspective style metrics, a helper dataclass, and launch a job which indefinitely
181+ scrapes the NodeManagerAddress:MetricsExportPort/metrics and updates the appropriate perspective tables with metric values.
182+ """
183+ from prometheus_client .openmetrics import parser
184+
185+ @dataclass
186+ class ParsedOpenMetricsData :
187+ metric_name : str
188+ metric_description : str
189+ metric_type : str
190+ metric_value : str
191+ metric_metadata : str
192+
193+ def _parse_response (text ):
194+ parsed_data = []
195+ metric_name = None
196+ metric_description = None
197+ for line in text .split ("\n " ):
198+ if len (line ) > 0 :
199+ if line .startswith ("# HELP " ):
200+ metric_description = " " .join (line .split (" " )[3 :])
201+ elif line .startswith ("# TYPE " ):
202+ _ , _ , metric_name , metric_type = line .split (" " )
203+ else :
204+ matches = re .search (r".*\{(.*)\}(.*)" , line )
205+ if matches is not None :
206+ metric_metadata , metric_value = matches .groups ()
207+ metric_metadata = parser ._parse_labels_with_state_machine (metric_metadata )[0 ]
208+ else :
209+ _ , metric_value = line .split (" " )
210+ metric_metadata = dict ()
211+ parsed_data .append (
212+ ParsedOpenMetricsData (
213+ metric_name = metric_name ,
214+ metric_description = metric_description ,
215+ metric_type = metric_type ,
216+ metric_value = eval (metric_value ),
217+ metric_metadata = metric_metadata ,
218+ )
219+ )
220+ return parsed_data
221+
222+ @ray .remote
223+ def _scrape_prometheus_metrics ():
224+ """
225+ Refreshing every os.environ.get("RAYDAR_PROMETHEUS_METRICS_REFRESH_INTERVAL_S", 2) seconds, attempt to connect to the
226+ NodeManagerAddress:MetricsExportPort/metrics endpoint and parse the prometheus metrics provided by this endpoint.
227+ Publish the parsed metrics to the appropriate perspective table (tables are created based on metrics_name).
228+ """
229+ metrics = set ()
230+ while True :
231+ time .sleep (int (os .environ .get ("RAYDAR_PROMETHEUS_METRICS_REFRESH_INTERVAL_S" , 2 )))
232+ for node in ray .nodes ():
233+ all_values = defaultdict (list )
234+ if node .get ("Alive" , False ):
235+ response = requests .get (f"http://{ node .get ('NodeManagerAddress' )} :{ node .get ('MetricsExportPort' )} /metrics" )
236+ if response .status_code == 200 :
237+ parsed_values = _parse_response (response .text )
238+ for parsed_value in parsed_values :
239+ data = dict (
240+ metric_name = parsed_value .metric_name ,
241+ metric_description = parsed_value .metric_description ,
242+ metric_type = parsed_value .metric_type ,
243+ metric_value = parsed_value .metric_value ,
244+ timestamp = datetime .datetime .now (),
245+ )
246+ for key , value in parsed_value .metric_metadata .items ():
247+ data [key ] = value
248+ all_values [parsed_value .metric_name ].append (data )
249+
250+ for key , values in all_values .items ():
251+ if key not in metrics :
252+ metrics .add (key )
253+ self .proxy_server .remote ("new" , key , values )
254+ else :
255+ self .proxy_server .remote ("update" , key , values )
256+
257+ return _scrape_prometheus_metrics .remote ()
258+
259+ def get_proxy_server (self ):
260+ """A getter for this actors proxy server attribute. Can be used to create custom perspective visuals.
261+
262+ Returns: this actor's proxy_server attribute
263+ """
264+ if self .proxy_server :
265+ return self .proxy_server
266+ raise Exception ("This task_tracker has no active proxy_server." )
169267
170268 def callback (self , tasks : Iterable [ray .ObjectRef ]) -> None :
171269 """A remote function used by this actor's processor actor attribute. Will be called by a separate actor
@@ -218,7 +316,9 @@ def update_perspective_dashboard(self, completed_tasks) -> None:
218316 That proxy_server serves perspective tables which anticipate the data formats we provide.
219317
220318 Args:
221- completed_tasks: A list of tuples of the form (ObjectReference, TaskMetadata), where the ObjectReferences are neither Running nor Pending Assignment.
319+ completed_tasks: A list of tuples of the form (ObjectReference, TaskMetadata), where the
320+ ObjectReferences are neither Running nor Pending Assignment.
321+
222322 """
223323 data = [
224324 dict (
@@ -297,14 +397,6 @@ def get_df(self) -> pl.DataFrame:
297397 )
298398 return self .df
299399
300- def get_proxy_server (self ) -> ray .serve .handle .DeploymentHandle :
301- """A getter for this actors proxy server attribute. Can be used to create custom perspective visuals.
302- Returns: this actors proxy_server attribute
303- """
304- if self .proxy_server :
305- return self .proxy_server
306- raise Exception ("This task_tracker has no active proxy_server." )
307-
308400 def save_df (self ) -> None :
309401 """Saves the internally maintained dataframe of task related information from the ray GCS"""
310402 self .get_df ()
@@ -323,7 +415,7 @@ def clear_df(self) -> None:
323415
324416
325417class RayTaskTracker :
326- def __init__ (self , name : str = "task_tracker" , namespace : str = None , ** kwargs ):
418+ def __init__ (self , name : Optional [ str ] = "task_tracker" , namespace : Optional [ str ] = None , ** kwargs ):
327419 """A utility to construct AsyncMetadataTracker actors.
328420
329421 Wraps several remote AsyncMetadataTracker functions in a ray.get() call for convenience.
@@ -362,11 +454,11 @@ def get_df(self, process_user_metadata_column=False) -> pl.DataFrame:
362454 return df_with_user_metadata
363455 return df
364456
365- def save_df (self ) -> None :
457+ def save_df (self ):
366458 """Save the dataframe used by this object's AsyncMetadataTracker actor"""
367459 return ray .get (self .tracker .save_df .remote ())
368460
369- def clear (self ) -> None :
461+ def clear (self ):
370462 """Clear the dataframe used by this object's AsyncMetadataTracker actor"""
371463 return ray .get (self .tracker .clear_df .remote ())
372464
0 commit comments