diff --git a/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py b/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py index 262f141fabb70..5109b16233829 100644 --- a/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py +++ b/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py @@ -78,6 +78,7 @@ LABEL_DAG_ID = "dag_id" LABEL_LOGICAL_DATE = "logical_date" if AIRFLOW_V_3_0_PLUS else "execution_date" LABEL_TRY_NUMBER = "try_number" +LABEL_TASK_INSTANCE_ID = "task_instance_id" @attrs.define(kw_only=True) @@ -220,8 +221,24 @@ def escape_label_value(value: str) -> str: for key, value in self.resource.labels.items(): log_filters.append(f"resource.labels.{escape_label_key(key)}={escape_label_value(value)}") - for key, value in ti_labels.items(): - log_filters.append(f"labels.{escape_label_key(key)}={escape_label_value(value)}") + ti_id_val = ti_labels.get(LABEL_TASK_INSTANCE_ID) + legacy_filters = [ + f"labels.{escape_label_key(k)}={escape_label_value(v)}" + for k, v in ti_labels.items() + if k != LABEL_TASK_INSTANCE_ID + ] + + if ti_id_val: + ti_id_filter = ( + f"labels.{escape_label_key(LABEL_TASK_INSTANCE_ID)}={escape_label_value(ti_id_val)}" + ) + if legacy_filters: + log_filters.append(f"({ti_id_filter} OR ({' AND '.join(legacy_filters)}))") + else: + log_filters.append(ti_id_filter) + else: + log_filters.extend(legacy_filters) + return "\n".join(log_filters) def read_logs( @@ -268,14 +285,18 @@ def _read_single_logs_page(self, log_filter: str, page_token: str | None = None) def _task_instance_to_labels(ti) -> dict[str, str]: """Convert a task instance to Stackdriver labels.""" - return { - LABEL_TASK_ID: ti.task_id, - LABEL_DAG_ID: ti.dag_id, + labels = { + LABEL_TASK_ID: str(ti.task_id), + LABEL_DAG_ID: str(ti.dag_id), LABEL_LOGICAL_DATE: str(ti.logical_date.isoformat()) if AIRFLOW_V_3_0_PLUS else str(ti.execution_date.isoformat()), LABEL_TRY_NUMBER: str(ti.try_number), } + ti_id = getattr(ti, "id", None) + if ti_id: + labels[LABEL_TASK_INSTANCE_ID] = str(ti_id) + return labels class StackdriverTaskHandler(logging.Handler): @@ -314,6 +335,7 @@ class StackdriverTaskHandler(logging.Handler): LABEL_DAG_ID = LABEL_DAG_ID LABEL_LOGICAL_DATE = LABEL_LOGICAL_DATE LABEL_TRY_NUMBER = LABEL_TRY_NUMBER + LABEL_TASK_INSTANCE_ID = LABEL_TASK_INSTANCE_ID LOG_VIEWER_BASE_URL = "https://console.cloud.google.com/logs/viewer" LOG_NAME = "Google Stackdriver" @@ -451,14 +473,18 @@ def read( @classmethod def _task_instance_to_labels(cls, ti: TaskInstance) -> dict[str, str]: - return { - cls.LABEL_TASK_ID: ti.task_id, - cls.LABEL_DAG_ID: ti.dag_id, + labels = { + cls.LABEL_TASK_ID: str(ti.task_id), + cls.LABEL_DAG_ID: str(ti.dag_id), cls.LABEL_LOGICAL_DATE: str(ti.logical_date.isoformat()) if AIRFLOW_V_3_0_PLUS else str(ti.execution_date.isoformat()), cls.LABEL_TRY_NUMBER: str(ti.try_number), } + ti_id = getattr(ti, "id", None) + if ti_id: + labels[cls.LABEL_TASK_INSTANCE_ID] = str(ti_id) + return labels @property def log_name(self): diff --git a/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py b/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py index 0eb6c209f8fdb..8b23ebcfd81a3 100644 --- a/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py +++ b/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py @@ -193,6 +193,23 @@ def test_upload_no_delete(self, mock_transport_prop): def test_prepare_log_filter(self, mock_get_creds_and_project_id): mock_get_creds_and_project_id.return_value = ("creds", "project_id") + ti_labels = { + "task_id": "test_task", + "dag_id": "test_dag", + "try_number": "1", + "task_instance_id": "test_ti_id", + } + log_filter = self.io.prepare_log_filter(ti_labels) + + assert 'resource.type="global"' in log_filter + assert 'logName="projects/project_id/logs/airflow"' in log_filter + expected_or = '(labels.task_instance_id="test_ti_id" OR (labels.task_id="test_task" AND labels.dag_id="test_dag" AND labels.try_number="1"))' + assert expected_or in log_filter + + @mock.patch("airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id") + def test_prepare_log_filter_legacy(self, mock_get_creds_and_project_id): + mock_get_creds_and_project_id.return_value = ("creds", "project_id") + ti_labels = { "task_id": "test_task", "dag_id": "test_dag", @@ -204,6 +221,7 @@ def test_prepare_log_filter(self, mock_get_creds_and_project_id): assert 'logName="projects/project_id/logs/airflow"' in log_filter assert 'labels.task_id="test_task"' in log_filter assert 'labels.dag_id="test_dag"' in log_filter + assert " OR " not in log_filter @mock.patch("airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id") def test_prepare_log_filter_with_custom_resource(self, mock_get_creds_and_project_id): @@ -386,6 +404,7 @@ def test_should_set_labels(self, mock_client, mock_get_creds_and_project_id): "dag_id": self.DAG_ID, date_key: "2016-01-01T00:00:00+00:00", "try_number": "1", + **({"task_instance_id": str(self.ti.id)} if hasattr(self.ti, "id") else {}), } resource = Resource(type="global", labels={}) self.transport_mock.return_value.send.assert_called_once_with( @@ -411,6 +430,7 @@ def test_should_append_labels(self, mock_client, mock_get_creds_and_project_id): "dag_id": self.DAG_ID, date_key: "2016-01-01T00:00:00+00:00", "try_number": "1", + **({"task_instance_id": str(self.ti.id)} if hasattr(self.ti, "id") else {}), "product.googleapis.com/task_id": "test-value", } resource = Resource(type="global", labels={}) @@ -431,13 +451,23 @@ def test_should_read_logs_for_all_try(self, mock_client, mock_get_creds_and_proj date_label = "logical_date" if AIRFLOW_V_3_0_PLUS else "execution_date" - filter_str = ( - 'resource.type="global"\n' - 'logName="projects/project_id/logs/airflow"\n' - 'labels.task_id="task_for_testing_stackdriver_task_handler"\n' - 'labels.dag_id="dag_for_testing_stackdriver_file_task_handler"\n' - f'labels.{date_label}="2016-01-01T00:00:00+00:00"' - ) + if hasattr(self.ti, "id"): + filter_str = ( + 'resource.type="global"\n' + 'logName="projects/project_id/logs/airflow"\n' + f'(labels.task_instance_id="{str(self.ti.id)}" OR (' + 'labels.task_id="task_for_testing_stackdriver_task_handler" AND ' + 'labels.dag_id="dag_for_testing_stackdriver_file_task_handler" AND ' + f'labels.{date_label}="2016-01-01T00:00:00+00:00"))' + ) + else: + filter_str = ( + 'resource.type="global"\n' + 'logName="projects/project_id/logs/airflow"\n' + 'labels.task_id="task_for_testing_stackdriver_task_handler"\n' + 'labels.dag_id="dag_for_testing_stackdriver_file_task_handler"\n' + f'labels.{date_label}="2016-01-01T00:00:00+00:00"' + ) mock_client.return_value.list_log_entries.assert_called_once_with( request=ListLogEntriesRequest( resource_names=["projects/project_id"], @@ -463,13 +493,23 @@ def test_should_read_logs_for_task_with_quote(self, mock_client, mock_get_creds_ logs, metadata = stackdriver_task_handler.read(self.ti) date_label = "logical_date" if AIRFLOW_V_3_0_PLUS else "execution_date" - filter_str = ( - 'resource.type="global"\n' - 'logName="projects/project_id/logs/airflow"\n' - 'labels.task_id="K\\"OT"\n' - 'labels.dag_id="dag_for_testing_stackdriver_file_task_handler"\n' - f'labels.{date_label}="2016-01-01T00:00:00+00:00"' - ) + if hasattr(self.ti, "id"): + filter_str = ( + 'resource.type="global"\n' + 'logName="projects/project_id/logs/airflow"\n' + f'(labels.task_instance_id="{str(self.ti.id)}" OR (' + 'labels.task_id="K\\"OT" AND ' + 'labels.dag_id="dag_for_testing_stackdriver_file_task_handler" AND ' + f'labels.{date_label}="2016-01-01T00:00:00+00:00"))' + ) + else: + filter_str = ( + 'resource.type="global"\n' + 'logName="projects/project_id/logs/airflow"\n' + 'labels.task_id="K\\"OT"\n' + 'labels.dag_id="dag_for_testing_stackdriver_file_task_handler"\n' + f'labels.{date_label}="2016-01-01T00:00:00+00:00"' + ) mock_client.return_value.list_log_entries.assert_called_once_with( request=ListLogEntriesRequest( resource_names=["projects/project_id"], @@ -493,14 +533,25 @@ def test_should_read_logs_for_single_try(self, mock_client, mock_get_creds_and_p logs, metadata = stackdriver_task_handler.read(self.ti, 3) date_label = "logical_date" if AIRFLOW_V_3_0_PLUS else "execution_date" - filter_str = ( - 'resource.type="global"\n' - 'logName="projects/project_id/logs/airflow"\n' - 'labels.task_id="task_for_testing_stackdriver_task_handler"\n' - 'labels.dag_id="dag_for_testing_stackdriver_file_task_handler"\n' - f'labels.{date_label}="2016-01-01T00:00:00+00:00"\n' - 'labels.try_number="3"' - ) + if hasattr(self.ti, "id"): + filter_str = ( + 'resource.type="global"\n' + 'logName="projects/project_id/logs/airflow"\n' + f'(labels.task_instance_id="{str(self.ti.id)}" OR (' + 'labels.task_id="task_for_testing_stackdriver_task_handler" AND ' + 'labels.dag_id="dag_for_testing_stackdriver_file_task_handler" AND ' + f'labels.{date_label}="2016-01-01T00:00:00+00:00" AND ' + 'labels.try_number="3"))' + ) + else: + filter_str = ( + 'resource.type="global"\n' + 'logName="projects/project_id/logs/airflow"\n' + 'labels.task_id="task_for_testing_stackdriver_task_handler"\n' + 'labels.dag_id="dag_for_testing_stackdriver_file_task_handler"\n' + f'labels.{date_label}="2016-01-01T00:00:00+00:00"\n' + 'labels.try_number="3"' + ) mock_client.return_value.list_log_entries.assert_called_once_with( request=ListLogEntriesRequest( resource_names=["projects/project_id"], @@ -525,14 +576,25 @@ def test_should_read_logs_with_pagination(self, mock_client, mock_get_creds_and_ logs, metadata1 = stackdriver_task_handler.read(self.ti, 3) date_label = "logical_date" if AIRFLOW_V_3_0_PLUS else "execution_date" - filter_str = ( - 'resource.type="global"\n' - 'logName="projects/project_id/logs/airflow"\n' - 'labels.task_id="task_for_testing_stackdriver_task_handler"\n' - 'labels.dag_id="dag_for_testing_stackdriver_file_task_handler"\n' - f'labels.{date_label}="2016-01-01T00:00:00+00:00"\n' - 'labels.try_number="3"' - ) + if hasattr(self.ti, "id"): + filter_str = ( + 'resource.type="global"\n' + 'logName="projects/project_id/logs/airflow"\n' + f'(labels.task_instance_id="{str(self.ti.id)}" OR (' + 'labels.task_id="task_for_testing_stackdriver_task_handler" AND ' + 'labels.dag_id="dag_for_testing_stackdriver_file_task_handler" AND ' + f'labels.{date_label}="2016-01-01T00:00:00+00:00" AND ' + 'labels.try_number="3"))' + ) + else: + filter_str = ( + 'resource.type="global"\n' + 'logName="projects/project_id/logs/airflow"\n' + 'labels.task_id="task_for_testing_stackdriver_task_handler"\n' + 'labels.dag_id="dag_for_testing_stackdriver_file_task_handler"\n' + f'labels.{date_label}="2016-01-01T00:00:00+00:00"\n' + 'labels.try_number="3"' + ) mock_client.return_value.list_log_entries.assert_called_once_with( request=ListLogEntriesRequest( resource_names=["projects/project_id"], @@ -551,14 +613,7 @@ def test_should_read_logs_with_pagination(self, mock_client, mock_get_creds_and_ mock_client.return_value.list_log_entries.assert_called_with( request=ListLogEntriesRequest( resource_names=["projects/project_id"], - filter=( - 'resource.type="global"\n' - 'logName="projects/project_id/logs/airflow"\n' - 'labels.task_id="task_for_testing_stackdriver_task_handler"\n' - 'labels.dag_id="dag_for_testing_stackdriver_file_task_handler"\n' - f'labels.{date_label}="2016-01-01T00:00:00+00:00"\n' - 'labels.try_number="3"' - ), + filter=filter_str, order_by="timestamp asc", page_size=1000, page_token="TOKEN1", @@ -602,16 +657,29 @@ def test_should_read_logs_with_custom_resources(self, mock_client, mock_get_cred logs, metadata = stackdriver_task_handler.read(self.ti) date_label = "logical_date" if AIRFLOW_V_3_0_PLUS else "execution_date" - filter_str = ( - 'resource.type="cloud_composer_environment"\n' - 'logName="projects/project_id/logs/airflow"\n' - 'resource.labels."environment.name"="test-instance"\n' - 'resource.labels.location="europe-west-3"\n' - 'resource.labels.project_id="project_id"\n' - 'labels.task_id="task_for_testing_stackdriver_task_handler"\n' - 'labels.dag_id="dag_for_testing_stackdriver_file_task_handler"\n' - f'labels.{date_label}="2016-01-01T00:00:00+00:00"' - ) + if hasattr(self.ti, "id"): + filter_str = ( + 'resource.type="cloud_composer_environment"\n' + 'logName="projects/project_id/logs/airflow"\n' + 'resource.labels."environment.name"="test-instance"\n' + 'resource.labels.location="europe-west-3"\n' + 'resource.labels.project_id="project_id"\n' + f'(labels.task_instance_id="{str(self.ti.id)}" OR (' + 'labels.task_id="task_for_testing_stackdriver_task_handler" AND ' + 'labels.dag_id="dag_for_testing_stackdriver_file_task_handler" AND ' + f'labels.{date_label}="2016-01-01T00:00:00+00:00"))' + ) + else: + filter_str = ( + 'resource.type="cloud_composer_environment"\n' + 'logName="projects/project_id/logs/airflow"\n' + 'resource.labels."environment.name"="test-instance"\n' + 'resource.labels.location="europe-west-3"\n' + 'resource.labels.project_id="project_id"\n' + 'labels.task_id="task_for_testing_stackdriver_task_handler"\n' + 'labels.dag_id="dag_for_testing_stackdriver_file_task_handler"\n' + f'labels.{date_label}="2016-01-01T00:00:00+00:00"' + ) mock_client.return_value.list_log_entries.assert_called_once_with( request=ListLogEntriesRequest( resource_names=["projects/project_id"], @@ -663,14 +731,24 @@ def test_should_return_valid_external_url(self, mock_client, mock_get_creds_and_ filter_params = parsed_qs["advancedFilter"][0].splitlines() date_label = "logical_date" if AIRFLOW_V_3_0_PLUS else "execution_date" - expected_filter = [ - 'resource.type="global"', - 'logName="projects/project_id/logs/airflow"', - f'labels.task_id="{self.ti.task_id}"', - f'labels.dag_id="{self.DAG_ID}"', - f'labels.{date_label}="{self.ti.logical_date.isoformat() if AIRFLOW_V_3_0_PLUS else self.ti.execution_date.isoformat()}"', - f'labels.try_number="{self.ti.try_number}"', - ] + if hasattr(self.ti, "id"): + expected_filter = [ + 'resource.type="global"', + 'logName="projects/project_id/logs/airflow"', + f'(labels.task_instance_id="{str(self.ti.id)}" OR (labels.task_id="{self.ti.task_id}" AND ' + f'labels.dag_id="{self.DAG_ID}" AND ' + f'labels.{date_label}="{self.ti.logical_date.isoformat() if AIRFLOW_V_3_0_PLUS else self.ti.execution_date.isoformat()}" AND ' + f'labels.try_number="{self.ti.try_number}"))', + ] + else: + expected_filter = [ + 'resource.type="global"', + 'logName="projects/project_id/logs/airflow"', + f'labels.task_id="{self.ti.task_id}"', + f'labels.dag_id="{self.DAG_ID}"', + f'labels.{date_label}="{self.ti.logical_date.isoformat() if AIRFLOW_V_3_0_PLUS else self.ti.execution_date.isoformat()}"', + f'labels.try_number="{self.ti.try_number}"', + ] assert set(expected_filter) == set(filter_params)