Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
# Label keys used when converting a task instance to Stackdriver labels.
LABEL_DAG_ID = "dag_id"
# The date label key switches on the AIRFLOW_V_3_0_PLUS flag:
# "logical_date" when it is true, "execution_date" otherwise.
LABEL_LOGICAL_DATE = "logical_date" if AIRFLOW_V_3_0_PLUS else "execution_date"
LABEL_TRY_NUMBER = "try_number"
# Key for the task instance's own ``id``; the label helpers add it only when that id is truthy.
LABEL_TASK_INSTANCE_ID = "task_instance_id"


@attrs.define(kw_only=True)
Expand Down Expand Up @@ -220,8 +221,24 @@ def escape_label_value(value: str) -> str:
for key, value in self.resource.labels.items():
log_filters.append(f"resource.labels.{escape_label_key(key)}={escape_label_value(value)}")

for key, value in ti_labels.items():
log_filters.append(f"labels.{escape_label_key(key)}={escape_label_value(value)}")
ti_id_val = ti_labels.get(LABEL_TASK_INSTANCE_ID)
legacy_filters = [
f"labels.{escape_label_key(k)}={escape_label_value(v)}"
for k, v in ti_labels.items()
if k != LABEL_TASK_INSTANCE_ID
]

if ti_id_val:
ti_id_filter = (
f"labels.{escape_label_key(LABEL_TASK_INSTANCE_ID)}={escape_label_value(ti_id_val)}"
)
if legacy_filters:
log_filters.append(f"({ti_id_filter} OR ({' AND '.join(legacy_filters)}))")
else:
log_filters.append(ti_id_filter)
else:
log_filters.extend(legacy_filters)

return "\n".join(log_filters)

