diff --git a/README.md b/README.md index 98d08fbe..17bd54d7 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,33 @@ This enables the `route` label for Prometheus HTTP metrics. 5. To enable the `/metrics` endpoint, set the `PROMETHEUS_ENABLED` setting to `True`. +### Startup commands + +`common.core` registers composite startup verbs on the `flagsmith` command, so a container entrypoint can sequence startup itself rather than via a shell script: + +- `flagsmith serve` — wait for the database, then start the API. +- `flagsmith migrate` — migrate each configured database, then create the cache table. A bare invocation runs the full sequence; `flagsmith migrate ` defers to Django for targeted migrations. +- `flagsmith run-task-processor` — migrate, wait for migrations to be applied, then start the Task Processor. +- `flagsmith migrate-and-serve` — migrate, run any configured startup commands, then start the API. + +The verbs are configured through these settings: + +| Setting | Default | Purpose | +| --- | --- | --- | +| `FLAGSMITH_MIGRATE_DATABASES` | `["default"]` | Database aliases the `migrate` step applies, in order. | +| `FLAGSMITH_WAIT_FOR_MIGRATIONS_DATABASES` | `["default"]` | Database aliases `run-task-processor` waits on before starting. | +| `FLAGSMITH_STARTUP_COMMANDS` | `[]` | Management commands run, in order, between migrate and serve in `migrate-and-serve`. | + +…and these environment variables: + +| Environment variable | Default | Purpose | +| --- | --- | --- | +| `SKIP_WAIT_FOR_DB` | unset | When set, skip waiting for the database. | +| `TASK_PROCESSOR_NUM_THREADS` | `5` | Number of worker threads. | +| `TASK_PROCESSOR_SLEEP_INTERVAL_MS` | `500` | Millis each worker waits before checking for new tasks (falls back to `TASK_PROCESSOR_SLEEP_INTERVAL`). | +| `TASK_PROCESSOR_GRACE_PERIOD_MS` | `20000` | Millis before a running task is considered stuck. | +| `TASK_PROCESSOR_QUEUE_POP_SIZE` | `10` | Tasks each worker pops from the queue per cycle. | + ### Pre-commit hooks This repo provides a [`flagsmith-lint-tests`](.pre-commit-hooks.yaml) hook that enforces test conventions: diff --git a/pyproject.toml b/pyproject.toml index b8b0ed25..efc69d0c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,6 +37,7 @@ optional-dependencies = { test-tools = [ "backoff (>=2.2.1,<3.0.0)", "django (>4,<6)", "django-health-check", + "environs (<16)", "opentelemetry-api (>=1.25,<2)", "prometheus-client (>=0.0.16)", ], flagsmith-schemas = [ diff --git a/src/common/core/cli/run.py b/src/common/core/cli/run.py new file mode 100644 index 00000000..4a41ecbb --- /dev/null +++ b/src/common/core/cli/run.py @@ -0,0 +1,79 @@ +"""Composite startup verbs for the `flagsmith` entrypoint. + +These mirror the sequencing that Core API's `run-docker.sh` performed in +shell, but run in a single process so the Django boot cost is paid once. +""" + +import os +import shlex + +from django.conf import settings +from django.core.management import ( + execute_from_command_line as django_execute_from_command_line, +) + +WAIT_FOR_MIGRATIONS_TIMEOUT_SECONDS = 30 + + +def _wait_for_db( + *, + wait_for_migrations: bool = False, + database: str = "default", + wait_for: int | None = None, +) -> None: + if os.environ.get("SKIP_WAIT_FOR_DB"): + return + args = ["waitfordb", "--database", database] + if wait_for is not None: + args += ["--waitfor", str(wait_for)] + if wait_for_migrations: + args.append("--migrations") + django_execute_from_command_line(["flagsmith", *args]) + + +def _migrate() -> None: + _wait_for_db() + databases: list[str] = getattr(settings, "FLAGSMITH_MIGRATE_DATABASES", ["default"]) + for database in databases: + django_execute_from_command_line( + ["flagsmith", "migrate", "--database", database] + ) + django_execute_from_command_line(["flagsmith", "createcachetable"]) + + +def migrate(argv: list[str], *, prog: str) -> None: + """Migrate the configured databases, then create the cache table.""" + if argv: + django_execute_from_command_line(["flagsmith", "migrate", *argv]) + return + _migrate() + + +def serve(argv: list[str], *, prog: str) -> None: + """Wait for the database, then start the API server.""" + _wait_for_db() + django_execute_from_command_line(["flagsmith", "start", "api", *argv]) + + +def run_task_processor(argv: list[str], *, prog: str) -> None: + """Migrate, wait for migrations to be applied, then start the task processor.""" + _migrate() + databases: list[str] = getattr( + settings, "FLAGSMITH_WAIT_FOR_MIGRATIONS_DATABASES", ["default"] + ) + for database in databases: + _wait_for_db( + wait_for_migrations=True, + database=database, + wait_for=WAIT_FOR_MIGRATIONS_TIMEOUT_SECONDS, + ) + django_execute_from_command_line(["flagsmith", "start", "task-processor", *argv]) + + +def migrate_and_serve(argv: list[str], *, prog: str) -> None: + """Migrate, run any configured startup commands, then start the API server.""" + _migrate() + startup_commands: list[str] = getattr(settings, "FLAGSMITH_STARTUP_COMMANDS", []) + for command in startup_commands: + django_execute_from_command_line(["flagsmith", *shlex.split(command)]) + django_execute_from_command_line(["flagsmith", "start", "api", *argv]) diff --git a/src/common/core/main.py b/src/common/core/main.py index 6178ba7a..7dec232e 100644 --- a/src/common/core/main.py +++ b/src/common/core/main.py @@ -10,7 +10,7 @@ ) from environs import Env -from common.core.cli import healthcheck +from common.core.cli import healthcheck, run from common.core.logging import setup_logging from common.gunicorn.processors import make_gunicorn_access_processor @@ -118,6 +118,11 @@ def execute_from_command_line(argv: list[str]) -> None: # Backwards compatibility for task-processor health checks # See https://github.com/Flagsmith/flagsmith-task-processor/issues/24 "checktaskprocessorthreadhealth": healthcheck.main, + # Composite startup verbs, formerly run-docker.sh + "serve": run.serve, + "migrate": run.migrate, + "run-task-processor": run.run_task_processor, + "migrate-and-serve": run.migrate_and_serve, }[subcommand] except (IndexError, KeyError): logger.info("Invoking Django") diff --git a/src/task_processor/utils.py b/src/task_processor/utils.py index f526ff50..66550e7e 100644 --- a/src/task_processor/utils.py +++ b/src/task_processor/utils.py @@ -4,11 +4,15 @@ from contextlib import contextmanager from typing import Any, Generator +from environs import Env + from task_processor.threads import TaskRunnerCoordinator from task_processor.types import TaskCallable, TaskProcessorConfig logger = logging.getLogger(__name__) +env = Env() + def get_task_identifier_from_function( function: TaskCallable[Any], @@ -24,25 +28,28 @@ def add_arguments(parser: argparse.ArgumentParser) -> None: "--numthreads", type=int, help="Number of worker threads to run.", - default=5, + default=env.int("TASK_PROCESSOR_NUM_THREADS", 5), ) parser.add_argument( "--sleepintervalms", type=int, help="Number of millis each worker waits before checking for new tasks", - default=2000, + default=env.int( + "TASK_PROCESSOR_SLEEP_INTERVAL_MS", + env.int("TASK_PROCESSOR_SLEEP_INTERVAL", 500), + ), ) parser.add_argument( "--graceperiodms", type=int, help="Number of millis before running task is considered 'stuck'.", - default=20000, + default=env.int("TASK_PROCESSOR_GRACE_PERIOD_MS", 20000), ) parser.add_argument( "--queuepopsize", type=int, help="Number of tasks each worker will pop from the queue on each cycle.", - default=10, + default=env.int("TASK_PROCESSOR_QUEUE_POP_SIZE", 10), ) diff --git a/tests/unit/common/core/test_main.py b/tests/unit/common/core/test_main.py index b906c03e..32b78621 100644 --- a/tests/unit/common/core/test_main.py +++ b/tests/unit/common/core/test_main.py @@ -6,7 +6,7 @@ from pytest_mock import MockerFixture from structlog.typing import Processor -from common.core.main import ensure_cli_env +from common.core.main import ensure_cli_env, execute_from_command_line from common.core.otel import add_otel_trace_context @@ -252,3 +252,45 @@ def test_ensure_cli_env__env_service_name__expected_otel_service_name( endpoint="http://collector:4318/v1/traces", service_name="my-custom", ) + + +@pytest.mark.parametrize( + "subcommand,handler_name", + [ + pytest.param("serve", "serve", id="serve"), + pytest.param("migrate", "migrate", id="migrate"), + pytest.param( + "run-task-processor", "run_task_processor", id="run_task_processor" + ), + pytest.param("migrate-and-serve", "migrate_and_serve", id="migrate_and_serve"), + ], +) +def test_execute_from_command_line__startup_verb__routes_to_run_handler( + mocker: MockerFixture, + subcommand: str, + handler_name: str, +) -> None: + # Given + mock_handler = mocker.patch(f"common.core.cli.run.{handler_name}") + + # When + execute_from_command_line(["flagsmith", subcommand, "--extra", "arg"]) + + # Then + mock_handler.assert_called_once_with( + ["--extra", "arg"], + prog=f"flagsmith {subcommand}", + ) + + +def test_execute_from_command_line__unknown_subcommand__invokes_django( + mocker: MockerFixture, +) -> None: + # Given + mock_django = mocker.patch("common.core.main.django_execute_from_command_line") + + # When + execute_from_command_line(["flagsmith", "showmigrations"]) + + # Then + mock_django.assert_called_once_with(["flagsmith", "showmigrations"]) diff --git a/tests/unit/common/core/test_run.py b/tests/unit/common/core/test_run.py new file mode 100644 index 00000000..527dff36 --- /dev/null +++ b/tests/unit/common/core/test_run.py @@ -0,0 +1,209 @@ +from unittest.mock import MagicMock + +import pytest +from django.test import override_settings +from pytest_mock import MockerFixture + +from common.core.cli import run + + +@pytest.fixture +def mock_run(mocker: MockerFixture) -> MagicMock: + return mocker.patch("common.core.cli.run.django_execute_from_command_line") + + +def test_serve__default__waits_for_db_then_starts_api( + mock_run: MagicMock, +) -> None: + # Given / When + run.serve([], prog="flagsmith serve") + + # Then + assert [call.args[0] for call in mock_run.call_args_list] == [ + ["flagsmith", "waitfordb", "--database", "default"], + ["flagsmith", "start", "api"], + ] + + +def test_serve__extra_args__forwarded_to_start( + mock_run: MagicMock, +) -> None: + # Given / When + run.serve(["--workers", "5"], prog="flagsmith serve") + + # Then + assert [call.args[0] for call in mock_run.call_args_list][-1] == [ + "flagsmith", + "start", + "api", + "--workers", + "5", + ] + + +def test_serve__skip_wait_for_db_set__does_not_wait( + mock_run: MagicMock, + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Given + monkeypatch.setenv("SKIP_WAIT_FOR_DB", "1") + + # When + run.serve([], prog="flagsmith serve") + + # Then + assert [call.args[0] for call in mock_run.call_args_list] == [ + ["flagsmith", "start", "api"] + ] + + +def test_migrate__no_args__migrates_default_then_creates_cache_table( + mock_run: MagicMock, +) -> None: + # Given / When + run.migrate([], prog="flagsmith migrate") + + # Then + assert [call.args[0] for call in mock_run.call_args_list] == [ + ["flagsmith", "waitfordb", "--database", "default"], + ["flagsmith", "migrate", "--database", "default"], + ["flagsmith", "createcachetable"], + ] + + +@override_settings( + FLAGSMITH_MIGRATE_DATABASES=["default", "analytics", "task_processor"] +) +def test_migrate__configured_databases__migrates_each_in_order( + mock_run: MagicMock, +) -> None: + # Given / When + run.migrate([], prog="flagsmith migrate") + + # Then + assert [call.args[0] for call in mock_run.call_args_list] == [ + ["flagsmith", "waitfordb", "--database", "default"], + ["flagsmith", "migrate", "--database", "default"], + ["flagsmith", "migrate", "--database", "analytics"], + ["flagsmith", "migrate", "--database", "task_processor"], + ["flagsmith", "createcachetable"], + ] + + +def test_migrate__with_args__defers_to_django_migrate( + mock_run: MagicMock, +) -> None: + # Given / When + run.migrate(["myapp", "0042"], prog="flagsmith migrate") + + # Then — targeted migration only, no composite steps + assert [call.args[0] for call in mock_run.call_args_list] == [ + ["flagsmith", "migrate", "myapp", "0042"], + ] + + +def test_run_task_processor__default__migrates_waits_then_starts( + mock_run: MagicMock, +) -> None: + # Given / When + run.run_task_processor([], prog="flagsmith run-task-processor") + + # Then — migrates (like run-docker.sh), waits for migrations, then starts + assert [call.args[0] for call in mock_run.call_args_list] == [ + ["flagsmith", "waitfordb", "--database", "default"], + ["flagsmith", "migrate", "--database", "default"], + ["flagsmith", "createcachetable"], + [ + "flagsmith", + "waitfordb", + "--database", + "default", + "--waitfor", + "30", + "--migrations", + ], + ["flagsmith", "start", "task-processor"], + ] + + +@override_settings( + FLAGSMITH_MIGRATE_DATABASES=["default", "analytics"], + FLAGSMITH_WAIT_FOR_MIGRATIONS_DATABASES=["default", "analytics"], +) +def test_run_task_processor__configured_databases__migrates_and_waits_for_each( + mock_run: MagicMock, +) -> None: + # Given / When + run.run_task_processor([], prog="flagsmith run-task-processor") + + # Then + assert [call.args[0] for call in mock_run.call_args_list] == [ + ["flagsmith", "waitfordb", "--database", "default"], + ["flagsmith", "migrate", "--database", "default"], + ["flagsmith", "migrate", "--database", "analytics"], + ["flagsmith", "createcachetable"], + [ + "flagsmith", + "waitfordb", + "--database", + "default", + "--waitfor", + "30", + "--migrations", + ], + [ + "flagsmith", + "waitfordb", + "--database", + "analytics", + "--waitfor", + "30", + "--migrations", + ], + ["flagsmith", "start", "task-processor"], + ] + + +def test_migrate_and_serve__no_startup_commands__migrates_then_serves( + mock_run: MagicMock, +) -> None: + # Given / When + run.migrate_and_serve([], prog="flagsmith migrate-and-serve") + + # Then + assert [call.args[0] for call in mock_run.call_args_list] == [ + ["flagsmith", "waitfordb", "--database", "default"], + ["flagsmith", "migrate", "--database", "default"], + ["flagsmith", "createcachetable"], + ["flagsmith", "start", "api"], + ] + + +@override_settings(FLAGSMITH_STARTUP_COMMANDS=["bootstrap"]) +def test_migrate_and_serve__startup_commands__runs_them_between_migrate_and_serve( + mock_run: MagicMock, +) -> None: + # Given / When + run.migrate_and_serve([], prog="flagsmith migrate-and-serve") + + # Then + assert [call.args[0] for call in mock_run.call_args_list] == [ + ["flagsmith", "waitfordb", "--database", "default"], + ["flagsmith", "migrate", "--database", "default"], + ["flagsmith", "createcachetable"], + ["flagsmith", "bootstrap"], + ["flagsmith", "start", "api"], + ] + + +@override_settings(FLAGSMITH_STARTUP_COMMANDS=["bootstrap --skip-fixtures"]) +def test_migrate_and_serve__startup_command_with_args__split_into_argv( + mock_run: MagicMock, +) -> None: + # Given / When + run.migrate_and_serve([], prog="flagsmith migrate-and-serve") + + # Then — the command string is shell-split into separate argv elements + assert ["flagsmith", "bootstrap", "--skip-fixtures"] in [ + call.args[0] for call in mock_run.call_args_list + ] diff --git a/tests/unit/task_processor/test_unit_task_processor_utils.py b/tests/unit/task_processor/test_unit_task_processor_utils.py new file mode 100644 index 00000000..a239f663 --- /dev/null +++ b/tests/unit/task_processor/test_unit_task_processor_utils.py @@ -0,0 +1,81 @@ +import argparse + +import pytest + +from task_processor.utils import add_arguments + + +def _parse_defaults() -> argparse.Namespace: + parser = argparse.ArgumentParser() + add_arguments(parser) + return parser.parse_args([]) + + +def test_add_arguments__no_env__uses_run_docker_defaults( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Given + for name in [ + "TASK_PROCESSOR_NUM_THREADS", + "TASK_PROCESSOR_SLEEP_INTERVAL_MS", + "TASK_PROCESSOR_SLEEP_INTERVAL", + "TASK_PROCESSOR_GRACE_PERIOD_MS", + "TASK_PROCESSOR_QUEUE_POP_SIZE", + ]: + monkeypatch.delenv(name, raising=False) + + # When + args = _parse_defaults() + + # Then + assert args.numthreads == 5 + assert args.sleepintervalms == 500 + assert args.graceperiodms == 20000 + assert args.queuepopsize == 10 + + +def test_add_arguments__env_set__uses_env_values( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Given + monkeypatch.setenv("TASK_PROCESSOR_NUM_THREADS", "8") + monkeypatch.setenv("TASK_PROCESSOR_SLEEP_INTERVAL_MS", "250") + monkeypatch.setenv("TASK_PROCESSOR_GRACE_PERIOD_MS", "15000") + monkeypatch.setenv("TASK_PROCESSOR_QUEUE_POP_SIZE", "20") + + # When + args = _parse_defaults() + + # Then + assert args.numthreads == 8 + assert args.sleepintervalms == 250 + assert args.graceperiodms == 15000 + assert args.queuepopsize == 20 + + +def test_add_arguments__legacy_sleep_interval__used_as_fallback( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Given + monkeypatch.delenv("TASK_PROCESSOR_SLEEP_INTERVAL_MS", raising=False) + monkeypatch.setenv("TASK_PROCESSOR_SLEEP_INTERVAL", "750") + + # When + args = _parse_defaults() + + # Then + assert args.sleepintervalms == 750 + + +def test_add_arguments__sleep_interval_ms__takes_precedence_over_legacy( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Given + monkeypatch.setenv("TASK_PROCESSOR_SLEEP_INTERVAL_MS", "300") + monkeypatch.setenv("TASK_PROCESSOR_SLEEP_INTERVAL", "750") + + # When + args = _parse_defaults() + + # Then + assert args.sleepintervalms == 300 diff --git a/uv.lock b/uv.lock index 7131bd73..8b541730 100644 --- a/uv.lock +++ b/uv.lock @@ -495,6 +495,7 @@ task-processor = [ { name = "backoff" }, { name = "django" }, { name = "django-health-check" }, + { name = "environs" }, { name = "opentelemetry-api" }, { name = "prometheus-client" }, ] @@ -537,6 +538,7 @@ requires-dist = [ { name = "drf-spectacular", marker = "extra == 'common-core'", specifier = ">=0.28.0,<1" }, { name = "drf-writable-nested", marker = "extra == 'common-core'" }, { name = "environs", marker = "extra == 'common-core'", specifier = "<16" }, + { name = "environs", marker = "extra == 'task-processor'", specifier = "<16" }, { name = "flagsmith-common", extras = ["otel"], marker = "extra == 'common-core'" }, { name = "flagsmith-flag-engine", marker = "extra == 'flagsmith-schemas'", specifier = ">6" }, { name = "gunicorn", marker = "extra == 'common-core'", specifier = ">=19.1" },