Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,33 @@ This enables the `route` label for Prometheus HTTP metrics.

5. To enable the `/metrics` endpoint, set the `PROMETHEUS_ENABLED` setting to `True`.

### Startup commands

`common.core` registers composite startup verbs on the `flagsmith` command, so a container entrypoint can sequence startup itself rather than via a shell script:

- `flagsmith serve` — wait for the database, then start the API.
- `flagsmith migrate` — migrate each configured database, then create the cache table. A bare invocation runs the full sequence; `flagsmith migrate <app> <name>` defers to Django for targeted migrations.
- `flagsmith run-task-processor` — migrate, wait for migrations to be applied, then start the Task Processor.
- `flagsmith migrate-and-serve` — migrate, run any configured startup commands, then start the API.

The verbs are configured through these settings:

| Setting | Default | Purpose |
| --- | --- | --- |
| `FLAGSMITH_MIGRATE_DATABASES` | `["default"]` | Database aliases the `migrate` step applies, in order. |
| `FLAGSMITH_WAIT_FOR_MIGRATIONS_DATABASES` | `["default"]` | Database aliases `run-task-processor` waits on before starting. |
| `FLAGSMITH_STARTUP_COMMANDS` | `[]` | Management commands run, in order, between migrate and serve in `migrate-and-serve`. |

…and these environment variables:

| Environment variable | Default | Purpose |
| --- | --- | --- |
| `SKIP_WAIT_FOR_DB` | unset | When set, skip waiting for the database. |
| `TASK_PROCESSOR_NUM_THREADS` | `5` | Number of worker threads. |
| `TASK_PROCESSOR_SLEEP_INTERVAL_MS` | `500` | Milliseconds each worker waits before checking for new tasks (falls back to the legacy `TASK_PROCESSOR_SLEEP_INTERVAL`, which is also read as milliseconds). |

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Clarify that TASK_PROCESSOR_SLEEP_INTERVAL is configured in seconds.

Suggested change
| `TASK_PROCESSOR_SLEEP_INTERVAL_MS` | `500` | Millis each worker waits before checking for new tasks (falls back to `TASK_PROCESSOR_SLEEP_INTERVAL`). |
| `TASK_PROCESSOR_SLEEP_INTERVAL_MS` | `500` | Millis each worker waits before checking for new tasks (falls back to `TASK_PROCESSOR_SLEEP_INTERVAL` in seconds). |

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

| `TASK_PROCESSOR_GRACE_PERIOD_MS` | `20000` | Milliseconds before a running task is considered stuck. |
| `TASK_PROCESSOR_QUEUE_POP_SIZE` | `10` | Tasks each worker pops from the queue per cycle. |

### Pre-commit hooks