def read_logs(
Expand Down Expand Up @@ -268,14 +285,18 @@ def _read_single_logs_page(self, log_filter: str, page_token: str | None = None)

def _task_instance_to_labels(ti) -> dict[str, str]:
    """
    Convert a task instance to Stackdriver labels.

    Every label value is coerced to ``str``. The ``task_instance_id`` label is
    added only when ``ti`` has a truthy ``id`` attribute; otherwise the result
    contains just the task id, dag id, date and try-number labels.

    :param ti: task instance providing ``task_id``, ``dag_id``, ``try_number`` and
        either ``logical_date`` (when ``AIRFLOW_V_3_0_PLUS`` is true) or
        ``execution_date`` (otherwise)
    :return: mapping of label key to string value
    """
    # Only the attribute matching the running Airflow version is read.
    run_date = ti.logical_date if AIRFLOW_V_3_0_PLUS else ti.execution_date
    labels = {
        LABEL_TASK_ID: str(ti.task_id),
        LABEL_DAG_ID: str(ti.dag_id),
        LABEL_LOGICAL_DATE: str(run_date.isoformat()),
        LABEL_TRY_NUMBER: str(ti.try_number),
    }
    # Fall back to None when the object has no ``id`` attribute at all.
    ti_id = getattr(ti, "id", None)
    if ti_id:
        labels[LABEL_TASK_INSTANCE_ID] = str(ti_id)
    return labels


class StackdriverTaskHandler(logging.Handler):
Expand Down Expand Up @@ -314,6 +335,7 @@ class StackdriverTaskHandler(logging.Handler):
LABEL_DAG_ID = LABEL_DAG_ID
LABEL_LOGICAL_DATE = LABEL_LOGICAL_DATE
LABEL_TRY_NUMBER = LABEL_TRY_NUMBER
LABEL_TASK_INSTANCE_ID = LABEL_TASK_INSTANCE_ID
LOG_VIEWER_BASE_URL = "https://console.cloud.google.com/logs/viewer"
LOG_NAME = "Google Stackdriver"

Expand Down Expand Up @@ -451,14 +473,18 @@ def read(

@classmethod
def _task_instance_to_labels(cls, ti: TaskInstance) -> dict[str, str]:
    """
    Convert a task instance to Stackdriver labels using the handler's label keys.

    Every label value is coerced to ``str``. The ``task_instance_id`` label is
    added only when ``ti`` has a truthy ``id`` attribute; otherwise the result
    contains just the task id, dag id, date and try-number labels.

    :param ti: task instance providing ``task_id``, ``dag_id``, ``try_number`` and
        either ``logical_date`` (when ``AIRFLOW_V_3_0_PLUS`` is true) or
        ``execution_date`` (otherwise)
    :return: mapping of label key to string value
    """
    # Only the attribute matching the running Airflow version is read.
    run_date = ti.logical_date if AIRFLOW_V_3_0_PLUS else ti.execution_date
    labels = {
        cls.LABEL_TASK_ID: str(ti.task_id),
        cls.LABEL_DAG_ID: str(ti.dag_id),
        cls.LABEL_LOGICAL_DATE: str(run_date.isoformat()),
        cls.LABEL_TRY_NUMBER: str(ti.try_number),
    }
    # Fall back to None when the object has no ``id`` attribute at all.
    ti_id = getattr(ti, "id", None)
    if ti_id:
        labels[cls.LABEL_TASK_INSTANCE_ID] = str(ti_id)
    return labels

@property
def log_name(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,23 @@ def test_upload_no_delete(self, mock_transport_prop):
def test_prepare_log_filter(self, mock_get_creds_and_project_id):
    """Check that a task-instance-id label yields an ``id OR (remaining labels AND-ed)`` clause."""
    mock_get_creds_and_project_id.return_value = ("creds", "project_id")

    labels_with_ti_id = dict(
        task_id="test_task",
        dag_id="test_dag",
        try_number="1",
        task_instance_id="test_ti_id",
    )
    built_filter = self.io.prepare_log_filter(labels_with_ti_id)

    # The resource type and log name lines must be present in the filter.
    assert 'resource.type="global"' in built_filter
    assert 'logName="projects/project_id/logs/airflow"' in built_filter

    # The id label is OR-ed with the AND-ed group of the other three labels.
    id_or_legacy_clause = '(labels.task_instance_id="test_ti_id" OR (labels.task_id="test_task" AND labels.dag_id="test_dag" AND labels.try_number="1"))'
    assert id_or_legacy_clause in built_filter

@mock.patch("airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id")
def test_prepare_log_filter_legacy(self, mock_get_creds_and_project_id):
mock_get_creds_and_project_id.return_value = ("creds", "project_id")

ti_labels = {
"task_id": "test_task",
"dag_id": "test_dag",
Expand All @@ -204,6 +221,7 @@ def test_prepare_log_filter(self, mock_get_creds_and_project_id):
assert 'logName="projects/project_id/logs/airflow"' in log_filter
assert 'labels.task_id="test_task"' in log_filter
assert 'labels.dag_id="test_dag"' in log_filter
assert " OR " not in log_filter

@mock.patch("airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id")
def test_prepare_log_filter_with_custom_resource(self, mock_get_creds_and_project_id):
Expand Down Expand Up @@ -386,6 +404,7 @@ def test_should_set_labels(self, mock_client, mock_get_creds_and_project_id):
"dag_id": self.DAG_ID,
date_key: "2016-01-01T00:00:00+00:00",
"try_number": "1",
**({"task_instance_id": str(self.ti.id)} if hasattr(self.ti, "id") else {}),
}
resource = Resource(type="global", labels={})
self.transport_mock.return_value.send.assert_called_once_with(
Expand All @@ -411,6 +430,7 @@ def test_should_append_labels(self, mock_client, mock_get_creds_and_project_id):
"dag_id": self.DAG_ID,
date_key: "2016-01-01T00:00:00+00:00",
"try_number": "1",
**({"task_instance_id": str(self.ti.id)} if hasattr(self.ti, "id") else {}),
"product.googleapis.com/task_id": "test-value",
}
resource = Resource(type="global", labels={})
Expand All @@ -431,13 +451,23 @@ def test_should_read_logs_for_all_try(self, mock_client, mock_get_creds_and_proj

date_label = "logical_date" if AIRFLOW_V_3_0_PLUS else "execution_date"

filter_str = (
'resource.type="global"\n'
'logName="projects/project_id/logs/airflow"\n'
'labels.task_id="task_for_testing_stackdriver_task_handler"\n'
'labels.dag_id="dag_for_testing_stackdriver_file_task_handler"\n'
f'labels.{date_label}="2016-01-01T00:00:00+00:00"'
)
if hasattr(self.ti, "id"):
filter_str = (
'resource.type="global"\n'
'logName="projects/project_id/logs/airflow"\n'
f'(labels.task_instance_id="{str(self.ti.id)}" OR ('
'labels.task_id="task_for_testing_stackdriver_task_handler" AND '
'labels.dag_id="dag_for_testing_stackdriver_file_task_handler" AND '
f'labels.{date_label}="2016-01-01T00:00:00+00:00"))'
)
else:
filter_str = (
'resource.type="global"\n'
'logName="projects/project_id/logs/airflow"\n'
'labels.task_id="task_for_testing_stackdriver_task_handler"\n'
'labels.dag_id="dag_for_testing_stackdriver_file_task_handler"\n'
f'labels.{date_label}="2016-01-01T00:00:00+00:00"'
)
mock_client.return_value.list_log_entries.assert_called_once_with(
request=ListLogEntriesRequest(
resource_names=["projects/project_id"],
Expand All @@ -463,13 +493,23 @@ def test_should_read_logs_for_task_with_quote(self, mock_client, mock_get_creds_

logs, metadata = stackdriver_task_handler.read(self.ti)
date_label = "logical_date" if AIRFLOW_V_3_0_PLUS else "execution_date"
filter_str = (
'resource.type="global"\n'
'logName="projects/project_id/logs/airflow"\n'
'labels.task_id="K\\"OT"\n'
'labels.dag_id="dag_for_testing_stackdriver_file_task_handler"\n'
f'labels.{date_label}="2016-01-01T00:00:00+00:00"'
)
if hasattr(self.ti, "id"):
filter_str = (
'resource.type="global"\n'
'logName="projects/project_id/logs/airflow"\n'
f'(labels.task_instance_id="{str(self.ti.id)}" OR ('
'labels.task_id="K\\"OT" AND '
'labels.dag_id="dag_for_testing_stackdriver_file_task_handler" AND '
f'labels.{date_label}="2016-01-01T00:00:00+00:00"))'
)
else:
filter_str = (
'resource.type="global"\n'
'logName="projects/project_id/logs/airflow"\n'
'labels.task_id="K\\"OT"\n'
'labels.dag_id="dag_for_testing_stackdriver_file_task_handler"\n'
f'labels.{date_label}="2016-01-01T00:00:00+00:00"'
)
mock_client.return_value.list_log_entries.assert_called_once_with(
request=ListLogEntriesRequest(
resource_names=["projects/project_id"],
Expand All @@ -493,14 +533,25 @@ def test_should_read_logs_for_single_try(self, mock_client, mock_get_creds_and_p

logs, metadata = stackdriver_task_handler.read(self.ti, 3)
date_label = "logical_date" if AIRFLOW_V_3_0_PLUS else "execution_date"
filter_str = (
'resource.type="global"\n'
'logName="projects/project_id/logs/airflow"\n'
'labels.task_id="task_for_testing_stackdriver_task_handler"\n'
'labels.dag_id="dag_for_testing_stackdriver_file_task_handler"\n'
f'labels.{date_label}="2016-01-01T00:00:00+00:00"\n'
'labels.try_number="3"'
)
if hasattr(self.ti, "id"):
filter_str = (
'resource.type="global"\n'
'logName="projects/project_id/logs/airflow"\n'
f'(labels.task_instance_id="{str(self.ti.id)}" OR ('
'labels.task_id="task_for_testing_stackdriver_task_handler" AND '
'labels.dag_id="dag_for_testing_stackdriver_file_task_handler" AND '
f'labels.{date_label}="2016-01-01T00:00:00+00:00" AND '
'labels.try_number="3"))'
)
else:
filter_str = (
'resource.type="global"\n'
'logName="projects/project_id/logs/airflow"\n'
'labels.task_id="task_for_testing_stackdriver_task_handler"\n'
'labels.dag_id="dag_for_testing_stackdriver_file_task_handler"\n'
f'labels.{date_label}="2016-01-01T00:00:00+00:00"\n'
'labels.try_number="3"'
)
mock_client.return_value.list_log_entries.assert_called_once_with(
request=ListLogEntriesRequest(
resource_names=["projects/project_id"],
Expand All @@ -525,14 +576,25 @@ def test_should_read_logs_with_pagination(self, mock_client, mock_get_creds_and_

logs, metadata1 = stackdriver_task_handler.read(self.ti, 3)
date_label = "logical_date" if AIRFLOW_V_3_0_PLUS else "execution_date"
filter_str = (
'resource.type="global"\n'
'logName="projects/project_id/logs/airflow"\n'
'labels.task_id="task_for_testing_stackdriver_task_handler"\n'
'labels.dag_id="dag_for_testing_stackdriver_file_task_handler"\n'
f'labels.{date_label}="2016-01-01T00:00:00+00:00"\n'
'labels.try_number="3"'
)
if hasattr(self.ti, "id"):
filter_str = (
'resource.type="global"\n'
'logName="projects/project_id/logs/airflow"\n'
f'(labels.task_instance_id="{str(self.ti.id)}" OR ('
'labels.task_id="task_for_testing_stackdriver_task_handler" AND '
'labels.dag_id="dag_for_testing_stackdriver_file_task_handler" AND '
f'labels.{date_label}="2016-01-01T00:00:00+00:00" AND '
'labels.try_number="3"))'
)
else:
filter_str = (
'resource.type="global"\n'
'logName="projects/project_id/logs/airflow"\n'
'labels.task_id="task_for_testing_stackdriver_task_handler"\n'
'labels.dag_id="dag_for_testing_stackdriver_file_task_handler"\n'
f'labels.{date_label}="2016-01-01T00:00:00+00:00"\n'
'labels.try_number="3"'
)
mock_client.return_value.list_log_entries.assert_called_once_with(
request=ListLogEntriesRequest(
resource_names=["projects/project_id"],
Expand All @@ -551,14 +613,7 @@ def test_should_read_logs_with_pagination(self, mock_client, mock_get_creds_and_
mock_client.return_value.list_log_entries.assert_called_with(
request=ListLogEntriesRequest(
resource_names=["projects/project_id"],
filter=(
'resource.type="global"\n'
'logName="projects/project_id/logs/airflow"\n'
'labels.task_id="task_for_testing_stackdriver_task_handler"\n'
'labels.dag_id="dag_for_testing_stackdriver_file_task_handler"\n'
f'labels.{date_label}="2016-01-01T00:00:00+00:00"\n'
'labels.try_number="3"'
),
filter=filter_str,
order_by="timestamp asc",
page_size=1000,
page_token="TOKEN1",
Expand Down Expand Up @@ -602,16 +657,29 @@ def test_should_read_logs_with_custom_resources(self, mock_client, mock_get_cred

logs, metadata = stackdriver_task_handler.read(self.ti)
date_label = "logical_date" if AIRFLOW_V_3_0_PLUS else "execution_date"
filter_str = (
'resource.type="cloud_composer_environment"\n'
'logName="projects/project_id/logs/airflow"\n'
'resource.labels."environment.name"="test-instance"\n'
'resource.labels.location="europe-west-3"\n'
'resource.labels.project_id="project_id"\n'
'labels.task_id="task_for_testing_stackdriver_task_handler"\n'
'labels.dag_id="dag_for_testing_stackdriver_file_task_handler"\n'
f'labels.{date_label}="2016-01-01T00:00:00+00:00"'
)
if hasattr(self.ti, "id"):
filter_str = (
'resource.type="cloud_composer_environment"\n'
'logName="projects/project_id/logs/airflow"\n'
'resource.labels."environment.name"="test-instance"\n'
'resource.labels.location="europe-west-3"\n'
'resource.labels.project_id="project_id"\n'
f'(labels.task_instance_id="{str(self.ti.id)}" OR ('
'labels.task_id="task_for_testing_stackdriver_task_handler" AND '
'labels.dag_id="dag_for_testing_stackdriver_file_task_handler" AND '
f'labels.{date_label}="2016-01-01T00:00:00+00:00"))'
)
else:
filter_str = (
'resource.type="cloud_composer_environment"\n'
'logName="projects/project_id/logs/airflow"\n'
'resource.labels."environment.name"="test-instance"\n'
'resource.labels.location="europe-west-3"\n'
'resource.labels.project_id="project_id"\n'
'labels.task_id="task_for_testing_stackdriver_task_handler"\n'
'labels.dag_id="dag_for_testing_stackdriver_file_task_handler"\n'
f'labels.{date_label}="2016-01-01T00:00:00+00:00"'
)
mock_client.return_value.list_log_entries.assert_called_once_with(
request=ListLogEntriesRequest(
resource_names=["projects/project_id"],
Expand Down Expand Up @@ -663,14 +731,24 @@ def test_should_return_valid_external_url(self, mock_client, mock_get_creds_and_

filter_params = parsed_qs["advancedFilter"][0].splitlines()
date_label = "logical_date" if AIRFLOW_V_3_0_PLUS else "execution_date"
expected_filter = [
'resource.type="global"',
'logName="projects/project_id/logs/airflow"',
f'labels.task_id="{self.ti.task_id}"',
f'labels.dag_id="{self.DAG_ID}"',
f'labels.{date_label}="{self.ti.logical_date.isoformat() if AIRFLOW_V_3_0_PLUS else self.ti.execution_date.isoformat()}"',
f'labels.try_number="{self.ti.try_number}"',
]
if hasattr(self.ti, "id"):
expected_filter = [
'resource.type="global"',
'logName="projects/project_id/logs/airflow"',
f'(labels.task_instance_id="{str(self.ti.id)}" OR (labels.task_id="{self.ti.task_id}" AND '
f'labels.dag_id="{self.DAG_ID}" AND '
f'labels.{date_label}="{self.ti.logical_date.isoformat() if AIRFLOW_V_3_0_PLUS else self.ti.execution_date.isoformat()}" AND '
f'labels.try_number="{self.ti.try_number}"))',
]
else:
expected_filter = [
'resource.type="global"',
'logName="projects/project_id/logs/airflow"',
f'labels.task_id="{self.ti.task_id}"',
f'labels.dag_id="{self.DAG_ID}"',
f'labels.{date_label}="{self.ti.logical_date.isoformat() if AIRFLOW_V_3_0_PLUS else self.ti.execution_date.isoformat()}"',
f'labels.try_number="{self.ti.try_number}"',
]
assert set(expected_filter) == set(filter_params)


Expand Down
Loading