-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathlogs.py
More file actions
94 lines (73 loc) · 3.63 KB
/
logs.py
File metadata and controls
94 lines (73 loc) · 3.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import logging
import time
from concurrent.futures import ThreadPoolExecutor
# pylint: disable=no-name-in-module
from ...aws import cloudformation, logs
_logger = logging.getLogger("braintrust.install.logs")
def build_parser(subparsers, parents):
    """Register the ``logs`` subcommand on an argparse subparsers object.

    Args:
        subparsers: The object returned by ``ArgumentParser.add_subparsers()``.
        parents: Parent parsers whose arguments are shared by all subcommands.
    """
    logs_parser = subparsers.add_parser("logs", help="Capture recent logs", parents=parents)
    logs_parser.add_argument("name", help="Name of the CloudFormation stack to collect logs from")
    logs_parser.add_argument(
        "--service",
        help="Name of the service",
        default="api",
        choices=["api", "brainstore", "all"],
    )
    logs_parser.add_argument(
        "--hours",
        help="Number of hours in the past to collect logs from",
        default=1,
        type=float,
    )
    # Dispatch to main() when this subcommand is selected.
    logs_parser.set_defaults(func=main)
def main(args):
    """Print recent CloudWatch logs for the services backing a stack.

    Args:
        args: Parsed CLI namespace with ``name`` (CloudFormation stack name),
            ``service`` ("api", "brainstore", or "all"), and ``hours``
            (lookback window, in hours).

    Raises:
        ValueError: If the stack does not exist, the stack name is ambiguous,
            or a stack output key unexpectedly appears more than once.
    """
    stacks = cloudformation.describe_stacks(StackName=args.name)["Stacks"]
    if len(stacks) == 0:
        raise ValueError(f"Stack with name {args.name} does not exist")
    if len(stacks) > 1:
        raise ValueError(f"Multiple stacks with name {args.name} exist")
    stack = stacks[0]
    _logger.debug(stack)

    services = ["api", "brainstore"] if args.service == "all" else [args.service]

    # Resolve each requested service to one or more CloudWatch log group names.
    log_group_names = []
    for service in services:
        if service == "api":
            # Each API-side Lambda surfaces its function name as a stack output.
            for name in ["APIHandlerJSName", "AIProxyFnName"]:
                lambda_function = [x for x in stack["Outputs"] if x["OutputKey"] == name]
                if len(lambda_function) > 1:
                    # Fixed: message previously hard-coded "APIHandlerName" for
                    # both output keys and carried a stray closing paren.
                    raise ValueError(
                        f"Expected 1 {name}, found {len(lambda_function)} ({lambda_function})"
                    )
                if len(lambda_function) == 0:
                    # Lazy %-style args per logging convention (avoids f-string work
                    # when the warning level is disabled).
                    _logger.warning("Could not find %s, skipping...", name)
                    continue
                log_group_names.append(f"/aws/lambda/{lambda_function[0]['OutputValue']}")
        elif service == "brainstore":
            log_group_names.append(f"/braintrust/{args.name}/brainstore")

    # CloudWatch timestamps are epoch milliseconds.
    start_time = int(time.time() - 3600 * args.hours) * 1000

    for log_group_name in log_group_names:
        print(f"--- LOG GROUP: {log_group_name}")
        log_groups = logs.describe_log_groups(logGroupNamePrefix=log_group_name)["logGroups"]
        if not any(group["logGroupName"] == log_group_name for group in log_groups):
            print(f"Log group {log_group_name} does not exist")
            continue

        # Page backwards through streams (newest first by last event) until we
        # reach streams that started before the lookback window.
        all_streams = []
        first_start_time = None
        next_token = None
        while first_start_time is None or first_start_time >= start_time:
            kwargs = {}
            if next_token is not None:
                kwargs["nextToken"] = next_token
            stream_resp = logs.describe_log_streams(
                logGroupName=log_group_name, descending=True, orderBy="LastEventTime", **kwargs
            )
            page = stream_resp["logStreams"]
            if not page:
                # Fixed: min() below raised ValueError on an empty page.
                break
            first_start_time = min(s["firstEventTimestamp"] for s in page)
            streams = [s for s in page if s["firstEventTimestamp"] >= start_time]
            streams.sort(key=lambda s: s["firstEventTimestamp"])
            # Pages arrive newest-first, so prepend to keep chronological order.
            all_streams = streams + all_streams
            next_token = stream_resp.get("nextToken")
            if next_token is None:
                # Fixed: without this break, exhausting pagination while every
                # stream was still inside the window reset kwargs to {} and
                # re-fetched the first page forever.
                break
        _logger.debug(all_streams)

        def get_events(stream):
            # Fetch the first page of events for one stream.
            # NOTE(review): get_log_events is itself paginated; only the first
            # page per stream is printed here — confirm that is intended.
            return logs.get_log_events(
                logGroupName=log_group_name,
                logStreamName=stream["logStreamName"],
                startTime=start_time,
                startFromHead=True,
            )

        # Fan out the per-stream fetches; map() preserves input order so output
        # stays chronological.
        with ThreadPoolExecutor(8) as executor:
            for stream, log in zip(all_streams, executor.map(get_events, all_streams)):
                print(f"---- LOG STREAM: {stream['logStreamName']}")
                for event in log["events"]:
                    print(event)