|
| 1 | +""" |
| 2 | +Deployment-time operations for configuring an existing DataJoint pipeline. |
| 3 | +
|
| 4 | +This module hosts idempotent operational helpers — things you run as part of a |
| 5 | +deploy hook to configure a schema for its environment, distinct from |
| 6 | +:mod:`datajoint.migrate` which handles one-shot schema/state evolution. |
| 7 | +
|
| 8 | +The boundary between the two: |
| 9 | +
|
| 10 | +- :mod:`datajoint.migrate` — fix legacy state, evolve a schema definition, |
| 11 | + retroactive corrections. Cadence: one-shot. Examples: ``migrate_columns``, |
| 12 | + ``add_job_metadata_columns``, ``rebuild_lineage``. |
| 13 | +- :mod:`datajoint.deploy` — configure an environment for a consumer's |
| 14 | + requirements (CDC tools, replication, role grants, performance tuning). |
| 15 | + Cadence: re-runnable, idempotent. Examples: :func:`set_replica_identity`. |
| 16 | +
|
| 17 | +Functions in this module should be safe to call repeatedly from a deploy hook |
| 18 | +without accumulating side effects. |
| 19 | +""" |
| 20 | + |
| 21 | +from __future__ import annotations |
| 22 | + |
| 23 | +from typing import TYPE_CHECKING, Any, Literal, Union |
| 24 | + |
| 25 | +from .errors import DataJointError |
| 26 | + |
| 27 | +if TYPE_CHECKING: |
| 28 | + from .schemas import _Schema |
| 29 | + from .table import Table |
| 30 | + |
| 31 | + TargetType = Union["_Schema", type["Table"], "Table"] |
| 32 | + |
| 33 | + |
def set_replica_identity(
    target: "TargetType",
    mode: Literal["default", "full"] = "full",
    dry_run: bool = True,
) -> dict:
    """
    Set ``REPLICA IDENTITY`` on one PostgreSQL table or on every table of a schema.

    One ``ALTER TABLE ... REPLICA IDENTITY <mode>`` statement is generated per
    targeted table. The setting decides how much of the **old row** PostgreSQL
    records in the write-ahead log for UPDATE/DELETE: with ``DEFAULT`` only the
    primary-key columns are logged; with ``FULL`` the complete previous row is.

    Why this exists
    ---------------
    Certain change-data-capture (CDC) consumers need the full row pre-image to
    build their downstream models. **Databricks Lakehouse Sync** is the
    canonical case: a table lacking ``REPLICA IDENTITY FULL`` is skipped
    silently — no error is raised, the data is simply absent downstream. Other
    CDC tools (Debezium, ClickHouse ClickPipes, Azure CDC) are content with
    ``DEFAULT`` as long as the table has a primary key; only Databricks
    insists on ``FULL``.

    This function is the **operational** route for applying the setting. It is
    not a migration — no legacy state is being repaired. The setting is a
    property of the deployment environment, and a fresh declare in a new
    environment may need it applied again. Re-applying an unchanged mode is a
    no-op at the storage layer, so the call is idempotent and can run from a
    deploy hook on every release.

    Cost
    ----
    The ALTER is a metadata-only, instant change, but it briefly takes an
    ``AccessExclusiveLock`` on each table and therefore queues behind in-flight
    reads/writes on a busy table. Prefer a quiet window for actively-ingested
    tables.

    The lasting cost is extra WAL volume: once a table is ``FULL``, every
    UPDATE/DELETE logs the whole old row, which can be large for tables with
    TOASTed bytea columns. DataJoint's usual insert-append workload makes this
    negligible. The case to watch is a bulk ``delete()`` on tables with
    ``<blob>`` columns, which produces a transient WAL burst proportional to
    the payload size of the deleted rows.

    Partial-failure semantics
    -------------------------
    Statements are generated and executed one table at a time. If
    ``connection.query(ddl)`` raises on table N of M, tables 1..N-1 have
    already been altered and the exception propagates; the partial summary is
    not returned. Because the operation is idempotent, simply running it again
    brings the remaining tables into compliance.

    Compliance considerations
    -------------------------
    ``DEFAULT`` exposes only primary-key values in WAL, whereas ``FULL``
    exposes entire rows, PHI/PII/sensitive columns included. On self-hosted
    PostgreSQL with unrestricted WAL access this deserves real thought; on
    managed PostgreSQL where logical replication is confined to a specific
    subscriber (Lakebase, RDS), WAL remains inside the managed environment's
    security boundary. Apply intentionally.

    Parameters
    ----------
    target : Schema or Table
        Either a :class:`datajoint.Schema` (every user table in it) or a
        :class:`datajoint.Table` class or instance (that single table).
    mode : str, default ``"full"``
        ``"default"`` (PK only, minimal WAL) or ``"full"`` (entire row).
        String values are lower-cased before validation, so ``"FULL"`` is
        accepted as well.
    dry_run : bool, default ``True``
        When true, the DDL statements are only collected, never executed.
        Pass ``False`` to actually apply them.

    Returns
    -------
    dict
        - ``tables_analyzed`` (int): how many tables were considered.
        - ``tables_modified`` (int): how many tables the ALTER was run on;
          always 0 when ``dry_run=True``.
        - ``ddl`` (list[str]): the statements that were (or would be) executed.

    Raises
    ------
    DataJointError
        If ``mode`` is not ``"default"`` / ``"full"``; if ``target`` is not a
        Schema or a Table class/instance; if the target has no active
        connection or the schema is not activated; or if the connection's
        adapter does not provide ``replica_identity_ddl`` (i.e. the backend is
        not PostgreSQL).

    Examples
    --------
    >>> from datajoint.deploy import set_replica_identity
    >>> # Preview
    >>> set_replica_identity(my_schema, mode="full", dry_run=True)
    {'tables_analyzed': 12, 'tables_modified': 0, 'ddl': ['ALTER TABLE "ms"."t1" REPLICA IDENTITY FULL', ...]}
    >>> # Apply
    >>> set_replica_identity(my_schema, mode="full", dry_run=False)
    {'tables_analyzed': 12, 'tables_modified': 12, 'ddl': [...]}
    >>> # Single table
    >>> set_replica_identity(MyTable, mode="full", dry_run=False)

    See Also
    --------
    PostgreSQL: `Logical Replication — Replica Identity
    <https://www.postgresql.org/docs/current/logical-replication-publication.html>`_.
    Databricks: `Lakehouse Sync
    <https://docs.databricks.com/aws/en/oltp/projects/lakehouse-sync>`_.
    """
    # Validate the mode first so a bad argument fails before anything is imported
    # or any connection is touched. Non-string values fall through unchanged and
    # are rejected by the membership test.
    requested_mode = mode.lower() if isinstance(mode, str) else mode
    if requested_mode not in ("default", "full"):
        raise DataJointError(f"mode must be 'default' or 'full'; got {mode!r}")
    mode = requested_mode  # type: ignore[assignment]

    # Imported at call time; at module level these names are only available
    # under TYPE_CHECKING.
    from .schemas import _Schema
    from .table import Table

    if isinstance(target, _Schema):
        # Schema target: every table reported by list_tables(), fully qualified.
        connection = target.connection
        if connection is None:
            raise DataJointError("Schema has no active connection.")
        adapter = connection.adapter
        if target.database is None:
            raise DataJointError("Schema is not activated. Call schema.activate(...) before set_replica_identity().")
        tables = [adapter.make_full_table_name(target.database, name) for name in target.list_tables()]
    else:
        # Table target: accept either the class (instantiated here) or an instance.
        if isinstance(target, type) and issubclass(target, Table):
            table_obj = target()
            table_label = target.__name__
        elif isinstance(target, Table):
            table_obj = target
            table_label = type(target).__name__
        else:
            raise DataJointError(f"target must be a Schema or Table class/instance; got {type(target).__name__}")
        connection = table_obj.connection
        if connection is None:
            raise DataJointError(f"Table {table_label} has no active connection.")
        adapter = connection.adapter
        tables = [table_obj.full_table_name]

    # Backend support is detected by capability: only an adapter exposing
    # replica_identity_ddl can build the statement.
    if not hasattr(adapter, "replica_identity_ddl"):
        raise DataJointError(
            f"set_replica_identity is PostgreSQL-only; the {adapter.backend} adapter does not support REPLICA IDENTITY."
        )

    statements: list[str] = []
    applied_count = 0
    # Generate and (optionally) execute per table, in order, so that a failure on
    # table N leaves tables 1..N-1 applied, as described in the docstring.
    for qualified_name in tables:
        statement = adapter.replica_identity_ddl(qualified_name, mode)  # type: ignore[attr-defined]
        statements.append(statement)
        if dry_run:
            continue
        connection.query(statement)
        applied_count += 1

    return {
        "tables_analyzed": len(tables),
        "tables_modified": applied_count,
        "ddl": statements,
    }
0 commit comments