Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cloud_pipelines_backend/backend_types_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ class Secret(_TableBase):
__tablename__ = "secret"
user_id: orm.Mapped[str] = orm.mapped_column(primary_key=True, index=True)
secret_name: orm.Mapped[str] = orm.mapped_column(primary_key=True)
secret_value: orm.Mapped[str]
secret_value: orm.Mapped[str] = orm.mapped_column(sql.Text())
created_at: orm.Mapped[datetime.datetime]
updated_at: orm.Mapped[datetime.datetime]
expires_at: orm.Mapped[datetime.datetime | None] = orm.mapped_column(default=None)
Expand Down
64 changes: 64 additions & 0 deletions cloud_pipelines_backend/database_migrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -722,3 +722,67 @@ def run_all_annotation_backfills(

_logger.info("Exit backfill for annotations table")
return result


def migrate_secret_value_column(
*,
db_engine: sqlalchemy.Engine,
) -> None:
"""Widen secret.secret_value to TEXT.

Idempotent: inspects the actual DB column and skips if already TEXT.
Dialect-aware: MySQL, PostgreSQL, SQLite.
"""
table = bts.Secret.__table__
column = table.c.secret_value
type_sql = column.type.compile(dialect=db_engine.dialect)

inspector = sqlalchemy.inspect(db_engine)
db_columns = {c["name"]: c for c in inspector.get_columns(table.name)}
col_type = db_columns[column.name]["type"]
dialect = db_engine.dialect.name

if isinstance(col_type, sqlalchemy.types.Text):
_logger.info(f"migrate column to TEXT: skipped (already TEXT)")
return

current_length = col_type.length if isinstance(col_type.length, int) else 0

_logger.info(
f"migrate column to TEXT: {table.name}.{column.name}"
f" — current_type={col_type}, current_length={current_length},"
f" target_type={type_sql}, dialect={dialect}"
)

try:
if dialect == "mysql":
alter_sql = (
f"ALTER TABLE {table.name} MODIFY COLUMN {column.name} {type_sql}"
)
elif dialect == "postgresql":
alter_sql = (
f"ALTER TABLE {table.name} ALTER COLUMN {column.name} TYPE {type_sql}"
)
elif dialect == "sqlite":
_logger.info(
f"migrate column to TEXT: SQLite does not enforce VARCHAR length"
f" and does not support ALTER COLUMN — no migration needed"
)
return
else:
_logger.warning(
f"migrate column to TEXT: unsupported dialect {dialect} — skipping"
)
return

_logger.info(f"migrate column to TEXT: executing SQL: {alter_sql}")
with db_engine.connect() as conn:
conn.execute(sqlalchemy.text(alter_sql))
conn.commit()
_logger.info(f"migrate column to TEXT: complete")
except Exception:
_logger.exception(
f"migrate column to TEXT failed: table={table.name},"
f" column={column.name}, current_type={col_type},"
f" target_type={type_sql}, dialect={dialect}"
)
2 changes: 2 additions & 0 deletions cloud_pipelines_backend/database_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ def migrate_db(
index.create(db_engine, checkfirst=True)
break

database_migrations.migrate_secret_value_column(db_engine=db_engine)

if do_skip_backfill:
_logger.info("Skipping annotation backfills")
else:
Expand Down
107 changes: 107 additions & 0 deletions tests/test_database_migrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -1929,3 +1929,110 @@ def test_migrate_db_skips_backfills_when_flag_set(self) -> None:
_count_annotations(session_factory=session_factory, key=pipeline_name_key)
== 0
)


# ---------------------------------------------------------------------------
# secret_value column migration
# ---------------------------------------------------------------------------


def test_migrate_secret_value_column_idempotent(
caplog: pytest.LogCaptureFixture,
) -> None:
"""First call ALTERs VARCHAR(255) to TEXT; second call skips (already TEXT)."""
db_engine = database_ops.create_db_engine(database_uri="sqlite://")
bts._TableBase.metadata.create_all(db_engine)

varchar_255 = sqlalchemy.types.String(length=255)
text_type = sqlalchemy.types.Text()

call_count = 0

def _fake_get_columns(
*,
table_name: str,
) -> list[dict[str, Any]]:
nonlocal call_count
call_count += 1
fake_type = varchar_255 if call_count == 1 else text_type
return [{"name": "secret_value", "type": fake_type}]

mock_conn = mock.MagicMock()
mock_connect = mock.MagicMock()
mock_connect.__enter__ = mock.MagicMock(return_value=mock_conn)
mock_connect.__exit__ = mock.MagicMock(return_value=False)

real_dialect = db_engine.dialect
fake_dialect = mock.MagicMock(wraps=real_dialect)
fake_dialect.name = "mysql"
fake_dialect.type_compiler_instance = real_dialect.type_compiler_instance

with (
mock.patch.object(db_engine, "dialect", fake_dialect),
mock.patch("sqlalchemy.inspect") as mock_inspect,
mock.patch.object(db_engine, "connect", return_value=mock_connect),
caplog.at_level(logging.INFO),
):
mock_inspect.return_value.get_columns.side_effect = (
lambda table_name: _fake_get_columns(table_name=table_name)
)

database_migrations.migrate_secret_value_column(db_engine=db_engine)
database_migrations.migrate_secret_value_column(db_engine=db_engine)

msgs = caplog.messages
assert len(msgs) == 4
assert (
"current_type=VARCHAR(255), current_length=255, target_type=TEXT, dialect=mysql"
in msgs[0]
)
assert (
"executing SQL: ALTER TABLE secret MODIFY COLUMN secret_value TEXT" in msgs[1]
)
assert "complete" in msgs[2]
assert "skipped (already TEXT)" in msgs[3]
mock_conn.execute.assert_called_once()
mock_conn.commit.assert_called_once()


def test_migrate_secret_value_column_mysql_alter_sql(
caplog: pytest.LogCaptureFixture,
) -> None:
"""Mock MySQL dialect and verify the exact ALTER SQL generated."""
db_engine = database_ops.create_db_engine(database_uri="sqlite://")
bts._TableBase.metadata.create_all(db_engine)

real_dialect = db_engine.dialect

def _fake_get_columns(
*,
table_name: str,
) -> list[dict[str, Any]]:
return [{"name": "secret_value", "type": sqlalchemy.types.String(length=255)}]

mock_conn = mock.MagicMock()
mock_connect = mock.MagicMock()
mock_connect.__enter__ = mock.MagicMock(return_value=mock_conn)
mock_connect.__exit__ = mock.MagicMock(return_value=False)

fake_dialect = mock.MagicMock(wraps=real_dialect)
fake_dialect.name = "mysql"
fake_dialect.type_compiler_instance = real_dialect.type_compiler_instance

with (
mock.patch.object(db_engine, "dialect", fake_dialect),
mock.patch("sqlalchemy.inspect") as mock_inspect,
mock.patch.object(db_engine, "connect", return_value=mock_connect),
caplog.at_level(logging.INFO),
):
mock_inspect.return_value.get_columns.side_effect = (
lambda table_name: _fake_get_columns(table_name=table_name)
)

database_migrations.migrate_secret_value_column(db_engine=db_engine)

executed_sql = mock_conn.execute.call_args[0][0].text
assert executed_sql == "ALTER TABLE secret MODIFY COLUMN secret_value TEXT"
assert any("current_length=255" in msg for msg in caplog.messages)
assert any("dialect=mysql" in msg for msg in caplog.messages)
mock_conn.commit.assert_called_once()
Loading