-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathalerts.py
More file actions
388 lines (342 loc) · 13 KB
/
alerts.py
File metadata and controls
388 lines (342 loc) · 13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
from contextlib import nullcontext
from datetime import timezone
from typing import Optional
from typing import Union
import click
import requests
from boltons.iterutils import bucketize
from boltons.iterutils import chunked
from click import BadOptionUsage
from click import File
from pydantic import Field
from rich.panel import Panel
from _incydr_cli import console
from _incydr_cli import get_user_project_path
from _incydr_cli import logging_options
from _incydr_cli import render
from _incydr_cli.cmds.options.alert_filter_options import advanced_query_option
from _incydr_cli.cmds.options.alert_filter_options import filter_options
from _incydr_cli.cmds.options.output_options import columns_option
from _incydr_cli.cmds.options.output_options import input_format_option
from _incydr_cli.cmds.options.output_options import output_options
from _incydr_cli.cmds.options.output_options import single_format_option
from _incydr_cli.cmds.options.output_options import SingleFormat
from _incydr_cli.cmds.options.output_options import table_format_option
from _incydr_cli.cmds.options.output_options import TableFormat
from _incydr_cli.cmds.options.utils import checkpoint_option
from _incydr_cli.cmds.utils import deprecation_warning
from _incydr_cli.cmds.utils import warn_interrupt
from _incydr_cli.core import IncydrCommand
from _incydr_cli.core import IncydrGroup
from _incydr_cli.cursor import CursorStore
from _incydr_cli.file_readers import AutoDecodedFile
from _incydr_cli.logger import get_server_logger
from _incydr_sdk.alerts.models.alert import AlertSummary
from _incydr_sdk.core.client import Client
from _incydr_sdk.core.models import CSVModel
from _incydr_sdk.core.models import Model
from _incydr_sdk.queries.alerts import AlertQuery
from _incydr_sdk.utils import model_as_card
# Deprecated April 2024.
# Notice handed to deprecation_warning() by the `alerts` command group callback.
DEPRECATION_TEXT = (
    "DeprecationWarning: Alerts commands are deprecated. "
    "Use the 'incydr sessions' command group instead."
)
@click.group(cls=IncydrGroup)
@logging_options
def alerts():
    """DEPRECATED. Use the Sessions command group instead. View and manage alerts."""
    # The docstring above doubles as the group's help text, so it is kept verbatim.
    # The group callback's only job is to hand the deprecation notice to the shared helper.
    deprecation_warning(DEPRECATION_TEXT)
