Skip to content

Commit cc7311e

Browse files
committed
[DOP-31721] Log applied SQL transformation
1 parent 3245a95 commit cc7311e

4 files changed

Lines changed: 20 additions & 0 deletions

File tree

syncmaster/worker/handlers/db/base.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
from __future__ import annotations
55

6+
import logging
67
from abc import abstractmethod
78
from typing import TYPE_CHECKING, Any, ClassVar
89

@@ -20,6 +21,9 @@
2021
from syncmaster.dto.transfers import DBTransferDTO
2122

2223

24+
logger = logging.getLogger(__name__)
25+
26+
2327
class DBHandler(Handler):
2428
connection: BaseDBConnection
2529
transfer_dto: DBTransferDTO
@@ -62,6 +66,7 @@ def read(self) -> DataFrame:
6266

6367
sql_query = self._get_sql_query()
6468
if sql_query:
69+
logger.info("Applying SQL transformation:\n%s", sql_query)
6570
df.createOrReplaceTempView("source")
6671
df = self.connection.spark.sql(sql_query)
6772

syncmaster/worker/handlers/file/local_df.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
from __future__ import annotations
55

6+
import logging
67
from pathlib import Path
78
from typing import TYPE_CHECKING
89

@@ -18,6 +19,9 @@
1819
from pyspark.sql import DataFrame
1920

2021

22+
logger = logging.getLogger(__name__)
23+
24+
2125
class LocalDFFileHandler(FileHandler):
2226
local_df_connection: SparkLocalFS
2327

@@ -60,6 +64,7 @@ def read(self) -> DataFrame:
6064

6165
sql_query = self._get_sql_query()
6266
if sql_query:
67+
logger.info("Applying SQL transformation:\n%s", sql_query)
6368
df.createOrReplaceTempView("source")
6469
df = self.df_connection.spark.sql(sql_query)
6570

syncmaster/worker/handlers/file/remote_df.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# SPDX-License-Identifier: Apache-2.0
33
from __future__ import annotations
44

5+
import logging
56
from pathlib import Path
67
from typing import TYPE_CHECKING
78

@@ -14,6 +15,9 @@
1415
from pyspark.sql import DataFrame
1516

1617

18+
logger = logging.getLogger(__name__)
19+
20+
1721
class RemoteDFFileHandler(FileHandler):
1822
def read(self) -> DataFrame:
1923
from pyspark.sql.types import StructType # noqa: PLC0415
@@ -37,6 +41,7 @@ def read(self) -> DataFrame:
3741

3842
sql_query = self._get_sql_query()
3943
if sql_query:
44+
logger.info("Applying SQL transformation:\n%s", sql_query)
4045
df.createOrReplaceTempView("source")
4146
df = self.df_connection.spark.sql(sql_query)
4247

syncmaster/worker/handlers/file/s3.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
from __future__ import annotations
55

6+
import logging
67
from typing import TYPE_CHECKING
78

89
from onetl.file import FileDFReader
@@ -16,6 +17,9 @@
1617
from syncmaster.dto.connections import S3ConnectionDTO
1718

1819

20+
logger = logging.getLogger(__name__)
21+
22+
1923
@support_hooks
2024
class S3Handler(RemoteDFFileHandler):
2125
connection_dto: S3ConnectionDTO
@@ -73,6 +77,7 @@ def read(self) -> DataFrame:
7377

7478
sql_query = self._get_sql_query()
7579
if sql_query:
80+
logger.info("Applying SQL transformation:\n%s", sql_query)
7681
df.createOrReplaceTempView("source")
7782
df = self.df_connection.spark.sql(sql_query)
7883

0 commit comments

Comments (0)