From e0ac2d6a29b3659e8b5dbe4b62d092db21ba8338 Mon Sep 17 00:00:00 2001
From: Yue Chao Qin
Date: Fri, 27 Mar 2026 16:04:38 -0700
Subject: [PATCH] fix: Migrate `secret_value` column to be TEXT

---
 cloud_pipelines_backend/backend_types_sql.py  |   2 +-
 .../database_migrations.py                    |  80 +++++++++++++
 cloud_pipelines_backend/database_ops.py       |   2 +
 tests/test_database_migrations.py             | 108 ++++++++++++++++++
 4 files changed, 191 insertions(+), 1 deletion(-)

diff --git a/cloud_pipelines_backend/backend_types_sql.py b/cloud_pipelines_backend/backend_types_sql.py
index b984c6c..a89389f 100644
--- a/cloud_pipelines_backend/backend_types_sql.py
+++ b/cloud_pipelines_backend/backend_types_sql.py
@@ -523,7 +523,7 @@ class Secret(_TableBase):
     __tablename__ = "secret"
     user_id: orm.Mapped[str] = orm.mapped_column(primary_key=True, index=True)
     secret_name: orm.Mapped[str] = orm.mapped_column(primary_key=True)
-    secret_value: orm.Mapped[str]
+    secret_value: orm.Mapped[str] = orm.mapped_column(sql.Text())
     created_at: orm.Mapped[datetime.datetime]
     updated_at: orm.Mapped[datetime.datetime]
     expires_at: orm.Mapped[datetime.datetime | None] = orm.mapped_column(default=None)