@alerts.command(cls=IncydrCommand)
@checkpoint_option
@logging_options
@columns_option
@table_format_option
@output_options
@advanced_query_option
@filter_options
def search(
    format_: TableFormat,
    columns: Optional[str],
    output: Optional[str],
    certs: Optional[str],
    ignore_cert_validation: Optional[bool],
    advanced_query: Optional[Union[str, File]],
    start: Optional[str],
    end: Optional[str],
    on: Optional[str],
    alert_id: Optional[str],
    type_: Optional[str],
    name: Optional[str],
    actor: Optional[str],
    actor_id: Optional[str],
    risk_severity: Optional[str],
    state: Optional[str],
    rule_id: Optional[str],
    alert_severity: Optional[str],
    checkpoint_name: Optional[str],
):
    """
    Search alerts. Various options are provided to filter query results.
    Results will be output to the console by default, use the `--output` option to send data to a server.
    Checkpointing is available through the `--checkpoint <checkpoint-name>` option and will only return new results
    on subsequent queries with that same checkpoint. Checkpointing filters by timestamp, additional filter
    options will need to be included in each run.
    """
    # NOTE: the docstring above is the command's help text; keep it user-facing.
    client = Client()
    cursor = _get_cursor_store(client.settings.api_client_id)

    # Sending to a server always uses the JSON-lines representation.
    if output:
        format_ = TableFormat.json_lines

    # A previously stored checkpoint timestamp takes the place of any --start value.
    if checkpoint_name:
        stored_timestamp = cursor.get(checkpoint_name)
        if stored_timestamp:
            start = float(stored_timestamp)

    if not advanced_query:
        # Without a raw query, at least one date bound (or a stored checkpoint) is mandatory.
        if not (start or on or end):
            raise BadOptionUsage(
                "start",
                "--start, --end, or --on options are required if not using the --advanced-query option "
                "or using an existing checkpoint.",
            )
        query = _create_query(
            start=start,
            end=end,
            on=on,
            alert_id=alert_id,
            type_=type_,
            name=name,
            actor=actor,
            actor_id=actor_id,
            risk_severity=risk_severity,
            state=state,
            rule_id=rule_id,
            alert_severity=alert_severity,
        )
    else:
        # The raw query arrives either as a string or as a readable file-like object.
        raw_query = (
            advanced_query
            if isinstance(advanced_query, str)
            else advanced_query.read()
        )
        query = AlertQuery.parse_raw(raw_query)

    alerts_gen = client.alerts.v1.iter_all(query)

    # In checkpoint mode, wrap the stream so progress is recorded and repeats are dropped.
    if checkpoint_name:
        alerts_gen = _update_checkpoint(cursor, checkpoint_name, alerts_gen)

    interrupt_guard = warn_interrupt() if checkpoint_name else nullcontext()
    with interrupt_guard:
        # Server output: one JSON document per alert, nothing on the console.
        if output:
            server_logger = get_server_logger(output, certs, ignore_cert_validation)
            for found in alerts_gen:
                server_logger.info(found.json())
            return

        if format_ == TableFormat.table:
            # Default column selection for the table view when --columns is not given.
            columns = columns or [
                "created_at",
                "risk_severity",
                "state",
                "actor",
                "actor_id",
                "name",
                "description",
                "watchlists",
                "state_last_modified_by",
                "state_last_modified_at",
                "id",
            ]
            render.table(AlertSummary, alerts_gen, columns=columns, flat=False)
            return

        if format_ == TableFormat.csv:
            render.csv(AlertSummary, alerts_gen, columns=columns, flat=True)
            return

        # Remaining formats are JSON: pretty-printed through rich, or raw one-per-line.
        emit = (
            console.print_json
            if format_ == TableFormat.json_pretty
            else click.echo
        )
        result_count = 0
        for found in alerts_gen:
            result_count += 1
            emit(found.json())
        if result_count == 0:
            console.print("No results found.")
@alerts.command()
@click.argument("checkpoint-name")
def clear_checkpoint(checkpoint_name: str):
    """Remove the saved alerts checkpoint from searches made with `--checkpoint` mode."""
    # The checkpoint store is looked up by the API client ID from the client's settings.
    api_client_id = Client().settings.api_client_id
    checkpoint_store = _get_cursor_store(api_client_id)
    checkpoint_store.delete(checkpoint_name)
# Future enhancement: add functionality to show human-readable summaries for multiple alerts
@alerts.command(cls=IncydrCommand)
@logging_options
@single_format_option
@click.argument("alert-id")
def show(alert_id: str, format_: SingleFormat):
    """
    Show the details of a single alert.
    """
    # get_details() is indexed with [0]: only its first entry is displayed.
    details = Client().alerts.v1.get_details(alert_id)
    alert = details[0]

    # Rich output renders the model as a card inside a fitted panel.
    if format_ == SingleFormat.rich:
        console.print(Panel.fit(model_as_card(alert)))
        return

    # Both remaining formats print the same JSON payload, pretty or raw.
    payload = alert.json()
    if format_ == SingleFormat.json_pretty:
        console.print_json(payload)
    else:
        click.echo(payload)
@alerts.command(cls=IncydrCommand)
@logging_options
@click.argument("alert-id")
@click.argument("note")
def add_note(alert_id: str, note: str):
    """
    Add an optional note to an alert.
    """
    # Forward the note to the alerts API, then confirm on the console.
    alerts_api = Client().alerts.v1
    alerts_api.add_note(alert_id, note)
    console.print("Note added.")
