-
-
Notifications
You must be signed in to change notification settings - Fork 303
Zero-downtime migration tooling; widen File.file_size to bigint (expand stage) #5986
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: hotfixes
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,12 @@ | ||
| from django_prometheus.db.backends.postgresql.base import ( | ||
| DatabaseWrapper as PrometheusDatabaseWrapper, | ||
| ) | ||
| from django_zero_downtime_migrations.backends.postgres.schema import ( | ||
| DatabaseSchemaEditor, | ||
| ) | ||
|
|
||
|
|
||
class DatabaseWrapper(PrometheusDatabaseWrapper):
    """PostgreSQL backend wrapper: Prometheus metrics plus safe-DDL migrations.

    Subclasses django_prometheus's PostgreSQL ``DatabaseWrapper`` and replaces
    its schema editor with the one shipped by
    ``django_zero_downtime_migrations``.
    """

    # Schema editor class from django_zero_downtime_migrations, used in place
    # of the one inherited from the Prometheus wrapper.
    SchemaEditorClass = DatabaseSchemaEditor
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,31 @@ | ||
| import hashlib | ||
|
|
||
| import pgtrigger | ||
|
|
||
|
|
||
def mirror_field(source, target):
    """Return a model class decorator that dual-writes `source` into `target`.

    The decorator registers a pgtrigger BEFORE INSERT/UPDATE trigger on the
    decorated model that copies the `source` column into the `target` column
    (expand/contract dual-write). `source` and `target` are Django field
    names; they are resolved to database column names via ``model._meta``.
    """
    # Change-guard (IS DISTINCT FROM): only copy when the source value actually
    # changed, so a read cutover does not clobber writes to the repointed
    # column with the stale source value.
    func_template = (
        "IF NEW.{s} IS DISTINCT FROM OLD.{s} THEN NEW.{t} = NEW.{s}; END IF; RETURN NEW;"
    )

    def decorator(model):
        meta = model._meta
        src_column = meta.get_field(source).column
        dst_column = meta.get_field(target).column

        trigger_name = "mirror_{}_to_{}".format(src_column, dst_column)
        if len(trigger_name) > 43:  # stay safely under pgtrigger's trigger-name limit
            # Too long: fall back to a short, deterministic hash of the pair.
            pair = "{}_{}".format(src_column, dst_column)
            short_hash = hashlib.sha1(pair.encode()).hexdigest()[:8]
            trigger_name = "mirror_{}".format(short_hash)

        mirror_trigger = pgtrigger.Trigger(
            name=trigger_name,
            when=pgtrigger.Before,
            operation=pgtrigger.Insert | pgtrigger.Update,
            func=func_template.format(s=src_column, t=dst_column),
        )
        register = pgtrigger.register(mirror_trigger)
        return register(model)

    return decorator
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,99 @@ | ||
| import time | ||
|
|
||
| from django.apps import apps | ||
| from django.core.exceptions import FieldDoesNotExist | ||
| from django.core.management.base import BaseCommand | ||
| from django.core.management.base import CommandError | ||
| from django.db import transaction | ||
| from django.db.models import F | ||
|
|
||
|
|
||
| class Command(BaseCommand): | ||
| help = "Idempotent, resumable, throttled online backfill of one column into another, in batches." | ||
|
|
||
| def add_arguments(self, parser): | ||
| parser.add_argument("--model", required=True, help="app_label.ModelName") | ||
| parser.add_argument("--source-field", required=True) | ||
| parser.add_argument("--target-field", required=True) | ||
| parser.add_argument("--batch-size", type=int, default=1000) | ||
| parser.add_argument( | ||
| "--sleep", type=float, default=0.1, help="seconds between batches" | ||
| ) | ||
| parser.add_argument("--start-id", default=None, help="resume from this pk") | ||
| parser.add_argument( | ||
| "--verify", | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nitpick:
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah that sounds good |
||
| action="store_true", | ||
| help="report unbackfilled rows, exit nonzero if any", | ||
| ) | ||
|
|
||
def _resolve_model_fields(self, model_label, source, target):
    """Resolve ``model_label`` to a model class and check both fields exist.

    ``model_label`` is the ``--model`` option value ("app_label.ModelName");
    ``source`` and ``target`` are the field names given on the command line.

    Returns the model class.

    Raises CommandError if the model cannot be looked up or if either field
    is missing from it. The original exception is chained as the cause so
    the full traceback stays available when debugging.
    """
    try:
        model = apps.get_model(model_label)
    except (LookupError, ValueError) as e:
        raise CommandError("Bad --model {!r}: {}".format(model_label, e)) from e
    try:
        # Only existence matters here; the returned field objects are unused.
        model._meta.get_field(source)
        model._meta.get_field(target)
    except FieldDoesNotExist as e:
        raise CommandError(str(e)) from e
    return model