diff --git a/cloud_pipelines_backend/database_migrations.py b/cloud_pipelines_backend/database_migrations.py
index 746ba6b..c6b9d8d 100644
--- a/cloud_pipelines_backend/database_migrations.py
+++ b/cloud_pipelines_backend/database_migrations.py
@@ -722,3 +722,83 @@ def run_all_annotation_backfills(
     _logger.info("Exit backfill for annotations table")
 
     return result
+
+
+def migrate_secret_value_column(
+    *,
+    db_engine: sqlalchemy.Engine,
+) -> None:
+    """Widen secret.secret_value to TEXT.
+
+    Idempotent: inspects the actual DB column and skips if already TEXT.
+    Dialect-aware: MySQL, PostgreSQL, SQLite.
+
+    MySQL's MODIFY COLUMN replaces the whole column definition, so the
+    nullability declared on the model is restated in the ALTER statement;
+    otherwise a NOT NULL column would silently become nullable.
+
+    Errors raised while executing the ALTER statement are logged and
+    swallowed, so a failed widening does not propagate to the caller.
+
+    Args:
+        db_engine: Engine for the database whose secret table is migrated.
+    """
+    table = bts.Secret.__table__
+    column = table.c.secret_value
+    type_sql = column.type.compile(dialect=db_engine.dialect)
+
+    inspector = sqlalchemy.inspect(db_engine)
+    db_columns = {c["name"]: c for c in inspector.get_columns(table.name)}
+    col_type = db_columns[column.name]["type"]
+    dialect = db_engine.dialect.name
+
+    if isinstance(col_type, sqlalchemy.types.Text):
+        _logger.info("migrate column to TEXT: skipped (already TEXT)")
+        return
+
+    # Not every reflected type has a `length` attribute; log 0 when it is
+    # missing or unbounded instead of raising AttributeError.
+    reflected_length = getattr(col_type, "length", None)
+    current_length = reflected_length if isinstance(reflected_length, int) else 0
+
+    _logger.info(
+        f"migrate column to TEXT: {table.name}.{column.name}"
+        f" — current_type={col_type}, current_length={current_length},"
+        f" target_type={type_sql}, dialect={dialect}"
+    )
+
+    try:
+        if dialect == "mysql":
+            # MODIFY COLUMN drops every attribute that is not restated.
+            null_sql = "NULL" if column.nullable else "NOT NULL"
+            alter_sql = (
+                f"ALTER TABLE {table.name} MODIFY COLUMN {column.name}"
+                f" {type_sql} {null_sql}"
+            )
+        elif dialect == "postgresql":
+            alter_sql = (
+                f"ALTER TABLE {table.name} ALTER COLUMN {column.name} TYPE {type_sql}"
+            )
+        elif dialect == "sqlite":
+            _logger.info(
+                "migrate column to TEXT: SQLite does not enforce VARCHAR length"
+                " and does not support ALTER COLUMN — no migration needed"
+            )
+            return
+        else:
+            _logger.warning(
+                f"migrate column to TEXT: unsupported dialect {dialect} — skipping"
+            )
+            return
+
+        _logger.info(f"migrate column to TEXT: executing SQL: {alter_sql}")
+        with db_engine.connect() as conn:
+            conn.execute(sqlalchemy.text(alter_sql))
+            conn.commit()
+        _logger.info("migrate column to TEXT: complete")
+    except Exception:
+        _logger.exception(
+            f"migrate column to TEXT failed: table={table.name},"
+            f" column={column.name}, current_type={col_type},"
+            f" target_type={type_sql}, dialect={dialect}"
+        )
diff --git a/cloud_pipelines_backend/database_ops.py b/cloud_pipelines_backend/database_ops.py
index caf889e..ed8c464 100644
--- a/cloud_pipelines_backend/database_ops.py
+++ b/cloud_pipelines_backend/database_ops.py
@@ -104,6 +104,8 @@ def migrate_db(
             index.create(db_engine, checkfirst=True)
             break
 
+    database_migrations.migrate_secret_value_column(db_engine=db_engine)
+
     if do_skip_backfill:
         _logger.info("Skipping annotation backfills")
     else:
diff --git a/tests/test_database_migrations.py b/tests/test_database_migrations.py
index 288168a..4ee41ac 100644
--- a/tests/test_database_migrations.py
+++ b/tests/test_database_migrations.py
@@ -1929,3 +1929,111 @@ def test_migrate_db_skips_backfills_when_flag_set(self) -> None:
             _count_annotations(session_factory=session_factory, key=pipeline_name_key)
             == 0
         )
+
+
+# ---------------------------------------------------------------------------
+# secret_value column migration
+# ---------------------------------------------------------------------------
+
+
+def test_migrate_secret_value_column_idempotent(
+    caplog: pytest.LogCaptureFixture,
+) -> None:
+    """First call ALTERs VARCHAR(255) to TEXT; second call skips (already TEXT)."""
+    db_engine = database_ops.create_db_engine(database_uri="sqlite://")
+    bts._TableBase.metadata.create_all(db_engine)
+
+    varchar_255 = sqlalchemy.types.String(length=255)
+    text_type = sqlalchemy.types.Text()
+
+    call_count = 0
+
+    def _fake_get_columns(
+        *,
+        table_name: str,
+    ) -> list[dict[str, Any]]:
+        nonlocal call_count
+        call_count += 1
+        fake_type = varchar_255 if call_count == 1 else text_type
+        return [{"name": "secret_value", "type": fake_type}]
+
+    mock_conn = mock.MagicMock()
+    mock_connect = mock.MagicMock()
+    mock_connect.__enter__ = mock.MagicMock(return_value=mock_conn)
+    mock_connect.__exit__ = mock.MagicMock(return_value=False)
+
+    real_dialect = db_engine.dialect
+    fake_dialect = mock.MagicMock(wraps=real_dialect)
+    fake_dialect.name = "mysql"
+    fake_dialect.type_compiler_instance = real_dialect.type_compiler_instance
+
+    with (
+        mock.patch.object(db_engine, "dialect", fake_dialect),
+        mock.patch("sqlalchemy.inspect") as mock_inspect,
+        mock.patch.object(db_engine, "connect", return_value=mock_connect),
+        caplog.at_level(logging.INFO),
+    ):
+        mock_inspect.return_value.get_columns.side_effect = (
+            lambda table_name: _fake_get_columns(table_name=table_name)
+        )
+
+        database_migrations.migrate_secret_value_column(db_engine=db_engine)
+        database_migrations.migrate_secret_value_column(db_engine=db_engine)
+
+    msgs = caplog.messages
+    assert len(msgs) == 4
+    assert (
+        "current_type=VARCHAR(255), current_length=255, target_type=TEXT, dialect=mysql"
+        in msgs[0]
+    )
+    assert (
+        "executing SQL: ALTER TABLE secret MODIFY COLUMN secret_value TEXT NOT NULL"
+        in msgs[1]
+    )
+    assert "complete" in msgs[2]
+    assert "skipped (already TEXT)" in msgs[3]
+    mock_conn.execute.assert_called_once()
+    mock_conn.commit.assert_called_once()
+
+
+def test_migrate_secret_value_column_mysql_alter_sql(
+    caplog: pytest.LogCaptureFixture,
+) -> None:
+    """Mock MySQL dialect and verify the exact ALTER SQL generated."""
+    db_engine = database_ops.create_db_engine(database_uri="sqlite://")
+    bts._TableBase.metadata.create_all(db_engine)
+
+    real_dialect = db_engine.dialect
+
+    def _fake_get_columns(
+        *,
+        table_name: str,
+    ) -> list[dict[str, Any]]:
+        return [{"name": "secret_value", "type": sqlalchemy.types.String(length=255)}]
+
+    mock_conn = mock.MagicMock()
+    mock_connect = mock.MagicMock()
+    mock_connect.__enter__ = mock.MagicMock(return_value=mock_conn)
+    mock_connect.__exit__ = mock.MagicMock(return_value=False)
+
+    fake_dialect = mock.MagicMock(wraps=real_dialect)
+    fake_dialect.name = "mysql"
+    fake_dialect.type_compiler_instance = real_dialect.type_compiler_instance
+
+    with (
+        mock.patch.object(db_engine, "dialect", fake_dialect),
+        mock.patch("sqlalchemy.inspect") as mock_inspect,
+        mock.patch.object(db_engine, "connect", return_value=mock_connect),
+        caplog.at_level(logging.INFO),
+    ):
+        mock_inspect.return_value.get_columns.side_effect = (
+            lambda table_name: _fake_get_columns(table_name=table_name)
+        )
+
+        database_migrations.migrate_secret_value_column(db_engine=db_engine)
+
+    executed_sql = mock_conn.execute.call_args[0][0].text
+    assert executed_sql == "ALTER TABLE secret MODIFY COLUMN secret_value TEXT NOT NULL"
+    assert any("current_length=255" in msg for msg in caplog.messages)
+    assert any("dialect=mysql" in msg for msg in caplog.messages)
+    mock_conn.commit.assert_called_once()