@alerts.command(cls=IncydrCommand)
@logging_options
@click.argument("alert-id")
@click.argument("state")
@click.option(
    "--note",
    default=None,
    help="Optional note to indicate the reason for the state change.",
)
def update_state(alert_id: str, state: str, note: Optional[str]):
    """
    Change the state of an alert, and optionally add a note.
    """
    # `note` is passed through as-is; it is None when --note was not supplied.
    alerts_api = Client().alerts.v1
    alerts_api.change_state(alert_id, state, note)
    console.print("State changed successfully.")
@alerts.command(cls=IncydrCommand)
@click.argument("file", type=AutoDecodedFile())
@input_format_option
@click.option(
    "--state",
    type=click.Choice(["OPEN", "RESOLVED", "IN_PROGRESS", "PENDING"]),
    help="Override CSV/JSON input's `state` value with this value.",
)
@click.option("--note", help="Override CSV/JSON input's `note` value with this value.")
@logging_options
def bulk_update_state(
    file,
    format_,
    state: str,
    note: str,
):
    """
    Bulk update multiple alerts from a file.
    Takes a single arg `FILE` which specifies the path to the file (use "-" to read from stdin).
    File format can either be CSV or [JSON Lines format](https://jsonlines.org) (Default is CSV).
    The --state and --note options to this command will override respective columns/keys in the CSV/JSON input.
    This allows you to bulk change a set of alerts without having manually modify the state/note value for each CSV or
    JSON Lines row in the file. For example, to close all currently "PENDING" alerts older than <DATE>:
    incydr alerts search --end <DATE> --state PENDING --format json-lines | incydr alerts bulk-update-state - --state RESOLVED --note "bulk resolved alerts older than <DATE>"
    If --state is not provided, the CSV/JSON input _must_ have a `state` column/key for each row/object.
    """
    # NOTE: the docstring above is the command's help text and is kept verbatim.

    # if --state is provided, we want that column/key to be optional on input data, otherwise required
    state_type = Optional[str] if state else str

    # Input row models are declared per call because the `state` field's type depends on --state.
    class AlertBulkCSV(CSVModel):
        alert_id: str = Field(csv_aliases=["id", "alert_id"])
        state: state_type = Field(csv_aliases=["state"])
        note: Optional[str]

    class AlertBulkJSON(Model):
        alert_id: str = Field(alias="id")
        state: state_type
        note: Optional[str]

    client = Client()
    if format_ == "csv":
        alerts_ = AlertBulkCSV.parse_csv(file)
    else:
        alerts_ = AlertBulkJSON.parse_json_lines(file)

    # group alerts where state and note are the same, so we can batch API calls;
    # the command-line --state/--note values win over the per-row values.
    buckets = bucketize(
        alerts_,
        key=lambda alert: (state or alert.state, note or alert.note),
        value_transform=lambda alert: alert.alert_id,
    )

    for (state_, note_), alert_ids in buckets.items():
        # backend allows max of 100 alerts per request
        for chunk in chunked(alert_ids, size=100):
            try:
                client.alerts.v1.change_state(chunk, state_, note_)
            except requests.HTTPError as batch_err:
                batch_status = batch_err.response.status_code
                if batch_status != 404:
                    # Previously any non-404 failure was dropped without a trace; report it
                    # and carry on with the remaining batches (best-effort, like the 404 path).
                    console.print(
                        f"[red]Error processing batch of {len(chunk)} alerts:[/red] {batch_status}"
                    )
                    continue

                # backend doesn't specify _which_ alert_id doesn't exist, so we need to try them 1 by 1
                console.print(
                    f"[red]Error processing batch of {len(chunk)} alerts, trying individually..."
                )
                for id_ in chunk:
                    try:
                        client.alerts.v1.change_state(id_, state_, note_)
                    except requests.HTTPError as single_err:
                        console.print(
                            f"[red]Error updating alert_id[/red] '{id_}': {single_err.response.status_code}"
                        )
                    else:
                        console.print(
                            f"Successfully set alert_id '{id_}' to '{state_}' with note: '{note_}'"
                        )
            else:
                console.print(
                    f"{len(chunk)} alerts successfully set to '{state_}' with note: '{note_}'"
                )
