Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 44 additions & 28 deletions src/intugle/adapters/types/bigquery/bigquery.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import time

from typing import TYPE_CHECKING, Any, Optional

import numpy as np
Expand All @@ -8,7 +9,12 @@
from intugle.adapters.factory import AdapterFactory
from intugle.adapters.models import ColumnProfile, DataSetData, ProfilingOutput
from intugle.adapters.types.bigquery.models import BigQueryConfig, BigQueryConnectionConfig
from intugle.adapters.utils import convert_to_native
from intugle.adapters.utils import (
convert_to_native,
quote_identifier,
quote_identifier_parts,
split_identifier_path,
)
from intugle.core import settings
from intugle.core.utilities.processing import string_standardization

Expand Down Expand Up @@ -97,16 +103,13 @@ def connect(self):

def _get_fqn(self, identifier: str) -> str:
"""Gets the fully qualified name for a table identifier."""
if "." in identifier:
# Already has project or dataset prefix
parts = identifier.split(".")
if len(parts) == 2:
# dataset.table format
return f"`{self._project_id}.{identifier}`"
elif len(parts) == 3:
# project.dataset.table format
return f"`{identifier}`"
return f"`{self._project_id}.{self._dataset_id}.{identifier}`"
parts = split_identifier_path(identifier, max_parts=3)
if len(parts) == 1:
parts = [self._project_id, self._dataset_id, parts[0]]
elif len(parts) == 2:
parts = [self._project_id, parts[0], parts[1]]

return quote_identifier_parts(parts, quote_char="`", compound=True)

@staticmethod
def check_data(data: Any) -> BigQueryConfig:
Expand All @@ -133,20 +136,32 @@ def profile(self, data: BigQueryConfig, table_name: str) -> ProfilingOutput:
"""Profile a BigQuery table."""
data = self.check_data(data)
fqn = self._get_fqn(data.identifier)
identifier_parts = split_identifier_path(data.identifier, max_parts=3)
if len(identifier_parts) == 1:
project_id, dataset_id, table_identifier = self._project_id, self._dataset_id, identifier_parts[0]
elif len(identifier_parts) == 2:
project_id, dataset_id, table_identifier = self._project_id, identifier_parts[0], identifier_parts[1]
else:
project_id, dataset_id, table_identifier = identifier_parts

# Get total count
count_query = f"SELECT COUNT(*) as count FROM {fqn}"
total_count = self._execute_sql(count_query)[0]["count"]

# Get column information from INFORMATION_SCHEMA
information_schema = quote_identifier_parts(
[project_id, dataset_id, "INFORMATION_SCHEMA", "COLUMNS"],
quote_char="`",
compound=True,
)
schema_query = f"""
SELECT column_name, data_type
FROM `{self._project_id}.{self._dataset_id}.INFORMATION_SCHEMA.COLUMNS`
FROM {information_schema}
WHERE table_name = @table_name
ORDER BY ordinal_position
"""
job_config = bigquery.QueryJobConfig(
query_parameters=[bigquery.ScalarQueryParameter("table_name", "STRING", data.identifier)]
query_parameters=[bigquery.ScalarQueryParameter("table_name", "STRING", table_identifier)]
)
query_job = self.client.query(schema_query, job_config=job_config)
rows = [dict(row) for row in query_job.result()]
Expand All @@ -172,13 +187,14 @@ def column_profile(
"""Profile a specific column in a BigQuery table."""
data = self.check_data(data)
fqn = self._get_fqn(data.identifier)
safe_column_name = quote_identifier(column_name, quote_char="`")
start_ts = time.time()

# Null and distinct counts
query = f"""
SELECT
COUNTIF(`{column_name}` IS NULL) as null_count,
COUNT(DISTINCT `{column_name}`) as distinct_count
COUNTIF({safe_column_name} IS NULL) as null_count,
COUNT(DISTINCT {safe_column_name}) as distinct_count
FROM {fqn}
"""
result = self._execute_sql(query)[0]
Expand All @@ -188,9 +204,9 @@ def column_profile(

# Sampling for distinct values
sample_query = f"""
SELECT DISTINCT CAST(`{column_name}` AS STRING) as value
SELECT DISTINCT CAST({safe_column_name} AS STRING) as value
FROM {fqn}
WHERE `{column_name}` IS NOT NULL
WHERE {safe_column_name} IS NOT NULL
LIMIT {dtype_sample_limit}
"""
distinct_values_result = self._execute_sql(sample_query)
Expand All @@ -209,9 +225,9 @@ def column_profile(
remaining_sample_size = dtype_sample_limit - len(distinct_values)
if remaining_sample_size > 0:
additional_samples_query = f"""
SELECT CAST(`{column_name}` AS STRING) as value
SELECT CAST({safe_column_name} AS STRING) as value
FROM {fqn}
WHERE `{column_name}` IS NOT NULL
WHERE {safe_column_name} IS NOT NULL
ORDER BY RAND()
LIMIT {remaining_sample_size}
"""
Expand Down Expand Up @@ -295,18 +311,20 @@ def intersect_count(
data2 = self.check_data(table2.data)
fqn1 = self._get_fqn(data1.identifier)
fqn2 = self._get_fqn(data2.identifier)
col1 = quote_identifier(column1_name, quote_char="`")
col2 = quote_identifier(column2_name, quote_char="`")

query = f"""
SELECT COUNT(*) as count
FROM (
SELECT DISTINCT `{column1_name}` as key
SELECT DISTINCT {col1} as key
FROM {fqn1}
WHERE `{column1_name}` IS NOT NULL
WHERE {col1} IS NOT NULL
) t1
INNER JOIN (
SELECT DISTINCT `{column2_name}` as key
SELECT DISTINCT {col2} as key
FROM {fqn2}
WHERE `{column2_name}` IS NOT NULL
WHERE {col2} IS NOT NULL
) t2
ON t1.key = t2.key
"""
Expand All @@ -319,8 +337,7 @@ def get_composite_key_uniqueness(
data = self.check_data(dataset_data)
fqn = self._get_fqn(data.identifier)

# Build column list with backticks
safe_columns = [f"`{col}`" for col in columns]
safe_columns = [quote_identifier(col, quote_char="`") for col in columns]
columns_str = ", ".join(safe_columns)

# Build null filter
Expand Down Expand Up @@ -352,9 +369,8 @@ def intersect_composite_keys_count(
fqn1 = self._get_fqn(data1.identifier)
fqn2 = self._get_fqn(data2.identifier)

# Build column lists with backticks
safe_columns1 = [f"`{col}`" for col in columns1]
safe_columns2 = [f"`{col}`" for col in columns2]
safe_columns1 = [quote_identifier(col, quote_char="`") for col in columns1]
safe_columns2 = [quote_identifier(col, quote_char="`") for col in columns2]

# Subquery for distinct keys from table 1
distinct_cols1 = ", ".join(safe_columns1)
Expand Down
Loading
Loading