forked from aws/sagemaker-python-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtelemetry_logging.py
More file actions
284 lines (244 loc) · 10.1 KB
/
telemetry_logging.py
File metadata and controls
284 lines (244 loc) · 10.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Telemetry module for SageMaker Python SDK to collect usage data and metrics."""
from __future__ import absolute_import
import logging
import platform
import sys
from time import perf_counter
from typing import List
import functools
import requests
import boto3
from sagemaker.session import Session
from sagemaker.utils import resolve_value_from_config
from sagemaker.config.config_schema import TELEMETRY_OPT_OUT_PATH
from sagemaker.telemetry.constants import (
Feature,
Status,
Region,
DEFAULT_AWS_REGION,
)
from sagemaker.user_agent import SDK_VERSION, process_studio_metadata_file
logger = logging.getLogger(__name__)
OS_NAME = platform.system() or "UnresolvedOS"
OS_VERSION = platform.release() or "UnresolvedOSVersion"
OS_NAME_VERSION = "{}/{}".format(OS_NAME, OS_VERSION)
PYTHON_VERSION = "{}.{}.{}".format(
sys.version_info.major, sys.version_info.minor, sys.version_info.micro
)
TELEMETRY_OPT_OUT_MESSAGING = (
"SageMaker Python SDK will collect telemetry to help us better understand our user's needs, "
"diagnose issues, and deliver additional features.\n"
"To opt out of telemetry, please disable via TelemetryOptOut parameter in SDK defaults config. "
"For more information, refer to https://sagemaker.readthedocs.io/en/stable/overview.html"
"#configuring-and-using-defaults-with-the-sagemaker-python-sdk."
)
FEATURE_TO_CODE = {
str(Feature.SDK_DEFAULTS): 1,
str(Feature.LOCAL_MODE): 2,
str(Feature.REMOTE_FUNCTION): 3,
str(Feature.MODEL_TRAINER): 4,
str(Feature.ESTIMATOR): 5,
}
STATUS_TO_CODE = {
str(Status.SUCCESS): 1,
str(Status.FAILURE): 0,
}
def _telemetry_emitter(feature: str, func_name: str):
"""Telemetry Emitter
Decorator to emit telemetry logs for SageMaker Python SDK functions. This class needs
sagemaker_session object as a member. Default session object is a pysdk v2 Session object
in this repo. When collecting telemetry for classes using sagemaker-core Session object,
we should be aware of its differences, such as sagemaker_session.sagemaker_config does not
exist in new Session class.
"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
sagemaker_session = None
if len(args) > 0 and hasattr(args[0], "sagemaker_session"):
# Get the sagemaker_session from the instance method args
sagemaker_session = args[0].sagemaker_session
elif feature == Feature.REMOTE_FUNCTION:
# Get the sagemaker_session from the function keyword arguments for remote function
sagemaker_session = kwargs.get(
"sagemaker_session", _get_default_sagemaker_session()
)
if sagemaker_session:
logger.debug("sagemaker_session found, preparing to emit telemetry...")
logger.info(TELEMETRY_OPT_OUT_MESSAGING)
response = None
caught_ex = None
studio_app_type = process_studio_metadata_file()
# Check if telemetry is opted out
telemetry_opt_out_flag = resolve_value_from_config(
direct_input=None,
config_path=TELEMETRY_OPT_OUT_PATH,
default_value=False,
sagemaker_session=sagemaker_session,
)
logger.debug("TelemetryOptOut flag is set to: %s", telemetry_opt_out_flag)
# Construct the feature list to track feature combinations
feature_list: List[int] = [FEATURE_TO_CODE[str(feature)]]
if (
hasattr(sagemaker_session, "sagemaker_config")
and sagemaker_session.sagemaker_config
and feature != Feature.SDK_DEFAULTS
):
feature_list.append(FEATURE_TO_CODE[str(Feature.SDK_DEFAULTS)])
if (
hasattr(sagemaker_session, "local_mode")
and sagemaker_session.local_mode
and feature != Feature.LOCAL_MODE
):
feature_list.append(FEATURE_TO_CODE[str(Feature.LOCAL_MODE)])
# Construct the extra info to track platform and environment usage metadata
extra = (
f"{func_name}"
f"&x-sdkVersion={SDK_VERSION}"
f"&x-env={PYTHON_VERSION}"
f"&x-sys={OS_NAME_VERSION}"
f"&x-platform={studio_app_type}"
)
# Add endpoint ARN to the extra info if available
if hasattr(sagemaker_session, "endpoint_arn") and sagemaker_session.endpoint_arn:
extra += f"&x-endpointArn={sagemaker_session.endpoint_arn}"
start_timer = perf_counter()
try:
# Call the original function
response = func(*args, **kwargs)
stop_timer = perf_counter()
elapsed = stop_timer - start_timer
extra += f"&x-latency={round(elapsed, 2)}"
if not telemetry_opt_out_flag:
_send_telemetry_request(
STATUS_TO_CODE[str(Status.SUCCESS)],
feature_list,
sagemaker_session,
None,
None,
extra,
)
except Exception as e: # pylint: disable=W0703
stop_timer = perf_counter()
elapsed = stop_timer - start_timer
extra += f"&x-latency={round(elapsed, 2)}"
if not telemetry_opt_out_flag:
_send_telemetry_request(
STATUS_TO_CODE[str(Status.FAILURE)],
feature_list,
sagemaker_session,
str(e),
e.__class__.__name__,
extra,
)
caught_ex = e
finally:
if caught_ex:
raise caught_ex
return response # pylint: disable=W0150
else:
logger.debug(
"Unable to send telemetry for function %s. "
"sagemaker_session is not provided or not valid.",
func_name,
)
return func(*args, **kwargs)
return wrapper
return decorator
def _send_telemetry_request(
status: int,
feature_list: List[int],
session: Session,
failure_reason: str = None,
failure_type: str = None,
extra_info: str = None,
) -> None:
"""Make GET request to an empty object in S3 bucket"""
try:
accountId = _get_accountId(session) if session else "NotAvailable"
region = _get_region_or_default(session)
try:
Region(region) # Validate the region
except ValueError:
logger.warning(
"Region not found in supported regions. Telemetry request will not be emitted."
)
return
url = _construct_url(
accountId,
region,
str(status),
str(
",".join(map(str, feature_list))
), # Remove brackets and quotes to cut down on length
failure_reason,
failure_type,
extra_info,
)
# Send the telemetry request
logger.debug("Sending telemetry request to [%s]", url)
_requests_helper(url, 2)
logger.debug("SageMaker Python SDK telemetry successfully emitted.")
except Exception: # pylint: disable=W0703
logger.debug("SageMaker Python SDK telemetry not emitted!")
def _construct_url(
accountId: str,
region: str,
status: str,
feature: str,
failure_reason: str,
failure_type: str,
extra_info: str,
) -> str:
"""Construct the URL for the telemetry request"""
base_url = (
f"https://sm-pysdk-t-{region}.s3.{region}.amazonaws.com/telemetry?"
f"x-accountId={accountId}"
f"&x-status={status}"
f"&x-feature={feature}"
)
logger.debug("Failure reason: %s", failure_reason)
if failure_reason:
base_url += f"&x-failureReason={failure_reason}"
base_url += f"&x-failureType={failure_type}"
if extra_info:
base_url += f"&x-extra={extra_info}"
return base_url
def _requests_helper(url, timeout):
"""Make a GET request to the given URL"""
response = None
try:
response = requests.get(url, timeout)
except requests.exceptions.RequestException as e:
logger.exception("Request exception: %s", str(e))
return response
def _get_accountId(session):
"""Return the account ID from the boto session"""
try:
sts = session.boto_session.client("sts")
return sts.get_caller_identity()["Account"]
except Exception: # pylint: disable=W0703
return None
def _get_region_or_default(session):
"""Return the region name from the boto session or default to us-west-2"""
try:
return session.boto_session.region_name
except Exception: # pylint: disable=W0703
return DEFAULT_AWS_REGION
def _get_default_sagemaker_session():
"""Return the default sagemaker session"""
boto_session = boto3.Session(region_name=DEFAULT_AWS_REGION)
sagemaker_session = Session(boto_session=boto_session)
return sagemaker_session