|
|
||
| def handle(self, *args, **options): | ||
| if options["sleep"] < 0: | ||
| raise CommandError("--sleep must be >= 0") | ||
| if options["batch_size"] < 1: | ||
| raise CommandError("--batch-size must be >= 1") | ||
| source = options["source_field"] | ||
| target = options["target_field"] | ||
| model = self._resolve_model_fields(options["model"], source, target) | ||
|
|
||
| pk_name = model._meta.pk.name | ||
| batch_size, throttle = options["batch_size"], options["sleep"] | ||
| only_unfilled = {target + "__isnull": True, source + "__isnull": False} | ||
| unfilled = model.objects.filter(**only_unfilled) | ||
| unfilled_pks = unfilled.order_by(pk_name).values_list("pk", flat=True) | ||
|
|
||
| if options["verify"]: | ||
| pending = unfilled.count() | ||
| self.stdout.write("{} rows still need backfill.".format(pending)) | ||
| if pending: | ||
| raise CommandError( | ||
| "backfill incomplete: {} rows pending".format(pending) | ||
| ) | ||
| return | ||
|
|
||
| # Start at the first unfilled pk (>= --start-id if given); re-runs and | ||
| # resumes skip straight past an already-filled prefix. | ||
| lo = unfilled_pks | ||
| if options["start_id"] is not None: | ||
| lo = lo.filter(pk__gte=options["start_id"]) | ||
| lo = lo.first() | ||
|
|
||
| total = 0 | ||
| while lo is not None: | ||
| # hi = batch_size-th pk at/after lo (None on the final short batch). | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes - explanatory code would be better than comments here. |
||
| # Keyset paging by pk — works for any pk type, int or UUID. | ||
| hi = ( | ||
| model.objects.filter(pk__gte=lo) | ||
| .order_by(pk_name) | ||
| .values_list("pk", flat=True)[batch_size - 1 : batch_size] | ||
| .first() | ||
| ) | ||
| window = {"pk__gte": lo} if hi is None else {"pk__gte": lo, "pk__lte": hi} | ||
| with transaction.atomic(): | ||
| total += model.objects.filter(**window, **only_unfilled).update( | ||
| **{target: F(source)} | ||
| ) | ||
| self.stdout.write( | ||
| "backfilled through pk={} (updated {} so far)".format( | ||
| hi if hi is not None else lo, total | ||
| ) | ||
| ) | ||
| if hi is None: | ||
| break | ||
| lo = unfilled_pks.filter(pk__gt=hi).first() | ||
| # Throttle between batches so WAL generation / replication lag and autovacuum | ||
| # can keep up on large tables. Pass --sleep 0 to disable. | ||
| if throttle: | ||
| time.sleep(throttle) | ||
|
Comment on lines
+95
to
+98
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This may not provide a meaningful benefit, especially for large tables like the file table. I don't think there is any worry about the WAL, but autovacuum would be triggered. I just don't know that the sleep time would necessarily align with its completion (likely much much longer). If there is concern, I think a more explicit approach to managing it would be beneficial:
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I also argued with Claude quite a bit about this, and I relented because it was insistent. I think I mostly gave in out of fear because File is such a large table - but maybe we just assume we don't need the throttle at all, test it out on hotfixes and see whether that's a bad idea? |
||
| self.stdout.write("Done. {} rows updated.".format(total)) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,41 @@ | ||
| # Generated by Django 3.2.24 on 2026-06-23 05:56 | ||
| import pgtrigger.compiler | ||
| import pgtrigger.migrations | ||
| from django.db import migrations | ||
| from django.db import models | ||
|
|
||
|
|
||
class Migration(migrations.Migration):
    """Expand stage of the File.file_size widening.

    Adds the nullable ``file_size_bigint`` column, an index on
    (checksum, file_size_bigint), and the trigger that mirrors ``file_size``
    into ``file_size_bigint`` on insert/update.
    """

    dependencies = [("contentcuration", "0166_add_usersubscription")]

    operations = [
        # Nullable bigint shadow column for file_size.
        migrations.AddField(
            model_name="file",
            name="file_size_bigint",
            field=models.BigIntegerField(null=True, blank=True),
        ),
        # Index over (checksum, file_size_bigint).
        migrations.AddIndex(
            model_name="file",
            index=models.Index(
                name="file_checksum_fsizebig_idx",
                fields=["checksum", "file_size_bigint"],
            ),
        ),
        # Dual-write trigger: copy file_size into file_size_bigint whenever
        # file_size changes.
        pgtrigger.migrations.AddTrigger(
            model_name="file",
            trigger=pgtrigger.compiler.Trigger(
                name="mirror_file_size_to_file_size_bigint",
                sql=pgtrigger.compiler.UpsertTriggerSql(
                    table="contentcuration_file",
                    when="BEFORE",
                    operation="INSERT OR UPDATE",
                    func="IF NEW.file_size IS DISTINCT FROM OLD.file_size THEN NEW.file_size_bigint = NEW.file_size; END IF; RETURN NEW;",
                    hash="051e321c4cdf91ea81f96b9f9a29e3b5015def67",
                    pgid="pgtrigger_mirror_file_size_to_file_size_bigint_54326",
                ),
            ),
        ),
    ]
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -79,6 +79,7 @@ | |
| from contentcuration.constants import feedback | ||
| from contentcuration.constants import user_history | ||
| from contentcuration.constants.contentnode import kind_activity_map | ||
| from contentcuration.db.dual_write import mirror_field | ||
| from contentcuration.db.models.expressions import Array | ||
| from contentcuration.db.models.functions import ArrayRemove | ||
| from contentcuration.db.models.functions import Unnest | ||
|
|
@@ -3255,6 +3256,8 @@ class StagedFile(models.Model): | |
|
|
||
|
|
||
| FILE_DISTINCT_INDEX_NAME = "file_checksum_file_size_idx" | ||
| # studio#5974: bigint shadow of FILE_DISTINCT_INDEX_NAME, for the file_size widening. | ||
| FILE_DISTINCT_BIGINT_INDEX_NAME = "file_checksum_fsizebig_idx" | ||
| FILE_MODIFIED_DESC_INDEX_NAME = "file_modified_desc_idx" | ||
| FILE_DURATION_CONSTRAINT = "file_media_duration_int" | ||
| MEDIA_PRESETS = [ | ||
|
|
@@ -3266,6 +3269,14 @@ class StagedFile(models.Model): | |
| ] | ||
|
|
||
|
|
||
| # studio#5974 cutover (next release, after backfill completes): | ||
| # - drop the @mirror_field decorator and the file_size_bigint field below | ||
| # - file_size = models.BigIntegerField(blank=True, null=True, db_column="file_size_bigint") | ||
| # - FILE_DISTINCT_BIGINT_INDEX_NAME -> fields=["checksum", "file_size"] | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not really a part of this step, but how confident are we in Django handling this particular action without significant flux?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's why we need to hand write the migrations with the "separate database state and operations" wrapper for this next step - otherwise it will almost certainly do something we definitely do not want. |
||
| # - migration: swap the mirror trigger + SeparateDatabaseAndState state realignment | ||
| # studio#5974 contract (later release, no old pods left): migration drops the old | ||
| # file_size int column (IgnoreMigration) + the trigger; no model change. | ||
| @mirror_field("file_size", "file_size_bigint") # studio#5974: dual-write int->bigint | ||
| class File(models.Model): | ||
| """ | ||
| The bottom layer of the contentDB schema, defines the basic building brick for content. | ||
|
|
@@ -3275,6 +3286,9 @@ class File(models.Model): | |
| id = UUIDField(primary_key=True, default=uuid.uuid4) | ||
| checksum = models.CharField(max_length=400, blank=True, db_index=True) | ||
| file_size = models.IntegerField(blank=True, null=True) | ||
| file_size_bigint = models.BigIntegerField( | ||
| blank=True, null=True | ||
| ) # studio#5974 shadow | ||
| file_on_disk = models.FileField( | ||
| upload_to=object_storage_name, | ||
| storage=default_storage, | ||
|
|
@@ -3485,6 +3499,10 @@ class Meta: | |
| models.Index( | ||
| fields=["checksum", "file_size"], name=FILE_DISTINCT_INDEX_NAME | ||
| ), | ||
| models.Index( | ||
| fields=["checksum", "file_size_bigint"], | ||
| name=FILE_DISTINCT_BIGINT_INDEX_NAME, | ||
| ), | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A constraint on |
||
| models.Index(fields=["-modified"], name=FILE_MODIFIED_DESC_INDEX_NAME), | ||
| ] | ||
| constraints = [ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -92,6 +92,7 @@ | |
| "django_celery_results", | ||
| "kolibri_public", | ||
| "automation", | ||
| "pgtrigger", | ||
| ) | ||
|
|
||
| SESSION_ENGINE = "django.contrib.sessions.backends.cached_db" | ||
|
|
@@ -193,7 +194,7 @@ | |
|
|
||
| DATABASES = { | ||
| "default": { | ||
| "ENGINE": "django.db.backends.postgresql_psycopg2", | ||
| "ENGINE": "django_zero_downtime_migrations.backends.postgres", | ||
| "NAME": os.getenv("DATA_DB_NAME") or "kolibri-studio", | ||
| # For dev purposes only | ||
| "USER": os.getenv("DATA_DB_USER") or "learningequality", | ||
|
|
@@ -204,6 +205,13 @@ | |
| }, | ||
| } | ||
|
|
||
| ZERO_DOWNTIME_MIGRATIONS_LOCK_TIMEOUT = "5s" | ||
| ZERO_DOWNTIME_MIGRATIONS_STATEMENT_TIMEOUT = "15s" | ||
|
Comment on lines
+208
to
+209
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These seem a bit aggressive. Even for "zero downtime" stuff, I would give it a reasonable request amount of time, i.e. 30s and 60s. Just that I would hate for this to fail because it's too aggressive.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fair - especially as we rarely do huge numbers of migrations in our deploys. |
||
| ZERO_DOWNTIME_MIGRATIONS_FLEXIBLE_STATEMENT_TIMEOUT = ( | ||
| True # don't kill long-but-safe ops (e.g. CREATE INDEX CONCURRENTLY) | ||
| ) | ||
| ZERO_DOWNTIME_MIGRATIONS_RAISE_FOR_UNSAFE = True # surface unsafe DDL at runtime | ||
|
|
||
| IS_CONTENTNODE_TABLE_PARTITIONED = ( | ||
| os.getenv("IS_CONTENTNODE_TABLE_PARTITIONED") or False | ||
| ) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Batch size could probably be pushed to 10k or a little more since this operates entirely within the DB
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And I think it's also our default elsewhere?