This repo provides a [`flagsmith-lint-tests`](.pre-commit-hooks.yaml) hook that enforces test conventions:
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ optional-dependencies = { test-tools = [
"backoff (>=2.2.1,<3.0.0)",
"django (>4,<6)",
"django-health-check",
"environs (<16)",
"opentelemetry-api (>=1.25,<2)",
"prometheus-client (>=0.0.16)",
], flagsmith-schemas = [
Expand Down
79 changes: 79 additions & 0 deletions src/common/core/cli/run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
"""Composite startup verbs for the `flagsmith` entrypoint.

These mirror the sequencing that Core API's `run-docker.sh` performed in
shell, but run in a single process so the Django boot cost is paid once.
"""

import os
import shlex

from django.conf import settings
from django.core.management import (
execute_from_command_line as django_execute_from_command_line,
)

WAIT_FOR_MIGRATIONS_TIMEOUT_SECONDS = 30


def _wait_for_db(
    *,
    wait_for_migrations: bool = False,
    database: str = "default",
    wait_for: int | None = None,
) -> None:
    """Invoke the `waitfordb` management command unless waiting is disabled.

    Returns immediately, without running anything, when the
    `SKIP_WAIT_FOR_DB` environment variable holds a non-empty value.

    Args:
        wait_for_migrations: When true, `--migrations` is appended to the
            command line.
        database: Alias passed as `--database`.
        wait_for: When not `None`, passed (stringified) as `--waitfor`.
    """
    skip_requested = bool(os.environ.get("SKIP_WAIT_FOR_DB"))
    if not skip_requested:
        # Optional flags are kept in the same order the command has always
        # received them: `--waitfor` first, then `--migrations`.
        timeout_flags = [] if wait_for is None else ["--waitfor", str(wait_for)]
        migration_flags = ["--migrations"] if wait_for_migrations else []
        django_execute_from_command_line(
            [
                "flagsmith",
                "waitfordb",
                "--database",
                database,
                *timeout_flags,
                *migration_flags,
            ]
        )


def _migrate() -> None:
    """Wait for the database, migrate each configured alias, create the cache table.

    The aliases come from the `FLAGSMITH_MIGRATE_DATABASES` setting and are
    processed in the order given; `["default"]` is used when the setting is
    absent. `createcachetable` always runs last.
    """
    _wait_for_db()
    aliases: list[str] = getattr(settings, "FLAGSMITH_MIGRATE_DATABASES", ["default"])
    # One `migrate` invocation per alias, followed by a single cache-table step.
    steps = [["migrate", "--database", alias] for alias in aliases]
    steps.append(["createcachetable"])
    for step in steps:
        django_execute_from_command_line(["flagsmith", *step])


def migrate(argv: list[str], *, prog: str) -> None:
    """Run the full migration sequence, or defer to Django when arguments are given.

    With an empty `argv` the composite sequence in `_migrate` runs. Any
    arguments are instead forwarded verbatim to Django's own `migrate`
    command. `prog` is accepted for the dispatcher's calling convention and
    is not used here.
    """
    if not argv:
        _migrate()
    else:
        # Targeted invocation: hand the arguments to Django untouched.
        django_execute_from_command_line(["flagsmith", "migrate", *argv])


def serve(argv: list[str], *, prog: str) -> None:
    """Wait for the database, then run `start api` with any extra arguments.

    `prog` is accepted for the dispatcher's calling convention and is not
    used here.
    """
    _wait_for_db()
    start_api = ["flagsmith", "start", "api"]
    start_api.extend(argv)
    django_execute_from_command_line(start_api)


def run_task_processor(argv: list[str], *, prog: str) -> None:
    """Migrate, wait for migrations on each configured alias, then start the task processor.

    The aliases to wait on come from the
    `FLAGSMITH_WAIT_FOR_MIGRATIONS_DATABASES` setting (`["default"]` when
    absent). Each wait uses `WAIT_FOR_MIGRATIONS_TIMEOUT_SECONDS` as its
    `--waitfor` value. Extra `argv` entries are forwarded to
    `start task-processor`. `prog` is accepted for the dispatcher's calling
    convention and is not used here.
    """
    _migrate()
    aliases: list[str] = getattr(
        settings, "FLAGSMITH_WAIT_FOR_MIGRATIONS_DATABASES", ["default"]
    )
    for alias in aliases:
        _wait_for_db(
            database=alias,
            wait_for=WAIT_FOR_MIGRATIONS_TIMEOUT_SECONDS,
            wait_for_migrations=True,
        )
    start_processor = ["flagsmith", "start", "task-processor"]
    start_processor.extend(argv)
    django_execute_from_command_line(start_processor)


def migrate_and_serve(argv: list[str], *, prog: str) -> None:
    """Migrate, run the configured startup commands in order, then run `start api`.

    Each entry of the `FLAGSMITH_STARTUP_COMMANDS` setting (empty when
    absent) is a single command-line string. It is tokenised with
    `shlex.split`, so an entry may carry its own arguments. Extra `argv`
    entries are forwarded to `start api`. `prog` is accepted for the
    dispatcher's calling convention and is not used here.
    """
    _migrate()
    configured: list[str] = getattr(settings, "FLAGSMITH_STARTUP_COMMANDS", [])
    for command_line in configured:
        tokens = shlex.split(command_line)
        django_execute_from_command_line(["flagsmith", *tokens])
    start_api = ["flagsmith", "start", "api"]
    start_api.extend(argv)
    django_execute_from_command_line(start_api)
Comment on lines +73 to +79

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

If a startup command in FLAGSMITH_STARTUP_COMMANDS contains arguments (e.g., "bootstrap --force"), passing it directly to django_execute_from_command_line as a single string will fail because Django expects the subcommand and its arguments to be separate elements in the argv list. We should split the command string using shlex.split.

Suggested change
def migrate_and_serve(argv: list[str], *, prog: str) -> None:
"""Migrate, run any configured startup commands, then start the API server."""
_migrate()
startup_commands: list[str] = getattr(settings, "FLAGSMITH_STARTUP_COMMANDS", [])
for command in startup_commands:
django_execute_from_command_line(["flagsmith", command])
django_execute_from_command_line(["flagsmith", "start", "api", *argv])
def migrate_and_serve(argv: list[str], *, prog: str) -> None:
"""Migrate, run any configured startup commands, then start the API server."""
import shlex
_migrate()
startup_commands: list[str] = getattr(settings, "FLAGSMITH_STARTUP_COMMANDS", [])
for command in startup_commands:
django_execute_from_command_line(["flagsmith", *shlex.split(command)])
django_execute_from_command_line(["flagsmith", "start", "api", *argv])

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7 changes: 6 additions & 1 deletion src/common/core/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
)
from environs import Env

from common.core.cli import healthcheck
from common.core.cli import healthcheck, run
from common.core.logging import setup_logging
from common.gunicorn.processors import make_gunicorn_access_processor

Expand Down Expand Up @@ -118,6 +118,11 @@ def execute_from_command_line(argv: list[str]) -> None:
# Backwards compatibility for task-processor health checks
# See https://github.com/Flagsmith/flagsmith-task-processor/issues/24
"checktaskprocessorthreadhealth": healthcheck.main,
# Composite startup verbs, formerly run-docker.sh
"serve": run.serve,
"migrate": run.migrate,
"run-task-processor": run.run_task_processor,
"migrate-and-serve": run.migrate_and_serve,
}[subcommand]
except (IndexError, KeyError):
logger.info("Invoking Django")
Expand Down
15 changes: 11 additions & 4 deletions src/task_processor/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@
from contextlib import contextmanager
from typing import Any, Generator

