Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## Metaflow API Docs

- [BatchInferencePipeline](docs/metaflow/batch_inference_pipeline.md)
- [create_ownership_registry_view](docs/metaflow/create_ownership_registry_view.md)
- [make_pydantic_parser_fn](docs/metaflow/make_pydantic_parser_fn.md)
- [publish](docs/metaflow/publish.md)
- [publish_pandas](docs/metaflow/publish_pandas.md)
Expand Down
46 changes: 46 additions & 0 deletions docs/metaflow/create_ownership_registry_view.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# `create_ownership_registry_view`

Source: `ds_platform_utils.metaflow.registry.create_ownership_registry_view`

Creates (or replaces) the central **table-ownership registry view**,
`PATTERN_DB.DATA_SCIENCE.TABLE_OWNERSHIP_REGISTRY`. The view pivots the object tags
applied by [`publish`](publish.md) / [`publish_pandas`](publish_pandas.md) into one row
per table, exposing `owner`, `team`, `domain`, `project`, `status`, `sla` and `contact`.

This is a one-time admin helper.

## Signature

```python
create_ownership_registry_view(conn: SnowflakeConnection | None = None) -> str
```

| Parameter | Type | Required | Description |
| --------- | ----------------------------- | -------: | ------------------------------------------------------------------------ |
| `conn` | `SnowflakeConnection \| None` | No | Open Snowflake connection. If omitted, one is created via `get_snowflake_connection()`. |

**Returns:** the executed `CREATE OR REPLACE VIEW` SQL string.

## Usage

```python
from ds_platform_utils.metaflow import create_ownership_registry_view

create_ownership_registry_view()
```

Then query it:

```sql
SELECT * FROM PATTERN_DB.DATA_SCIENCE.TABLE_OWNERSHIP_REGISTRY
ORDER BY team, table_name;
```

## Notes

- **No refresh needed.** A view is not materialized — it re-runs its query on every read,
so it is always live.
- **~2h lag.** The view reads `SNOWFLAKE.ACCOUNT_USAGE.TAG_REFERENCES`, which itself lags
up to ~2 hours. For the current value of a single table's tag, use
`SYSTEM$GET_TAG('PATTERN_DB.DATA_SCIENCE.TABLE_OWNER', '<table>', 'table')` instead.
- **Adoption-based.** Only tables that have at least one ownership tag appear in the view.
48 changes: 48 additions & 0 deletions docs/metaflow/publish.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ publish(
ctx: dict[str, Any] | None = None,
warehouse: Literal["XS", "MED", "XL"] = None,
use_utc: bool = True,
tags: dict[str, str] | None = None,
) -> None
```

Expand All @@ -22,6 +23,8 @@ publish(
- Reads SQL from a string or `.sql` path.
- Runs write/audit/publish operations through Snowflake.
- Adds operation details and table links to the Metaflow card when available.
- **Automatically applies ownership object tags to production tables** (see
[Ownership tags](#ownership-tags) below).

## Parameters

Expand All @@ -33,6 +36,7 @@ publish(
| `ctx` | `dict[str, Any] \| None` | No | Optional template substitution context for SQL operations. |
| `warehouse` | `Literal["XS", "MED", "XL"] \| None` | No | Snowflake warehouse override for this operation. Supports `XS`/`MED`/`XL` shortcuts or a full warehouse name. |
| `use_utc` | `bool` | No | If `True`, uses UTC timezone for Snowflake session. |
| `tags` | `dict[str, str] \| None` | No | Overrides for the ownership object tags applied to the published table. See [Ownership tags](#ownership-tags).|

**Returns:** `None`

Expand All @@ -47,3 +51,47 @@ publish(
audits=["SELECT COUNT(*) > 0 FROM PATTERN_DB.{{schema}}.{{table_name}}"],
)
```

## Ownership tags

When publishing to **production**, `publish()` automatically applies the table-ownership
object tags from the table-ownership RFC. The seven tags are:

| Tag | Source | Always set? |
| --------------- | ------------------------------------------------------- | --------------- |
| `TABLE_OWNER` | Metaflow `current.username` | yes |
| `TABLE_TEAM` | `data-science` | yes |
| `TABLE_DOMAIN` | `ds.domain` Metaflow tag, else `unknown` | yes |
| `TABLE_PROJECT` | `ds.project` Metaflow tag, else `unknown` | yes |
| `TABLE_STATUS` | `active` (override allows `active`/`development`/`testing`/`deprecated`/`archived`/`retired`) | yes |
| `TABLE_SLA` | override only (`streaming`/`realtime`/`hourly`/`daily`/`weekly`/`monthly`/`quarterly`/`ad_hoc`/`on_demand`) | only if given |
| `TABLE_CONTACT` | override only (Slack channel or email) | only if given |