# Maps each `search` filter keyword (as passed to _create_query) to the field
# name handed to AlertQuery.equals().
field_option_map = {
    # identity of the alert and of the rule behind it
    "alert_id": "AlertId",
    "type_": "Type",
    "name": "Name",
    # who the alert is about
    "actor": "Actor",
    "actor_id": "ActorId",
    # severity / workflow attributes
    "risk_severity": "RiskSeverity",
    "state": "State",
    "rule_id": "RuleId",
    "alert_severity": "AlertSeverity",
}
def _create_query(**kwargs):
    """
    Build an AlertQuery from the `search` command's filter options.

    `start`, `end` and `on` must be present in `kwargs` (they may be None) and
    seed the query's date range. Every other truthy option is added as an
    equality filter, using `field_option_map` to translate the option name.
    Raises KeyError for a truthy option that has no entry in that map.
    """
    date_options = ("start", "end", "on")
    query = AlertQuery(
        start_date=kwargs["start"],
        end_date=kwargs["end"],
        on=kwargs["on"],
    )
    for option, value in kwargs.items():
        # Skip unset filters and the date options already consumed above.
        if not value or option in date_options:
            continue
        query = query.equals(field_option_map[option], value)
    return query
def _get_cursor_store(api_key):
    """
    Return the CursorStore used for alert-search checkpoints.

    The store directory is resolved through get_user_project_path() from the
    segments "checkpoints", `api_key` and "alert_checkpoints".
    """
    checkpoint_dir = get_user_project_path("checkpoints", api_key, "alert_checkpoints")
    return CursorStore(checkpoint_dir, "alerts")
def _update_checkpoint(cursor, checkpoint_name, alerts_gen):
    """
    Yield alerts not already recorded for this checkpoint, saving progress as they are consumed.

    Using the timestamp of the last processed alert as the `--start` time of the
    next run makes that alert show up again, so the IDs of the alerts sharing the
    newest `created_at` are stored next to the timestamp and filtered out on the
    following run. Several alerts can carry the exact same timestamp, which is why
    a list of IDs is stored rather than a single one.

    Args:
        cursor: checkpoint store; must provide `get_items`, `replace` and `replace_items`.
        checkpoint_name: name under which the timestamp and ID list are stored.
        alerts_gen: iterable of alerts exposing `id` and `created_at` (a datetime).

    Yields:
        Each alert whose ID is not in the stored ID list for `checkpoint_name`.

    The checkpoint is written when the generator resumes after a yield, i.e. once
    the consumer has finished with that alert and asks for the next one.
    """
    checkpoint_alerts = cursor.get_items(checkpoint_name)
    new_timestamp = None
    new_alerts = []
    for alert in alerts_gen:
        alert_id = alert.id
        if alert_id in checkpoint_alerts:
            # Already handled by the previous run.
            continue

        if new_timestamp is None or alert.created_at > new_timestamp:
            # A newer timestamp supersedes the IDs collected for the older one.
            new_timestamp = alert.created_at
            new_alerts.clear()
        new_alerts.append(alert_id)
        yield alert

        # datetime.replace() returns a new object, so its result has to be kept:
        # a naive value is interpreted as UTC instead of the machine's local zone.
        # A separate variable is used so `new_timestamp` stays comparable with the
        # next alert's `created_at`; already-aware values are stored unchanged.
        checkpoint_time = new_timestamp
        if checkpoint_time.tzinfo is None:
            checkpoint_time = checkpoint_time.replace(tzinfo=timezone.utc)
        cursor.replace(checkpoint_name, checkpoint_time.timestamp())
        cursor.replace_items(checkpoint_name, new_alerts)