from environs import Env

from task_processor.threads import TaskRunnerCoordinator
from task_processor.types import TaskCallable, TaskProcessorConfig

logger = logging.getLogger(__name__)

env = Env()


def get_task_identifier_from_function(
function: TaskCallable[Any],
Expand All @@ -24,25 +28,28 @@ def add_arguments(parser: argparse.ArgumentParser) -> None:
"--numthreads",
type=int,
help="Number of worker threads to run.",
default=5,
default=env.int("TASK_PROCESSOR_NUM_THREADS", 5),
)
parser.add_argument(
"--sleepintervalms",
type=int,
help="Number of millis each worker waits before checking for new tasks",
default=2000,
default=env.int(
"TASK_PROCESSOR_SLEEP_INTERVAL_MS",
env.int("TASK_PROCESSOR_SLEEP_INTERVAL", 500),
),
)
Comment on lines 33 to 41

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The legacy environment variable TASK_PROCESSOR_SLEEP_INTERVAL is defined in seconds (e.g., 0.5 or 2), whereas --sleepintervalms expects milliseconds.

  1. If TASK_PROCESSOR_SLEEP_INTERVAL is set to a float like 0.5, env.int will raise a ValidationError because "0.5" is not a valid integer.
  2. If it is set to an integer like 2 (representing 2 seconds), env.int will return 2, which means 2 milliseconds, causing the task processor to spin aggressively and consume 100% CPU.

We should parse TASK_PROCESSOR_SLEEP_INTERVAL as a float, multiply by 1000, and convert to an integer.

Suggested change
parser.add_argument(
"--sleepintervalms",
type=int,
help="Number of millis each worker waits before checking for new tasks",
default=2000,
default=env.int(
"TASK_PROCESSOR_SLEEP_INTERVAL_MS",
env.int("TASK_PROCESSOR_SLEEP_INTERVAL", 500),
),
)
legacy_sleep_interval = env.float("TASK_PROCESSOR_SLEEP_INTERVAL", None)
legacy_sleep_interval_ms = (
int(legacy_sleep_interval * 1000)
if legacy_sleep_interval is not None
else 500
)
parser.add_argument(
"--sleepintervalms",
type=int,
help="Number of millis each worker waits before checking for new tasks",
default=env.int(
"TASK_PROCESSOR_SLEEP_INTERVAL_MS",
legacy_sleep_interval_ms,
),
)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TASK_PROCESSOR_SLEEP_INTERVAL originally expected milliseconds.

parser.add_argument(
"--graceperiodms",
type=int,
help="Number of millis before running task is considered 'stuck'.",
default=20000,
default=env.int("TASK_PROCESSOR_GRACE_PERIOD_MS", 20000),
)
parser.add_argument(
"--queuepopsize",
type=int,
help="Number of tasks each worker will pop from the queue on each cycle.",
default=10,
default=env.int("TASK_PROCESSOR_QUEUE_POP_SIZE", 10),
)


Expand Down
44 changes: 43 additions & 1 deletion tests/unit/common/core/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from pytest_mock import MockerFixture
from structlog.typing import Processor

from common.core.main import ensure_cli_env
from common.core.main import ensure_cli_env, execute_from_command_line
from common.core.otel import add_otel_trace_context


Expand Down Expand Up @@ -252,3 +252,45 @@ def test_ensure_cli_env__env_service_name__expected_otel_service_name(
endpoint="http://collector:4318/v1/traces",
service_name="my-custom",
)


@pytest.mark.parametrize(
    "subcommand,handler_name",
    [
        pytest.param("serve", "serve", id="serve"),
        pytest.param("migrate", "migrate", id="migrate"),
        pytest.param(
            "run-task-processor", "run_task_processor", id="run_task_processor"
        ),
        pytest.param("migrate-and-serve", "migrate_and_serve", id="migrate_and_serve"),
    ],
)
def test_execute_from_command_line__startup_verb__routes_to_run_handler(
    mocker: MockerFixture,
    subcommand: str,
    handler_name: str,
) -> None:
    """Each startup verb is dispatched to its handler in `common.core.cli.run`."""
    # Given
    handler_path = f"common.core.cli.run.{handler_name}"
    patched_handler = mocker.patch(handler_path)
    forwarded_args = ["--extra", "arg"]

    # When
    execute_from_command_line(["flagsmith", subcommand, *forwarded_args])

    # Then
    expected_prog = f"flagsmith {subcommand}"
    patched_handler.assert_called_once_with(forwarded_args, prog=expected_prog)


def test_execute_from_command_line__unknown_subcommand__invokes_django(
    mocker: MockerFixture,
) -> None:
    """A subcommand without a registered handler is passed through to Django unchanged."""
    # Given
    patched_django = mocker.patch(
        "common.core.main.django_execute_from_command_line"
    )
    command_line = ["flagsmith", "showmigrations"]

    # When
    execute_from_command_line(command_line)

    # Then
    patched_django.assert_called_once_with(["flagsmith", "showmigrations"])
Loading