> **`TABLE_DOMAIN` / `TABLE_PROJECT` depend on flow tags.** These are read from the
> `ds.domain` / `ds.project` Metaflow tags. If a flow runs without them, the value falls
> back to the literal string `unknown` and a warning is printed (the same warning used
> for select.dev cost tracking). Make sure your flow carries `--tag "ds.domain:..."` and
> `--tag "ds.project:..."` — these are applied automatically in CI and the standard `poe`
> run commands in the monorepo — or pass `tags={"domain": ..., "project": ...}` explicitly.

Pass `tags=` to override any value. Keys may be `owner`/`team`/`domain`/`project`/
`status`/`sla`/`contact` (optionally `TABLE_`-prefixed):

```python
publish(
table_name="OUT_OF_STOCK_ADS",
query="sql/create_training_data.sql",
tags={"sla": "daily", "contact": "#ds-recsys", "status": "active"},
)
```

Notes:

- Tags are applied **only to production tables**. Non-prod (`DATA_SCIENCE_STAGE`) runs
apply no tags.
- The tag *definitions* must first be created once by a Snowflake admin (the RFC
`CREATE TAG` setup). Until then, tagging is **skipped with a warning** — the publish
still succeeds.
- Invalid `status`/`sla` values raise `ValueError` before any data is written.
- Tagged tables surface in the `TABLE_OWNERSHIP_REGISTRY` view (see
`create_ownership_registry_view`).
28 changes: 28 additions & 0 deletions docs/metaflow/publish_pandas.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ publish_pandas(
use_utc: bool = True,
use_s3_stage: bool = False,
table_definition: list[tuple[str, str]] | None = None,
tags: dict[str, str] | None = None,
) -> None
```

Expand All @@ -30,6 +31,8 @@ publish_pandas(
- Validates DataFrame input.
- Writes directly via `write_pandas` or via S3 stage flow for large data.
- Adds a Snowflake table URL to Metaflow card output.
- **Automatically applies ownership object tags to production tables** (see
[Ownership tags](#ownership-tags) below).

## Parameters

Expand All @@ -49,9 +52,34 @@ publish_pandas(
| `use_utc` | `bool` | No | If `True`, uses UTC timezone for Snowflake session. |
| `use_s3_stage` | `bool` | No | If `True`, publishes via S3 stage flow; otherwise uses direct `write_pandas`. |
| `table_definition` | `list[tuple[str, str]] \| None` | No | Optional Snowflake table schema; used by S3 stage flow when table creation is needed. |
| `tags` | `dict[str, str] \| None` | No | Overrides for the ownership object tags applied to the published table. See [Ownership tags](#ownership-tags).|

**Returns:** `None`

## Ownership tags

When publishing to **production**, `publish_pandas()` automatically applies the same
seven table-ownership object tags as [`publish`](publish.md#ownership-tags):
`TABLE_OWNER`, `TABLE_TEAM`, `TABLE_DOMAIN`, `TABLE_PROJECT`, `TABLE_STATUS` and
(when provided via `tags=`) `TABLE_SLA` / `TABLE_CONTACT`.

```python
publish_pandas(
table_name="MY_TABLE",
df=df,
tags={"sla": "daily", "contact": "#ds-recsys"},
)
```

- Tags are applied **only to production tables**; non-prod runs apply none.
- `TABLE_DOMAIN` / `TABLE_PROJECT` come from the `ds.domain` / `ds.project` Metaflow tags;
if a flow runs without them they fall back to the literal `unknown` and a warning is
printed. Ensure the flow carries those tags (automatic in CI / standard `poe` commands)
or pass `tags={"domain": ..., "project": ...}`. See [`publish`](publish.md#ownership-tags).
- Tag *definitions* must first be created by a Snowflake admin (RFC `CREATE TAG` setup);
until then tagging is **skipped with a warning** and the publish still succeeds.
- Invalid `status`/`sla` values raise `ValueError` before any data is written.

## Limitations

- When `use_s3_stage=True`, some column data types may not map exactly as expected between pandas/parquet and Snowflake.
Expand Down
5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
[project]
name = "ds-platform-utils"
version = "0.4.2"
version = "0.5.0"
description = "Utility library for Pattern Data Science."
readme = "README.md"
authors = [
{ name = "Amit Vikram Raj", email = "amit.raj@pattern.com" },
{ name = "Eric Riddoch", email = "eric.riddoch@pattern.com" }
{ name = "Eric Riddoch", email = "eric.riddoch@pattern.com" },
{ name = "Vinay Shende", email = "vinay.shende@pattern.com" }
]
# requires-python = ">=3.7"
dependencies = [
Expand Down
202 changes: 202 additions & 0 deletions src/ds_platform_utils/_snowflake/object_tags.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
"""Build and apply Snowflake object tags for table ownership / governance.

Implements the tag schema from the "Snowflake table ownership via object tags" RFC.
Tags are applied only to production tables, so both the tag *definitions* and the
*tables* live in ``PATTERN_DB.DATA_SCIENCE``.

The tag *definitions* must be created once by a Snowflake admin (see the RFC's
``CREATE TAG`` setup). Until they exist, :func:`apply_table_tags` warns and leaves the
(already successful) table write untouched -- tagging must never break a publish.
"""

import re
from typing import TYPE_CHECKING, Dict, Optional

from ds_platform_utils._snowflake.run_query import _execute_sql
from ds_platform_utils.metaflow._consts import PROD_SCHEMA
from ds_platform_utils.sql_utils import get_select_dev_query_tags

if TYPE_CHECKING:
from snowflake.connector import SnowflakeConnection

DATABASE = "PATTERN_DB"

# A Snowflake unquoted identifier: starts with a letter/underscore, then letters/digits/underscores.
# Identifiers (table name, schema, tag names) are interpolated directly into the SET TAG SQL, so we
# reject anything else to avoid malformed SQL or statement injection. (Tag *values* are safely
# single-quoted + escaped via _quote and are not subject to this check.)
_IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")

# RFC allowed-value lists for the constrained tags.
TABLE_STATUS_ALLOWED = {"active", "development", "testing", "deprecated", "archived", "retired"}
TABLE_SLA_ALLOWED = {
"streaming",
"realtime",
"hourly",
"daily",
"weekly",
"monthly",
"quarterly",
"ad_hoc",
"on_demand",
}
DEFAULT_TABLE_STATUS = "active"

# All seven RFC tag names.
TAG_OWNER = "TABLE_OWNER"
TAG_TEAM = "TABLE_TEAM"
TAG_DOMAIN = "TABLE_DOMAIN"
TAG_PROJECT = "TABLE_PROJECT"
TAG_STATUS = "TABLE_STATUS"
TAG_SLA = "TABLE_SLA"
TAG_CONTACT = "TABLE_CONTACT"

# Maps accepted override keys (case-insensitive, with or without the ``TABLE_`` prefix)
# to the canonical tag name.
_OVERRIDE_ALIASES = {
"owner": TAG_OWNER,
"team": TAG_TEAM,
"domain": TAG_DOMAIN,
"project": TAG_PROJECT,
"status": TAG_STATUS,
"sla": TAG_SLA,
"contact": TAG_CONTACT,
}


def _normalize_overrides(tags_override: Optional[Dict[str, str]]) -> Dict[str, str]:
"""Normalize caller override keys to canonical tag names.

Accepts e.g. ``owner``, ``OWNER`` or ``TABLE_OWNER`` -> ``TABLE_OWNER``.

:param tags_override: Raw override dict supplied by the caller.
:return: Override dict keyed by canonical tag name.
:raises ValueError: If an override key does not map to a known tag.
"""
normalized: Dict[str, str] = {}
for key, value in (tags_override or {}).items():
canonical = _OVERRIDE_ALIASES.get(key.strip().lower().removeprefix("table_"))
if canonical is None:
raise ValueError(
f"Unknown tag override key {key!r}. Allowed keys: {sorted(_OVERRIDE_ALIASES)} "
f"(optionally prefixed with 'TABLE_')."
)
normalized[canonical] = value
return normalized


def build_table_tags(
tags_override: Optional[Dict[str, str]] = None,
current_obj: Optional[object] = None,
) -> Dict[str, str]:
"""Build the final ``{TAG_NAME: value}`` dict to apply to a published table.

OWNER / TEAM / DOMAIN / PROJECT are derived from the Metaflow run context (reusing
:func:`get_select_dev_query_tags`); STATUS defaults to ``active``. Any value may be
overridden via ``tags_override``. SLA and CONTACT are only included when supplied
via ``tags_override`` (they cannot be inferred).

:param tags_override: Optional overrides, keyed by ``owner``/``TABLE_OWNER``/etc.
:param current_obj: Optional Metaflow ``current`` stand-in (for testing).
:return: Mapping of canonical tag name to value, ready to apply.
:raises ValueError: If STATUS or SLA is not in its allowed-value list, or an
override key is unknown.
"""
overrides = _normalize_overrides(tags_override)
derived = get_select_dev_query_tags(current_obj=current_obj)

tags: Dict[str, str] = {
TAG_OWNER: derived["user"],
TAG_TEAM: derived["team"],
TAG_DOMAIN: derived["domain"],
TAG_PROJECT: derived["workload_id"],
TAG_STATUS: DEFAULT_TABLE_STATUS,
}
# SLA / CONTACT are only set when explicitly provided.
tags.update(overrides)

status = tags[TAG_STATUS]
if status not in TABLE_STATUS_ALLOWED:
raise ValueError(f"TABLE_STATUS must be one of {sorted(TABLE_STATUS_ALLOWED)}, got {status!r}.")

sla = tags.get(TAG_SLA)
if sla is not None and sla not in TABLE_SLA_ALLOWED:
raise ValueError(f"TABLE_SLA must be one of {sorted(TABLE_SLA_ALLOWED)}, got {sla!r}.")

# Drop any tags whose value is None/empty so we never emit ``= ''``.
return {name: str(value) for name, value in tags.items() if value is not None and str(value) != ""}


def _quote(value: str) -> str:
"""Escape a tag value for a single-quoted SQL literal (double embedded quotes)."""
return value.replace("'", "''")


def _validate_identifier(value: str, kind: str) -> None:
"""Reject anything that isn't a plain unquoted SQL identifier.

Identifiers are interpolated unquoted into the ``SET TAG`` SQL, so a value containing
e.g. ``;`` or whitespace could produce invalid SQL or statement injection.

:param value: Identifier to check (table name, schema, or tag name).
:param kind: Human-readable label used in the error message.
:raises ValueError: If ``value`` is not a valid unquoted identifier.
"""
if not _IDENTIFIER_RE.match(value):
raise ValueError(f"Invalid {kind} {value!r}; expected an unquoted identifier (letters/numbers/underscore).")


def build_set_tag_sql(table_name: str, tags: Dict[str, str], schema: str = PROD_SCHEMA) -> str:
"""Build a single ``ALTER TABLE ... SET TAG`` statement.

Tag definitions and the table both live in ``schema`` (``DATA_SCIENCE`` for prod).

:param table_name: Table to tag (upper-cased to match Snowflake's stored identifier).
:param tags: Mapping of tag name to value (e.g. from :func:`build_table_tags`).
:param schema: Schema holding both the table and the tag definitions.
:return: The ``ALTER TABLE`` SQL string.
:raises ValueError: If ``tags`` is empty, or any identifier (table/schema/tag name) is invalid.
"""
if not tags:
raise ValueError("No tags to apply.")
table = table_name.upper()
_validate_identifier(table, "table_name")
_validate_identifier(schema, "schema")
for name in tags:
_validate_identifier(name, "tag name")
assignments = ",\n ".join(f"{DATABASE}.{schema}.{name} = '{_quote(value)}'" for name, value in tags.items())
return f"ALTER TABLE {DATABASE}.{schema}.{table}\n SET TAG\n {assignments};"


def apply_table_tags(
conn: "SnowflakeConnection",
table_name: str,
tags: Dict[str, str],
schema: str = PROD_SCHEMA,
) -> None:
"""Apply object tags to a published table, warning (never raising) on failure.

A failure here most commonly means the tag definitions have not yet been created by
an admin (see the RFC ``CREATE TAG`` setup). Because the table write has already
succeeded by this point, we log a clear warning and return rather than breaking the
publish.

:param conn: Open Snowflake connection.
:param table_name: Table to tag.
:param tags: Mapping of tag name to value.
:param schema: Schema holding both the table and the tag definitions.
"""
if not tags:
return
try:
# Built inside the try so identifier-validation errors warn-and-skip rather than break publish.
sql = build_set_tag_sql(table_name=table_name, tags=tags, schema=schema)
_execute_sql(conn, sql)
conn.commit()
print(f"Applied ownership tags to {DATABASE}.{schema}.{table_name.upper()}: {sorted(tags)}")
except Exception as exc: # noqa: BLE001 -- tagging must never break a successful publish
print(
f"Warning: failed to apply ownership tags to {DATABASE}.{schema}.{table_name.upper()} "
f"({exc}). The table was published successfully; tags were skipped. This usually means the "
f"tag definitions have not been created yet by a Snowflake admin (see the table-ownership RFC)."
)
Loading
Loading