diff --git a/.github/workflows/slo-report.yml b/.github/workflows/slo-report.yml index 07231d3b..049f919e 100644 --- a/.github/workflows/slo-report.yml +++ b/.github/workflows/slo-report.yml @@ -16,42 +16,7 @@ jobs: pull-requests: write steps: - name: Publish YDB SLO Report - uses: ydb-platform/ydb-slo-action/report@13c687b7d4b2879da79dd12932dee0ed2b65dd1c + uses: ydb-platform/ydb-slo-action/report@v2 with: github_token: ${{ secrets.GITHUB_TOKEN }} github_run_id: ${{ github.event.workflow_run.id }} - - remove-slo-label: - if: always() && github.event.workflow_run.event == 'pull_request' - name: Remove SLO Label - needs: ydb-slo-action-report - runs-on: ubuntu-latest - permissions: - pull-requests: write - steps: - - name: Remove SLO label from PR - uses: actions/github-script@v7 - with: - script: | - const pullRequests = context.payload.workflow_run.pull_requests; - if (pullRequests && pullRequests.length > 0) { - for (const pr of pullRequests) { - try { - await github.rest.issues.removeLabel({ - owner: context.repo.owner, - repo: context.repo.repo, - issue_number: pr.number, - name: 'SLO' - }); - console.log(`Removed SLO label from PR #${pr.number}`); - } catch (error) { - if (error.status === 404) { - console.log(`SLO label not found on PR #${pr.number}, skipping`); - } else { - throw error; - } - } - } - } else { - console.log('No pull requests associated with this workflow run'); - } diff --git a/.github/workflows/slo.yml b/.github/workflows/slo.yml index fbbf66cc..fd7ec605 100644 --- a/.github/workflows/slo.yml +++ b/.github/workflows/slo.yml @@ -42,16 +42,14 @@ jobs: strategy: fail-fast: false matrix: - include: - - id: sync-table - prefix: table - workload: sync-table - - id: sync-query - prefix: table - workload: sync-query + sdk: + - name: sync-table + command: "--read-rps ${{ inputs.slo_workload_read_max_rps || '1000' }} --write-rps ${{ inputs.slo_workload_write_max_rps || '100' }}" + - name: sync-query + command: "--read-rps ${{ 
inputs.slo_workload_read_max_rps || '1000' }} --write-rps ${{ inputs.slo_workload_write_max_rps || '100' }}" concurrency: - group: slo-${{ github.ref }}-${{ matrix.workload }} + group: slo-${{ github.ref }}-${{ matrix.sdk.name }} cancel-in-progress: true steps: @@ -84,7 +82,7 @@ jobs: docker compose version - name: Checkout current version - uses: actions/checkout@v5 + uses: actions/checkout@v6 with: path: current fetch-depth: 0 @@ -118,16 +116,10 @@ jobs: echo "ref=$BASELINE_REF" >> $GITHUB_OUTPUT - name: Checkout baseline version - uses: actions/checkout@v5 + uses: actions/checkout@v6 with: ref: ${{ steps.baseline.outputs.sha }} path: baseline - fetch-depth: 1 - - - name: Show Docker versions - run: | - docker --version - docker compose version - name: Build workload images (current + baseline) run: | @@ -141,125 +133,32 @@ jobs: -t "ydb-app-baseline" \ "$GITHUB_WORKSPACE/baseline" - - name: Initialize YDB SLO - id: ydb_slo - uses: ydb-platform/ydb-slo-action/init@13c687b7d4b2879da79dd12932dee0ed2b65dd1c + - name: Run SLO Tests + uses: ydb-platform/ydb-slo-action/init@v2 + timeout-minutes: 30 with: - github_issue: ${{ github.event.pull_request.number || inputs.github_issue }} + github_issue: ${{ github.event.inputs.github_issue }} github_token: ${{ secrets.GITHUB_TOKEN }} - workload_name: ydb-python-${{ matrix.workload }} + workload_name: ${{ matrix.sdk.name }} + workload_duration: ${{ inputs.slo_workload_duration_seconds || '600' }} workload_current_ref: ${{ github.head_ref || github.ref_name }} + workload_current_image: ydb-app-current + workload_current_command: ${{ matrix.sdk.command }} workload_baseline_ref: ${{ steps.baseline.outputs.ref }} - - - name: Prepare SLO Database - run: | - docker run --rm \ - --network ydb_ydb-net \ - --add-host "ydb:172.28.0.11" \ - --add-host "ydb:172.28.0.12" \ - --add-host "ydb:172.28.0.13" \ - --add-host "ydb:172.28.0.99" \ - -e "WORKLOAD=${{ matrix.workload }}" \ - -e "REF=${{ github.head_ref || github.ref_name }}" \ - 
ydb-app-current \ - ${{ matrix.prefix }}-create grpc://ydb:2136 /Root/testdb - - - name: Run SLO Tests (current + baseline in parallel) - timeout-minutes: 15 - env: - WORKLOAD: ${{ matrix.workload }} - DURATION: ${{ inputs.slo_workload_duration_seconds || 600 }} - READ_RPS: ${{ inputs.slo_workload_read_max_rps || 1000 }} - WRITE_RPS: ${{ inputs.slo_workload_write_max_rps || 100 }} - CURRENT_REF: ${{ github.head_ref || github.ref_name }} - BASELINE_REF: ${{ steps.baseline.outputs.ref }} - run: | - ARGS="${{ matrix.prefix }}-run grpc://ydb:2136 /Root/testdb \ - --otlp-endpoint http://prometheus:9090/api/v1/otlp/v1/metrics \ - --report-period 250 \ - --time ${DURATION} \ - --read-rps ${READ_RPS} \ - --write-rps ${WRITE_RPS} \ - --read-timeout 1000 \ - --write-timeout 1000" - - echo "Starting current workload (ref=${CURRENT_REF}, workload=${WORKLOAD})..." - docker run -d \ - --name ydb-app-current \ - --network ydb_ydb-net \ - --add-host "ydb:172.28.0.11" \ - --add-host "ydb:172.28.0.12" \ - --add-host "ydb:172.28.0.13" \ - --add-host "ydb:172.28.0.99" \ - -e "REF=${CURRENT_REF}" \ - -e "WORKLOAD=${WORKLOAD}" \ - ydb-app-current \ - $ARGS - - echo "Starting baseline workload (ref=${BASELINE_REF}, workload=${WORKLOAD})..." - docker run -d \ - --name ydb-app-baseline \ - --network ydb_ydb-net \ - --add-host "ydb:172.28.0.11" \ - --add-host "ydb:172.28.0.12" \ - --add-host "ydb:172.28.0.13" \ - --add-host "ydb:172.28.0.99" \ - -e "REF=${BASELINE_REF}" \ - -e "WORKLOAD=${WORKLOAD}" \ - ydb-app-baseline \ - $ARGS - - echo "" - echo "==================== INITIAL CURRENT LOGS ====================" - docker logs -n 15 ydb-app-current 2>&1 || echo "No current container" - echo "" - echo "==================== INITIAL BASELINE LOGS ====================" - docker logs -n 15 ydb-app-baseline 2>&1 || echo "No baseline container" - echo "" - - echo "Waiting for workloads to complete (${DURATION}s)..." - sleep ${DURATION} - - echo "Stopping containers after ${DURATION}s..." 
- docker stop --timeout=30 ydb-app-current ydb-app-baseline 2>&1 || true - - # Force kill if still running - docker kill ydb-app-current ydb-app-baseline 2>&1 || true - - # Check exit codes - CURRENT_EXIT=$(docker inspect ydb-app-current --format='{{.State.ExitCode}}' 2>/dev/null || echo "1") - BASELINE_EXIT=$(docker inspect ydb-app-baseline --format='{{.State.ExitCode}}' 2>/dev/null || echo "1") - - echo "Current exit code: ${CURRENT_EXIT}" - echo "Baseline exit code: ${BASELINE_EXIT}" - - echo "" - echo "==================== FINAL CURRENT LOGS ====================" - docker logs -n 15 ydb-app-current 2>&1 || echo "No current container" - echo "" - echo "==================== FINAL BASELINE LOGS ====================" - docker logs -n 15 ydb-app-baseline 2>&1 || echo "No baseline container" - echo "" - - if [[ "${CURRENT_EXIT}" != "0" || "${BASELINE_EXIT}" != "0" ]]; then - echo "One or both workloads failed." - exit 0 - fi - - echo "SUCCESS: Workloads completed successfully" - - - if: always() - name: Store logs - run: | - docker logs ydb-app-current > current.log 2>&1 || echo "No current container" > current.log - docker logs ydb-app-baseline > baseline.log 2>&1 || echo "No baseline container" > baseline.log - - - if: always() - name: Upload logs - uses: actions/upload-artifact@v4 + workload_baseline_image: ydb-app-baseline + workload_baseline_command: ${{ matrix.sdk.command }} + + ydb-slo-action-report: + runs-on: ubuntu-latest + name: Publish YDB SLO Report + needs: ydb-slo-action + permissions: + checks: write + contents: read + pull-requests: write + steps: + - name: Publish YDB SLO Report + uses: ydb-platform/ydb-slo-action/report@v2 with: - name: ydb-python-${{ matrix.workload }}-logs - path: | - ./current.log - ./baseline.log - retention-days: 1 + github_token: ${{ secrets.GITHUB_TOKEN }} + github_run_id: ${{ github.run_id }} diff --git a/AGENTS.md b/AGENTS.md index b347890e..2b6f1c98 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -90,44 +90,47 @@ To regenerate 
protobuf stubs: see `Makefile` and `generate-protobuf.Dockerfile`. --- -## Topic Chaos Testing (SLO) +## SLO Testing -Run this only for changes that affect topic reader/writer reconnection logic. +Run this for changes that affect topic/table reader/writer reconnection logic. -**1. Start YDB with chaos** (kills a DB node every ~20 seconds): -```sh -docker compose -f tests/slo/playground/configs/compose.yaml up -d -``` +### Docker Compose (full stack) -**2. Wait until YDB is healthy:** -```sh -docker ps --format "table {{.Names}}\t{{.Status}}" | grep ydb -``` +Uses [ydb-slo-action](https://github.com/ydb-platform/ydb-slo-action/tree/v2/deploy) infra (YDB cluster + Prometheus + workload in one command). -**3. Create a test topic** (from `tests/slo/` directory): +From `tests/slo/` directory: ```sh -source .venv/bin/activate -python ./src topic-create grpc://localhost:2135 /Root/testdb \ - --path /Root/testdb/slo_topic --debug +WORKLOAD_NAME=topic ./slo_runner.sh +WORKLOAD_NAME=sync-query ./slo_runner.sh ``` -**4. Test writer** (60 sec): +Override defaults via env vars: `RUN_TIME_SEC`, `WRITE_RPS`, `READ_THREADS`, `WRITE_THREADS`, `MESSAGE_SIZE`, `DEBUG=1`. + +### Local run (against your own YDB) + +**1. Start playground cluster:** ```sh -python ./src topic-run grpc://localhost:2135 /Root/testdb \ - --path /Root/testdb/slo_topic --otlp-endpoint "" \ - --read-threads 0 --write-rps 1 --time 60 --debug +docker compose -f tests/slo/playground/configs/compose.yaml up -d ``` -**5. Test reader** (60 sec): +**2. 
Run workload** (from `tests/slo/` directory): ```sh -python ./src topic-run grpc://localhost:2135 /Root/testdb \ - --path /Root/testdb/slo_topic --otlp-endpoint "" \ - --read-rps 1 --write-threads 0 --time 60 --debug +source ../../.venv/bin/activate + +# Topic workload (60 sec) +python ./src grpc://localhost:2136 /Root/testdb \ + --workload-name topic --otlp-endpoint "" --time 60 --debug + +# Table workload (60 sec) +python ./src grpc://localhost:2136 /Root/testdb \ + --workload-name sync-query --otlp-endpoint "" --time 60 --debug ``` -**6. Tear down:** +**3. Tear down:** ```sh docker compose -f tests/slo/playground/configs/compose.yaml down ``` -**Success criteria:** writer and reader reconnect automatically during node restarts with no fatal errors. +Full list of CLI arguments and environment variables: see `tests/slo/README.md` or run `python tests/slo/src --help`. + +**Success criteria:** workload reconnects automatically during node restarts with no fatal errors. diff --git a/pyproject.toml b/pyproject.toml index 41e7ef6f..0bda93e7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,3 +1,9 @@ +[tool.ty.environment] +extra-paths = ["tests/slo/src"] + +[tool.ruff] +line-length = 120 + [tool.black] line-length = 120 diff --git a/tests/slo/.gitignore b/tests/slo/.gitignore new file mode 100644 index 00000000..1ce13a4f --- /dev/null +++ b/tests/slo/.gitignore @@ -0,0 +1 @@ +.infra/ diff --git a/tests/slo/Dockerfile b/tests/slo/Dockerfile index 4f18a2b3..7c1990d5 100644 --- a/tests/slo/Dockerfile +++ b/tests/slo/Dockerfile @@ -1,38 +1,42 @@ # syntax=docker/dockerfile:1 # This image packages the Python SLO workload runner. -# It expects to be run with arguments like: -# docker run --rm table-run --otlp-endpoint http://prometheus:9090/api/v1/otlp/v1/metrics ... 
+# +# All settings are CLI arguments with env-var fallback (CLI arg > env var > default): +# endpoint $YDB_ENDPOINT grpc://ydb:2136 +# db $YDB_DATABASE /Root/testdb +# --workload-name $WORKLOAD_NAME sync-query +# --workload-ref $WORKLOAD_REF / $REF main +# --otlp-endpoint $OTEL_EXPORTER_OTLP_ENDPOINT http://ydb-prometheus:9090/api/v1/otlp +# --time $WORKLOAD_DURATION 600 +# +# Example: +# docker run --rm <image> grpc://ydb:2136 /Root/testdb --workload-name topic --time 120 # # Notes: # - OpenTelemetry 1.39.x requires Python >= 3.9. -# - The entrypoint is `python ./tests/slo/src`, i.e. it runs the `__main__.py` -# from that directory (same as `python tests/slo/src ...` in CI). -FROM python:3.11-slim AS build +FROM python:3.11-slim ENV PYTHONDONTWRITEBYTECODE=1 \ PYTHONUNBUFFERED=1 -WORKDIR /src -COPY . /src - -# Install runtime deps into an isolated venv so we can copy it into the final stage. -RUN python -m venv /opt/venv \ - && /opt/venv/bin/python -m pip install --no-cache-dir --upgrade pip \ - && /opt/venv/bin/pip install --no-cache-dir . \ - && /opt/venv/bin/pip install --no-cache-dir -r tests/slo/requirements.txt +RUN apt-get update && apt-get install -y --no-install-recommends gcc libc6-dev && rm -rf /var/lib/apt/lists/* +WORKDIR /src -FROM python:3.11-slim +# 1. YDB SDK +COPY setup.py pyproject.toml README.md requirements.txt ./ +COPY ydb/ ydb/ +RUN pip install --no-cache-dir . -ENV PYTHONDONTWRITEBYTECODE=1 \ - PYTHONUNBUFFERED=1 \ - PATH="/opt/venv/bin:${PATH}" +# 2. SLO deps +COPY tests/slo/requirements.txt tests/slo/requirements.txt +RUN pip install --no-cache-dir -r tests/slo/requirements.txt -WORKDIR /app - -COPY --from=build /opt/venv /opt/venv -COPY --from=build /src/tests/slo/src /app/tests/slo/src +# 3. 
Workload source +COPY tests/slo/src /src/tests/slo/src ENTRYPOINT ["python", "./tests/slo/src"] + +CMD ["--read-rps", "1000", "--write-rps", "100"] diff --git a/tests/slo/README.md b/tests/slo/README.md index c7b3e6b8..d028a819 100644 --- a/tests/slo/README.md +++ b/tests/slo/README.md @@ -3,240 +3,178 @@ SLO is the type of test where app based on ydb-sdk is tested against falling YDB cluster nodes, tablets, network (that is possible situations for distributed DBs with hundreds of nodes) -### Workload types: +## Workload types -There are two workload types: +- **sync-query** — tests table operations via Query API (read/write) +- **sync-table** — tests table operations via Table API (read/write) +- **topic** — tests topic operations (publish/consume) -- **Table SLO** - tests table operations (read/write) -- **Topic SLO** - tests topic operations (publish/consume) +## Quick start (Docker Compose) -### Implementations: +The runner script handles everything: clones [ydb-slo-action](https://github.com/ydb-platform/ydb-slo-action/tree/v2/deploy) infra configs, builds the workload image, +starts YDB cluster + Prometheus via docker compose, runs the workload, and tears down on exit. -- `sync` -- `async` (now unimplemented) +```sh +cd tests/slo -### Usage: - -Each workload type has 3 commands: - -**Table commands:** -- `table-create` - creates table in database -- `table-cleanup` - drops table in database -- `table-run` - runs table workload (read and write to table with set RPS) - -**Topic commands:** -- `topic-create` - creates topic with consumer in database -- `topic-cleanup` - drops topic in database -- `topic-run` - runs topic workload (publish and consume messages with set RPS) - -### Infra (Docker Compose) - -SLO workload is designed to run **inside the Docker Compose network** so it can reach YDB/Prometheus by service DNS names without publishing ports to localhost. 
- -Infra compose configs are maintained in a separate repo: -- https://github.com/ydb-platform/ydb-slo-action/tree/main/deploy - -Expected setup: -- Start infra using `deploy/compose.yml` from `ydb-slo-action` -- Infra network name should be `ydb_cluster` -- Workload container attaches to that network - -Example infra start (from the `ydb-slo-action` repo root): -- `docker compose -f deploy/compose.yml --profile telemetry up -d --build` - -### Runner script (`tests/slo/slo_runner.sh`) - -This repo contains a simple maintainer convenience runner that: -1) builds the workload image -2) runs a basic SLO workload inside `ydb_cluster` - -It is intentionally minimal (not a complete interface for all workload options). For full control, use the commands in `tests/slo/src/` directly. - -Example usage (infra must already be running): -- `NETWORK_NAME=ydb_cluster ./tests/slo/slo_runner.sh` - -Defaults used by the runner (override via env vars): -- `NETWORK_NAME=ydb_cluster` -- `YDB_ENDPOINT=grpc://ydb-storage-1:2136` (also commonly works as `grpc://storage-1:2136`) -- `YDB_DATABASE=/Root/testdb` -- `OTLP_ENDPOINT=http://prometheus:9090/api/v1/otlp/v1/metrics` - -### Run examples with all arguments: - -You can also configure the OTLP endpoint via environment variable: -- `OTLP_ENDPOINT=http://ydb-prometheus:9090/api/v1/otlp/v1/metrics` (full OTLP metrics endpoint) - -**Table examples:** - -table-create: -`python tests/slo/src/ table-create localhost:2136 /local -t tableName ---min-partitions-count 6 --max-partitions-count 1000 --partition-size 1 -с 1000 ---write-timeout 10000` - -table-cleanup: -`python tests/slo/src/ table-cleanup localhost:2136 /local -t tableName` - -table-run: -`python tests/slo/src/ table-run localhost:2136 /local -t tableName ---otlp-endpoint http://ydb-prometheus:9090/api/v1/otlp/v1/metrics ---report-period 250 ---read-rps 1000 --read-timeout 10000 ---write-rps 100 --write-timeout 10000 ---time 600 --shutdown-time 30` - -**Topic examples:** - 
-topic-create: -`python tests/slo/src/ topic-create localhost:2136 /local ---topic-path /local/slo_topic --topic-consumer slo_consumer` - -topic-cleanup: -`python tests/slo/src/ topic-cleanup localhost:2136 /local --topic-path /local/slo_topic` - -topic-run: -`python tests/slo/src/ topic-run localhost:2136 /local ---topic-path /local/slo_topic --topic-consumer slo_consumer ---otlp-endpoint http://ydb-prometheus:9090/api/v1/otlp/v1/metrics ---report-period 250 ---topic-write-rps 50 --topic-read-rps 100 ---topic-write-timeout 5000 --topic-read-timeout 3000 ---time 600 --shutdown-time 30` - -## Arguments for commands: - -### table-create -`python tests/slo/src/ table-create [options]` +# Run topic workload (default) +WORKLOAD_NAME=topic ./slo_runner.sh +# Run table workload +WORKLOAD_NAME=sync-query ./slo_runner.sh ``` -Arguments: - endpoint YDB endpoint to connect to - db YDB database to connect to -Options: - -t --table-name table name to create +## Local run (against your own YDB) - -p-min --min-partitions-count minimum amount of partitions in table - -p-max --max-partitions-count maximum amount of partitions in table - -p-size --partition-size partition size in mb +Start the playground cluster and run the workload directly with Python. +All examples run from `tests/slo/` directory with activated venv. 
- -c --initial-data-count amount of initially created rows - - --write-timeout write timeout milliseconds - - --batch-size amount of new records in each create request - --threads number of threads to use - -``` - -### table-cleanup -`python tests/slo/src/ table-cleanup [options]` +```sh +# Start playground YDB cluster +docker compose -f playground/configs/compose.yaml up -d +# Activate venv +source ../../.venv/bin/activate ``` -Arguments: - endpoint YDB endpoint to connect to - db YDB database to connect to -Options: - -t --table-name table name to create +### Using CLI arguments + +```sh +# Topic workload — write only, 60 sec, debug logging +python ./src grpc://localhost:2136 /Root/testdb \ + --workload-name topic \ + --topic-path /Root/testdb/slo_topic \ + --otlp-endpoint "" \ + --write-rps 1 --write-threads 1 --read-threads 0 \ + --time 60 --debug + +# Topic workload — read + write +python ./src grpc://localhost:2136 /Root/testdb \ + --workload-name topic \ + --otlp-endpoint "" \ + --write-rps 5 --write-threads 2 --read-threads 2 --read-rps 10 \ + --time 120 + +# Table workload (sync-query) — default RPS +python ./src grpc://localhost:2136 /Root/testdb \ + --workload-name sync-query \ + --otlp-endpoint "" \ + --time 60 --debug + +# Table workload — high load +python ./src grpc://localhost:2136 /Root/testdb \ + --workload-name sync-query \ + --otlp-endpoint "" \ + --read-rps 500 --write-rps 100 --read-threads 8 --write-threads 4 \ + --time 300 ``` -### table-run -`python tests/slo/src/ table-run [options]` - +### Using environment variables + +```sh +# All settings via env vars +YDB_ENDPOINT=grpc://localhost:2136 \ +YDB_DATABASE=/Root/testdb \ +WORKLOAD_NAME=topic \ +WORKLOAD_DURATION=60 \ +OTEL_EXPORTER_OTLP_ENDPOINT="" \ + python ./src --debug + +# Mix: connection via env, tuning via args +YDB_ENDPOINT=grpc://localhost:2136 \ +YDB_DATABASE=/Root/testdb \ + python ./src --workload-name sync-query --otlp-endpoint "" \ + --read-rps 200 --write-rps 50 --time 120 
``` -Arguments: - endpoint YDB endpoint to connect to - db YDB database to connect to -Options: - -t --table-name table name to create +### Tear down - --otlp-endpoint Prometheus OTLP metrics endpoint (e.g. http://ydb-prometheus:9090/api/v1/otlp/v1/metrics) - --report-period metrics export period in milliseconds - - --read-rps read RPS - --read-timeout read timeout milliseconds - - --write-rps write RPS - --write-timeout write timeout milliseconds - - --time run time in seconds - --shutdown-time graceful shutdown time in seconds - - --read-threads number of threads to use for write requests - --write-threads number of threads to use for read requests +```sh +docker compose -f playground/configs/compose.yaml down ``` -### topic-create -`python tests/slo/src/ topic-create [options]` - -``` -Arguments: - endpoint YDB endpoint to connect to - db YDB database to connect to - -Options: - --topic-path topic path to create - --topic-consumer consumer name - --topic-min-partitions minimum active partitions - --topic-max-partitions maximum active partitions - --topic-retention-hours retention period in hours -``` +### Configuration -### topic-cleanup -`python tests/slo/src/ topic-cleanup [options]` +Override defaults via environment variables: -``` -Arguments: - endpoint YDB endpoint to connect to - db YDB database to connect to +| Variable | Default | Description | +|----------|---------|-------------| +| `WORKLOAD_NAME` | `topic` | Workload type: `sync-query`, `sync-table`, `topic` | +| `RUN_TIME_SEC` | `120` | Workload run time in seconds | +| `WRITE_RPS` | `1` | Write RPS | +| `READ_THREADS` | `0` | Read worker threads | +| `WRITE_THREADS` | `1` | Write worker threads | +| `MESSAGE_SIZE` | `100` | Topic message size in bytes | +| `REPORT_PERIOD_MS` | `1000` | Metrics flush period in ms | +| `DEBUG` | `0` | Set to `1` to enable debug logging | +| `WORKLOAD_IMAGE` | `ydb-python-slo:local` | Docker image name for the workload | -Options: - --topic-path topic path to drop 
-``` +## CLI arguments -### topic-run -`python tests/slo/src/ topic-run [options]` +The workload runs as a single command that creates resources, runs the workload, and cleans up. +Every flag supports a fallback chain: **CLI arg > environment variable > hardcoded default**. ``` -Arguments: - endpoint YDB endpoint to connect to - db YDB database to connect to - -Options: - --topic-path topic path - --topic-consumer consumer name - - --otlp-endpoint Prometheus OTLP metrics endpoint (e.g. http://ydb-prometheus:9090/api/v1/otlp/v1/metrics) - --report-period metrics export period in milliseconds - - --topic-read-rps read RPS for topics - --topic-read-timeout read timeout milliseconds for topics - --topic-write-rps write RPS for topics - --topic-write-timeout write timeout milliseconds for topics - - --topic-message-size message size in bytes - --topic-read-threads number of threads to use for read requests - --topic-write-threads number of threads to use for write requests - - --time run time in seconds - --shutdown-time graceful shutdown time in seconds +python tests/slo/src [endpoint] [db] [options] ``` -## Authentication - -Workload using [auth-env](https://ydb.yandex-team.ru/docs/reference/ydb-sdk/recipes/auth-env) for authentication. 
+### Connection & identity + +| Argument | Env var | Default | Description | +|----------|---------|---------|-------------| +| `endpoint` (positional) | `YDB_ENDPOINT` | `grpc://ydb:2136` | YDB endpoint | +| `db` (positional) | `YDB_DATABASE` | `/Root/testdb` | YDB database path | +| `--workload-name` | `WORKLOAD_NAME` | `sync-query` | Workload type | +| `--workload-ref` | `WORKLOAD_REF` / `REF` | `main` | Reference label for metrics | +| `--otlp-endpoint` | `OTEL_EXPORTER_OTLP_ENDPOINT` | `http://ydb-prometheus:9090/api/v1/otlp` | OTLP endpoint | +| `--time` | `WORKLOAD_DURATION` | `600` | Workload duration in seconds | +| `--debug` | — | `false` | Enable debug logging | + +### Run parameters + +| Argument | Default | Description | +|----------|---------|-------------| +| `--read-rps` | `100` | Read RPS limit | +| `--read-timeout` | `10000` | Read timeout in ms | +| `--write-rps` | `10` | Write RPS limit | +| `--write-timeout` | `20000` | Write timeout in ms | +| `--read-threads` | `8` | Read worker threads | +| `--write-threads` | `4` | Write worker threads | +| `--shutdown-time` | `10` | Graceful shutdown time in seconds | +| `--report-period` | `1000` | Metrics push period in ms | + +### Table parameters + +| Argument | Default | Description | +|----------|---------|-------------| +| `--table-name` | `key_value` | Table name | +| `--min-partitions-count` | `6` | Minimum partition count | +| `--max-partitions-count` | `1000` | Maximum partition count | +| `--partition-size` | `100` | Partition size in MB | +| `--initial-data-count` | `1000` | Rows to pre-fill | +| `--batch-size` | `100` | Rows per insert batch | +| `--threads` | `10` | Threads for initial data fill | + +### Topic parameters + +| Argument | Default | Description | +|----------|---------|-------------| +| `--topic-path` | `/Root/testdb/slo_topic` | Topic path | +| `--topic-consumer` | `slo_consumer` | Consumer name | +| `--topic-partitions` | `1` | Topic partition count | +| `--message-size` | 
`100` | Message size in bytes | ## What's inside ### Table workload -When running `table-run` command, the program creates three jobs: `readJob`, `writeJob`, `metricsJob`. -- `readJob` reads rows from the table one by one with random identifiers generated by writeJob -- `writeJob` generates and inserts rows -- `metricsJob` periodically sends metrics to Prometheus +Creates three jobs: `readJob`, `writeJob`, `metricsJob`. -Table have these fields: +- `readJob` — reads rows from the table with random identifiers +- `writeJob` — generates and inserts rows +- `metricsJob` — periodically sends metrics to Prometheus + +Table schema: - `object_id Uint64` - `object_hash Uint64 Digest::NumericHash(id)` - `payload_str UTF8` @@ -246,28 +184,19 @@ Table have these fields: Primary key: `("object_hash", "object_id")` ### Topic workload -When running `topic-run` command, the program creates three jobs: `readJob`, `writeJob`, `metricsJob`. -- `readJob` reads messages from topic using TopicReader and commits offsets -- `writeJob` generates and publishes messages to topic using TopicWriter -- `metricsJob` periodically sends metrics to Prometheus +Creates three jobs: `readJob`, `writeJob`, `metricsJob`. -Messages contain: -- Sequential message ID -- Thread identifier -- Configurable payload size (padded with 'x' characters) +- `readJob` — reads messages from topic using TopicReader and commits offsets +- `writeJob` — generates and publishes messages using TopicWriter +- `metricsJob` — periodically sends metrics to Prometheus ## Collected metrics -- `oks` - amount of OK requests -- `not_oks` - amount of not OK requests -- `inflight` - amount of requests in flight -- `latency` - summary of latencies in ms -- `attempts` - summary of amount for request - -Metrics are collected for both table operations (`read`, `write`) and topic operations (`read`, `write`). - -> Note: with Prometheus OTLP receiver (no Pushgateway) counters/histograms are cumulative and cannot be reset to `0`. 
-> If you need clean separation between runs, use distinct `REF`/`WORKLOAD` (and/or `SLO_INSTANCE_ID`) so each run writes into separate time series. -## Look at metrics in grafana -You can get dashboard used in that test [here](https://github.com/ydb-platform/slo-tests/blob/main/k8s/helms/grafana.yaml#L69) - you will need to import json into grafana. +- `sdk_operations_total` — total operations (labeled by `operation_status`: success/err) +- `sdk_errors_total` — errors by type +- `sdk_pending_operations` — in-flight operations +- `sdk_retry_attempts_total` — retry attempts +- `sdk_operation_latency_p50_seconds` — P50 latency +- `sdk_operation_latency_p95_seconds` — P95 latency +- `sdk_operation_latency_p99_seconds` — P99 latency diff --git a/tests/slo/requirements.txt b/tests/slo/requirements.txt index cd5cdfe1..d9021cc5 100644 --- a/tests/slo/requirements.txt +++ b/tests/slo/requirements.txt @@ -1,6 +1,6 @@ requests==2.33.0 aiolimiter==1.1.0 -quantile-estimator==0.1.2 +hdrhistogram==0.10.3 # OpenTelemetry (OTLP/HTTP exporter) # NOTE: OpenTelemetry 1.39.1 requires Python >= 3.9. diff --git a/tests/slo/slo_runner.sh b/tests/slo/slo_runner.sh index 9742ca2f..87d89fbb 100755 --- a/tests/slo/slo_runner.sh +++ b/tests/slo/slo_runner.sh @@ -4,20 +4,18 @@ set -euo pipefail # Local SLO runner. # # This script: -# 1) checks that the infra docker network exists (default: ydb_cluster) +# 1) clones / updates ydb-slo-action deploy configs # 2) builds the workload image -# 3) runs topic-create + topic-run inside that network +# 3) starts everything via docker compose (YDB + Prometheus + workload) +# 4) tears down on exit # -# Why it runs the workload as a container: -# - infra compose does not necessarily publish YDB/Prometheus ports to localhost -# - attaching to the compose network makes service discovery reliable (DNS) +# The workload runs as the "workload-current" service from the compose file, +# configured via WORKLOAD_CURRENT_IMAGE and WORKLOAD_CURRENT_COMMAND env vars. 
+# +# Infra configs: https://github.com/ydb-platform/ydb-slo-action/tree/v2/deploy # # Configuration (env vars): -# NETWORK_NAME : docker network to attach workload container to (default: ydb_cluster) -# YDB_ENDPOINT : grpc endpoint inside the network (default: grpc://ydb-storage-1:2136) -# YDB_DATABASE : database (default: /Root/testdb) -# TOPIC_PATH : topic path (default: /Root/testdb/slo_topic) -# OTLP_ENDPOINT : Prometheus OTLP receiver URL (default: http://prometheus:9090/api/v1/otlp/v1/metrics) +# WORKLOAD_NAME : workload type (default: topic) # RUN_TIME_SEC : workload run time seconds (default: 120) # WRITE_RPS : topic write rps (default: 1) # READ_THREADS : topic read threads (default: 0) @@ -29,12 +27,11 @@ set -euo pipefail SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" REPO_ROOT="$(cd "${SCRIPT_DIR}/../.." && pwd)" -NETWORK_NAME="${NETWORK_NAME:-ydb_cluster}" - -YDB_DATABASE="${YDB_DATABASE:-/Root/testdb}" -TOPIC_PATH="${TOPIC_PATH:-${YDB_DATABASE}/slo_topic}" +INFRA_DIR="${SCRIPT_DIR}/.infra" +INFRA_REPO="https://github.com/ydb-platform/ydb-slo-action.git" +INFRA_BRANCH="v2" -OTLP_ENDPOINT="${OTLP_ENDPOINT:-http://prometheus:9090/api/v1/otlp/v1/metrics}" +WORKLOAD_NAME="${WORKLOAD_NAME:-topic}" RUN_TIME_SEC="${RUN_TIME_SEC:-120}" WRITE_RPS="${WRITE_RPS:-1}" READ_THREADS="${READ_THREADS:-0}" @@ -45,74 +42,85 @@ DEBUG="${DEBUG:-0}" WORKLOAD_IMAGE="${WORKLOAD_IMAGE:-ydb-python-slo:local}" -# Infra configuration -# -# Infra is expected to be started separately (see https://github.com/ydb-platform/ydb-slo-action/tree/main/deploy). -# This runner only attaches the workload container to the existing docker network "${NETWORK_NAME}". 
-YDB_ENDPOINT="${YDB_ENDPOINT:-grpc://ydb-storage-1:2136}" +# --------------------------------------------------------------------------- +# Infra management (ydb-slo-action/v2/deploy) +# --------------------------------------------------------------------------- -ensure_network() { - if docker network inspect "${NETWORK_NAME}" >/dev/null 2>&1; then - return 0 +fetch_infra() { + if [[ -d "${INFRA_DIR}/.git" ]]; then + echo "[slo_runner] updating infra configs..." + git -C "${INFRA_DIR}" fetch origin "${INFRA_BRANCH}" --depth 1 --quiet + git -C "${INFRA_DIR}" -c advice.detachedHead=false checkout FETCH_HEAD --quiet + else + echo "[slo_runner] cloning infra configs (${INFRA_REPO} @ ${INFRA_BRANCH}, sparse: deploy/)..." + git clone --no-checkout --depth 1 --branch "${INFRA_BRANCH}" --filter=blob:none "${INFRA_REPO}" "${INFRA_DIR}" + git -C "${INFRA_DIR}" sparse-checkout init --cone + git -C "${INFRA_DIR}" sparse-checkout set deploy + git -C "${INFRA_DIR}" checkout "${INFRA_BRANCH}" fi - - echo "[slo_runner] docker network '${NETWORK_NAME}' not found." >&2 - echo "[slo_runner] Start infra and ensure it creates/uses this network, then re-run." >&2 - echo "[slo_runner] Infra configs: https://github.com/ydb-platform/ydb-slo-action/tree/main/deploy" >&2 - exit 2 } -workload_run() { - # Runs workload as a container attached to the infra compose network. - # Usage: - # workload_run - docker run --rm --network "${NETWORK_NAME}" "${WORKLOAD_IMAGE}" "$@" -} +COMPOSE_FILE="${INFRA_DIR}/deploy/compose.yml" build_workload_image() { - docker build -f "${REPO_ROOT}/tests/slo/Dockerfile" -t "${WORKLOAD_IMAGE}" "${REPO_ROOT}" + echo "[slo_runner] building workload image: ${WORKLOAD_IMAGE} ..." 
+ docker build --platform linux/amd64 -f "${REPO_ROOT}/tests/slo/Dockerfile" -t "${WORKLOAD_IMAGE}" "${REPO_ROOT}" +} + +# --------------------------------------------------------------------------- +# Build workload command +# --------------------------------------------------------------------------- + +build_workload_command() { + local cmd=( + --workload-name "${WORKLOAD_NAME}" + --report-period "${REPORT_PERIOD_MS}" + --read-threads "${READ_THREADS}" + --write-threads "${WRITE_THREADS}" + --write-rps "${WRITE_RPS}" + --message-size "${MESSAGE_SIZE}" + --time "${RUN_TIME_SEC}" + ) + if [[ "${DEBUG}" == "1" ]]; then + cmd+=(--debug) + fi + echo "${cmd[*]}" } -echo "[slo_runner] repo root: ${REPO_ROOT}" -echo "[slo_runner] compose network: ${NETWORK_NAME}" -echo "[slo_runner] ydb endpoint: ${YDB_ENDPOINT}" -echo "[slo_runner] ydb db: ${YDB_DATABASE}" -echo "[slo_runner] topic path: ${TOPIC_PATH}" -echo "[slo_runner] otlp endpoint: ${OTLP_ENDPOINT}" -echo "[slo_runner] checking docker network: ${NETWORK_NAME}..." -ensure_network -echo "[slo_runner] building workload image: ${WORKLOAD_IMAGE} ..." +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +fetch_infra build_workload_image -echo "[slo_runner] topic-create..." -topic_create_args=( - topic-create - "${YDB_ENDPOINT}" - "${YDB_DATABASE}" - --path "${TOPIC_PATH}" -) -if [[ "${DEBUG}" == "1" ]]; then - topic_create_args+=(--debug) -fi -workload_run "${topic_create_args[@]}" - -echo "[slo_runner] topic-run..." 
-topic_run_args=( - topic-run - "${YDB_ENDPOINT}" - "${YDB_DATABASE}" - --path "${TOPIC_PATH}" - --otlp-endpoint "${OTLP_ENDPOINT}" - --report-period "${REPORT_PERIOD_MS}" - --read-threads "${READ_THREADS}" - --write-threads "${WRITE_THREADS}" - --write-rps "${WRITE_RPS}" - --message-size "${MESSAGE_SIZE}" - --time "${RUN_TIME_SEC}" -) -if [[ "${DEBUG}" == "1" ]]; then - topic_run_args+=(--debug) +echo "[slo_runner] starting infra + workload..." +echo "[slo_runner] workload image: ${WORKLOAD_IMAGE}" +echo "[slo_runner] workload name: ${WORKLOAD_NAME}" +echo "[slo_runner] run time: ${RUN_TIME_SEC}s" + +export WORKLOAD_CURRENT_IMAGE="${WORKLOAD_IMAGE}" +export WORKLOAD_CURRENT_COMMAND="$(build_workload_command)" +export WORKLOAD_NAME +export WORKLOAD_DURATION="${RUN_TIME_SEC}" + +COMPOSE="docker compose -f ${COMPOSE_FILE} --profile telemetry --profile workload-current" +trap '${COMPOSE} down' EXIT + +echo "[slo_runner] starting infra..." +${COMPOSE} up -d --wait + +prom_port=$(docker port ydb-prometheus 9090 2>/dev/null | head -1 || true) +if [[ -n "${prom_port}" ]]; then + echo "[slo_runner] prometheus: http://${prom_port}" fi -workload_run "${topic_run_args[@]}" -echo "[slo_runner] done" +echo "[slo_runner] waiting for workload to finish..." +${COMPOSE} logs -f workload-current & +LOGS_PID=$! 
+ +exit_code=$(docker wait ydb-workload-current) + +kill "${LOGS_PID}" 2>/dev/null || true +echo "[slo_runner] workload exited with code ${exit_code}" +exit "${exit_code}" diff --git a/tests/slo/src/__main__.py b/tests/slo/src/__main__.py index dd1ae0b7..eb5504e0 100644 --- a/tests/slo/src/__main__.py +++ b/tests/slo/src/__main__.py @@ -2,8 +2,7 @@ import logging from options import parse_options -from root_runner import run_from_args - +from root_runner import run_all if __name__ == "__main__": args = parse_options() @@ -12,4 +11,4 @@ log_level = logging.DEBUG if args.debug else logging.INFO logging.basicConfig(level=log_level, format="%(asctime)s %(levelname)-8s %(message)s") - run_from_args(args) + run_all(args) diff --git a/tests/slo/src/core/metrics.py b/tests/slo/src/core/metrics.py index bff90eda..fa151c2d 100644 --- a/tests/slo/src/core/metrics.py +++ b/tests/slo/src/core/metrics.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging +import threading import time from abc import ABC, abstractmethod from collections.abc import Iterable @@ -12,9 +13,6 @@ OP_TYPE_READ, OP_TYPE_WRITE = "read", "write" OP_STATUS_SUCCESS, OP_STATUS_FAILURE = "success", "err" -REF = environ.get("REF", "main") -WORKLOAD = environ.get("WORKLOAD", "sync-query") - logger = logging.getLogger(__name__) @@ -101,116 +99,112 @@ def push(self) -> None: class OtlpMetrics(BaseMetrics): """ - Canonical OpenTelemetry metrics implementation. - - This exports metrics via OTLP/HTTP to a Prometheus server with OTLP receiver enabled: - POST http(s)://:/api/v1/otlp/v1/metrics - - Naming notes: - - Metric names follow OpenTelemetry conventions (dot-separated namespaces, e.g. `sdk.operations.total`). - - Prometheus OTLP translation typically converts dots to underscores and may add suffixes like - `_total` for counters and `_bucket/_sum/_count` for histograms. + OpenTelemetry metrics implementation. 
+ + Exports via OTLP/HTTP; the endpoint is configured through standard OTel env vars: + OTEL_EXPORTER_OTLP_METRICS_ENDPOINT (e.g. http://ydb-prometheus:9090/api/v1/otlp/v1/metrics) + OTEL_EXPORTER_OTLP_ENDPOINT (fallback base URL) + OTEL_EXPORTER_OTLP_PROTOCOL (default: http/protobuf) + + Latency is tracked with an HDR histogram per (operation_type, operation_status) label + combination and published as three Gauge instruments: + sdk_operation_latency_p50_seconds + sdk_operation_latency_p95_seconds + sdk_operation_latency_p99_seconds """ - def __init__(self, otlp_metrics_endpoint: str): + # HDR histogram range: 1 µs … 60 s (in microseconds), 3 significant figures. + _HDR_MIN_US = 1 + _HDR_MAX_US = 60_000_000 + _HDR_SIG_FIGS = 3 + + def __init__(self, workload_name: str, workload_ref: str): from opentelemetry.exporter.otlp.proto.http.metric_exporter import ( OTLPMetricExporter, ) from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader - from opentelemetry.sdk.metrics.view import ( - ExplicitBucketHistogramAggregation, - View, - ) from opentelemetry.sdk.resources import Resource - # Resource attributes: Prometheus maps service.name -> job, service.instance.id -> instance. 
+ self._workload_name = workload_name + self._workload_ref = workload_ref + resource = Resource.create( { - "service.name": f"workload-{WORKLOAD}", - "service.instance.id": environ.get("SLO_INSTANCE_ID", f"{REF}-{WORKLOAD}"), - "ref": REF, + "service.name": f"workload-{workload_name}", + "service.instance.id": environ.get("SLO_INSTANCE_ID", f"{workload_ref}-{workload_name}"), + "ref": workload_ref, "sdk": "ydb-python-sdk", "sdk_version": version("ydb"), - "workload": WORKLOAD, + "workload": workload_name, "workload_version": "0.0.0", } ) - exporter = OTLPMetricExporter(endpoint=otlp_metrics_endpoint) - reader = PeriodicExportingMetricReader(exporter) # we force_flush() explicitly in push() - - latency_view = View( - instrument_name="sdk.operation.latency", - aggregation=ExplicitBucketHistogramAggregation( - boundaries=( - 0.001, - 0.002, - 0.003, - 0.004, - 0.005, - 0.0075, - 0.010, - 0.020, - 0.050, - 0.100, - 0.200, - 0.500, - 1.000, - ) - ), - ) + # Endpoint is read automatically from OTEL_EXPORTER_OTLP_METRICS_ENDPOINT / + # OTEL_EXPORTER_OTLP_ENDPOINT by the exporter; no need to pass it explicitly. 
+ exporter = OTLPMetricExporter() + reader = PeriodicExportingMetricReader(exporter) - self._provider = MeterProvider( - resource=resource, - metric_readers=[reader], - views=[latency_view], - ) + self._provider = MeterProvider(resource=resource, metric_readers=[reader]) self._meter = self._provider.get_meter("ydb-slo") - # Instruments (sync) + # Counters — names/labels match metrics.yaml in ydb-slo-action self._errors = self._meter.create_counter( name="sdk.errors.total", description="Total number of errors encountered, categorized by error type.", ) self._operations_total = self._meter.create_counter( name="sdk.operations.total", - description="Total number of operations, categorized by type attempted by the SDK.", - ) - self._operations_success_total = self._meter.create_counter( - name="sdk.operations.success.total", - description="Total number of successful operations, categorized by type.", + description="Total number of operations, labeled by operation_status (success/err).", ) - self._operations_failure_total = self._meter.create_counter( - name="sdk.operations.failure.total", - description="Total number of failed operations, categorized by type.", - ) - self._latency = self._meter.create_histogram( - name="sdk.operation.latency", - unit="s", - description="Latency of operations performed by the SDK in seconds, categorized by type and status.", + self._retry_attempts_total = self._meter.create_counter( + name="sdk.retry.attempts.total", + description="Total number of retry attempts.", ) - self._pending = self._meter.create_up_down_counter( name="sdk.pending.operations", - description="Current number of pending operations, categorized by type.", + description="Current number of pending operations.", ) - self._retry_attempts_total = self._meter.create_counter( - name="sdk.retry.attempts.total", - description="Total number of retry attempts, categorized by ref and operation type.", + # Latency gauges (fed from HDR histograms via push()) + self._latency_p50 = 
self._meter.create_gauge( + name="sdk_operation_latency_p50_seconds", + unit="s", + description="P50 operation latency in seconds.", + ) + self._latency_p95 = self._meter.create_gauge( + name="sdk_operation_latency_p95_seconds", + unit="s", + description="P95 operation latency in seconds.", + ) + self._latency_p99 = self._meter.create_gauge( + name="sdk_operation_latency_p99_seconds", + unit="s", + description="P99 operation latency in seconds.", ) + # HDR histograms: key → (operation_type, operation_status) + self._hdr_lock = threading.Lock() + self._hdr: dict = {} + self.reset() + def _hdr_for(self, key: tuple): + """Return (creating if necessary) an HDR histogram for the given label key.""" + from hdrh.histogram import HdrHistogram + + hist = self._hdr.get(key) + if hist is None: + hist = HdrHistogram(self._HDR_MIN_US, self._HDR_MAX_US, self._HDR_SIG_FIGS) + self._hdr[key] = hist + return hist + def start(self, labels) -> float: labels_t = _normalize_labels(labels) self._pending.add( 1, - attributes={ - "ref": REF, - "operation_type": labels_t[0], - }, + attributes={"ref": self._workload_ref, "operation_type": labels_t[0]}, ) return time.time() @@ -223,76 +217,67 @@ def stop( ) -> None: labels_t = _normalize_labels(labels) duration = time.time() - start_time + duration_us = max(self._HDR_MIN_US, int(duration * 1_000_000)) op_type = labels_t[0] - base_attrs = { - "ref": REF, - "operation_type": op_type, - } + op_status = OP_STATUS_SUCCESS if error is None else OP_STATUS_FAILURE + base_attrs = {"ref": self._workload_ref, "operation_type": op_type} - # Update instruments self._retry_attempts_total.add(int(attempts), attributes=base_attrs) self._pending.add(-1, attributes=base_attrs) - - # Counters + latency - self._operations_total.add(1, attributes=base_attrs) + self._operations_total.add(1, attributes={**base_attrs, "operation_status": op_status}) if error is not None: self._errors.add( 1, - attributes={ - **base_attrs, - "error_type": type(error).__name__, - }, 
+ attributes={**base_attrs, "error_type": type(error).__name__}, ) - self._operations_failure_total.add(1, attributes=base_attrs) - self._latency.record( - duration, - attributes={ - **base_attrs, - "operation_status": OP_STATUS_FAILURE, - }, - ) - return - - self._operations_success_total.add(1, attributes=base_attrs) - self._latency.record( - duration, - attributes={ - **base_attrs, - "operation_status": OP_STATUS_SUCCESS, - }, - ) + + with self._hdr_lock: + self._hdr_for((op_type, op_status)).record_value(duration_us) def push(self) -> None: - # Metrics job calls push() with the cadence of --report-period. - # force_flush() makes the exporter send immediately. + with self._hdr_lock: + snapshot = {k: v for k, v in self._hdr.items()} + + for (op_type, op_status), hist in snapshot.items(): + attrs = { + "ref": self._workload_ref, + "operation_type": op_type, + "operation_status": op_status, + } + p50 = hist.get_value_at_percentile(50.0) / 1_000_000 + p95 = hist.get_value_at_percentile(95.0) / 1_000_000 + p99 = hist.get_value_at_percentile(99.0) / 1_000_000 + self._latency_p50.set(p50, attributes=attrs) + self._latency_p95.set(p95, attributes=attrs) + self._latency_p99.set(p99, attributes=attrs) + self._provider.force_flush() def reset(self) -> None: - # OpenTelemetry counters/histograms are cumulative and cannot be reset. - # Reset is implemented as an immediate push/flush. self.push() -def create_metrics(otlp_endpoint: Optional[str]) -> BaseMetrics: +def create_metrics(args) -> BaseMetrics: """ Factory used by SLO runners. - Metrics are enabled if either: - - OTLP_ENDPOINT env var is set, or - - `--otlp-endpoint` is provided (and non-empty) - - If endpoint is empty, metrics are disabled (DummyMetrics). + Uses args.otlp_endpoint, args.workload_name, args.workload_ref from parsed CLI arguments. + If the endpoint is empty, returns a no-op DummyMetrics. 
""" - endpoint = (environ.get("OTLP_ENDPOINT") or (otlp_endpoint or "")).strip() + endpoint = (args.otlp_endpoint or "").strip() + if not endpoint: - logger.info("Creating dummy metrics (metrics disabled)") + logger.info("OTLP endpoint not configured — metrics disabled") return DummyMetrics() - logger.info("Creating OTLP metrics exporter to Prometheus: %s", endpoint) + environ.setdefault("OTEL_EXPORTER_OTLP_PROTOCOL", "http/protobuf") + environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = endpoint + + logger.info("Creating OTLP metrics exporter (endpoint: %s)", endpoint) try: - return OtlpMetrics(endpoint) + return OtlpMetrics(args.workload_name, args.workload_ref) except Exception: - logger.exception("Failed to initialize OTLP metrics exporter; falling back to DummyMetrics") + logger.exception("Failed to initialize OTLP metrics — falling back to DummyMetrics") return DummyMetrics() diff --git a/tests/slo/src/jobs/async_topic_jobs.py b/tests/slo/src/jobs/async_topic_jobs.py index 92ae9fa4..03f10f30 100644 --- a/tests/slo/src/jobs/async_topic_jobs.py +++ b/tests/slo/src/jobs/async_topic_jobs.py @@ -120,10 +120,6 @@ async def _run_topic_reads(self, limiter: AsyncLimiter): logger.info("Stop async topic read workload") def _run_metric_job(self): - # Metrics are enabled only if an OTLP endpoint is provided (CLI: --otlp-endpoint). 
- if not getattr(self.args, "otlp_endpoint", None): - return [] - task = asyncio.create_task( self._async_metric_sender(self.args.time), name="slo_metrics_sender", diff --git a/tests/slo/src/jobs/base.py b/tests/slo/src/jobs/base.py index 32246fa7..9cc06598 100644 --- a/tests/slo/src/jobs/base.py +++ b/tests/slo/src/jobs/base.py @@ -2,6 +2,7 @@ import threading import time from abc import ABC, abstractmethod +from typing import Any import ydb @@ -53,10 +54,7 @@ def __init__(self, driver, args, metrics): def run_tests(self): pass - def _run_metric_job(self): - if not getattr(self.args, "otlp_endpoint", None): - return [] - + def _run_metric_job(self) -> list[Any]: report_period_ms = max(1, int(self.args.report_period)) limiter = SyncRateLimiter(min_interval_s=report_period_ms / 1000.0) diff --git a/tests/slo/src/jobs/table_jobs.py b/tests/slo/src/jobs/table_jobs.py index 2452c7b7..144fd686 100644 --- a/tests/slo/src/jobs/table_jobs.py +++ b/tests/slo/src/jobs/table_jobs.py @@ -92,9 +92,7 @@ def __init__(self, driver, args, metrics, table_name, max_id): self.table_name = table_name self.max_id = max_id - from core.metrics import WORKLOAD - - self.workload_type = WORKLOAD + self.workload_type = args.workload_name def run_tests(self): if self.workload_type == "sync-table": diff --git a/tests/slo/src/options.py b/tests/slo/src/options.py index 7082d086..c9f340c6 100644 --- a/tests/slo/src/options.py +++ b/tests/slo/src/options.py @@ -1,182 +1,89 @@ import argparse +import os -def add_common_options(parser): - parser.add_argument("endpoint", help="YDB endpoint") - parser.add_argument("db", help="YDB database name") - parser.add_argument("-t", "--table-name", default="key_value", help="Table name") - parser.add_argument("--debug", action="store_true", help="Enable debug logging") - parser.add_argument("--async", action="store_true", help="Use async mode for operations") - - -def make_table_create_parser(subparsers): - table_create_parser = 
subparsers.add_parser("table-create", help="Create tables and fill with initial content") - add_common_options(table_create_parser) +def parse_options(): + """ + Parse CLI arguments. - table_create_parser.add_argument( - "-p-min", - "--min-partitions-count", - default=6, - type=int, - help="Minimum amount of partitions in table", - ) - table_create_parser.add_argument( - "-p-max", - "--max-partitions-count", - default=1000, - type=int, - help="Maximum amount of partitions in table", - ) - table_create_parser.add_argument("-p-size", "--partition-size", default=100, type=int, help="Partition size [mb]") - table_create_parser.add_argument( - "-c", - "--initial-data-count", - default=1000, - type=int, - help="Total number of records to generate", + Every flag supports a fallback chain: CLI arg > environment variable > hardcoded default. + """ + parser = argparse.ArgumentParser( + description="YDB Python SLO workload", + formatter_class=argparse.RawDescriptionHelpFormatter, ) - table_create_parser.add_argument( - "--write-timeout", - default=20000, - type=int, - help="Write requests execution timeout [ms]", + parser.add_argument( + "endpoint", + nargs="?", + default=os.environ.get("YDB_ENDPOINT", "grpc://localhost:2136"), + help="YDB endpoint (default: $YDB_ENDPOINT)", ) - - table_create_parser.add_argument( - "--batch-size", - default=100, - type=int, - help="Number of new records in each create request", + parser.add_argument( + "db", + nargs="?", + default=os.environ.get("YDB_DATABASE", "/local"), + help="YDB database (default: $YDB_DATABASE)", ) - table_create_parser.add_argument("--threads", default=10, type=int, help="Number of threads to use") - - -def make_table_run_parser(subparsers): - table_run_parser = subparsers.add_parser("table-run", help="Run table SLO workload") - add_common_options(table_run_parser) - - table_run_parser.add_argument("--read-rps", default=100, type=int, help="Read request rps") - table_run_parser.add_argument( - "--read-timeout", - 
default=10000, + parser.add_argument( + "--time", + default=int(os.environ.get("WORKLOAD_DURATION", "600")), type=int, - help="Read requests execution timeout [ms]", + help="Workload duration in seconds (default: $WORKLOAD_DURATION or 600)", ) - - table_run_parser.add_argument("--write-rps", default=10, type=int, help="Write request rps") - table_run_parser.add_argument( - "--write-timeout", - default=20000, - type=int, - help="Write requests execution timeout [ms]", + parser.add_argument( + "--workload-name", + default=os.environ.get("WORKLOAD_NAME", os.environ.get("WORKLOAD", "sync-query")), + help="Workload type: sync-table, sync-query, topic (default: $WORKLOAD_NAME or sync-query)", ) - - table_run_parser.add_argument("--time", default=10, type=int, help="Time to run in seconds") - table_run_parser.add_argument( - "--shutdown-time", - default=10, - type=int, - help="Graceful shutdown time in seconds", + parser.add_argument( + "--workload-ref", + default=os.environ.get("WORKLOAD_REF", os.environ.get("REF", "main")), + help="Reference label for metrics (default: $WORKLOAD_REF or main)", ) - - table_run_parser.add_argument( + parser.add_argument( "--otlp-endpoint", - default="http://localhost:9090/api/v1/otlp/v1/metrics", - type=str, - help="Full Prometheus OTLP metrics endpoint (e.g. http://ydb-prometheus:9090/api/v1/otlp/v1/metrics). 
Empty to disable.", + default=os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:9090/api/v1/otlp"), + help="OTLP endpoint (default: $OTEL_EXPORTER_OTLP_ENDPOINT or http://localhost:9090/api/v1/otlp)", ) - table_run_parser.add_argument("--report-period", default=1000, type=int, help="Prometheus push period in [ms]") - - table_run_parser.add_argument("--read-threads", default=8, type=int, help="Number of threads to use for write") - table_run_parser.add_argument("--write-threads", default=4, type=int, help="Number of threads to use for read") - - -def make_table_cleanup_parser(subparsers): - table_cleanup_parser = subparsers.add_parser("table-cleanup", help="Drop tables") - add_common_options(table_cleanup_parser) - -def make_topic_create_parser(subparsers): - topic_create_parser = subparsers.add_parser("topic-create", help="Create topic with consumer") - add_common_options(topic_create_parser) - - topic_create_parser.add_argument("--path", default="/local/slo_topic", type=str, help="Topic path") - topic_create_parser.add_argument("--consumer", default="slo_consumer", type=str, help="Topic consumer name") - topic_create_parser.add_argument("--partitions-count", default=1, type=int, help="Partition count") - - -def make_topic_run_parser(subparsers): - topic_parser = subparsers.add_parser("topic-run", help="Run topic SLO workload") - add_common_options(topic_parser) - - topic_parser.add_argument("--path", default="/local/slo_topic", type=str, help="Topic path") - topic_parser.add_argument("--consumer", default="slo_consumer", type=str, help="Topic consumer name") - topic_parser.add_argument("--partitions-count", default=1, type=int, help="Partition count") - topic_parser.add_argument("--read-rps", default=100, type=int, help="Topic read request rps") - topic_parser.add_argument("--read-timeout", default=5000, type=int, help="Topic read timeout [ms]") - topic_parser.add_argument("--write-rps", default=100, type=int, help="Topic write request rps") - 
topic_parser.add_argument("--write-timeout", default=5000, type=int, help="Topic write timeout [ms]") - topic_parser.add_argument( - "--read-threads", - default=1, - type=int, - help="Number of threads for topic reading", - ) - topic_parser.add_argument( - "--write-threads", - default=1, - type=int, - help="Number of threads for topic writing", - ) - topic_parser.add_argument("--message-size", default=100, type=int, help="Topic message size in bytes") + parser.add_argument("--debug", action="store_true", help="Enable debug logging") - topic_parser.add_argument("--time", default=10, type=int, help="Time to run in seconds") - topic_parser.add_argument( - "--shutdown-time", - default=10, + # Table params + parser.add_argument("--table-name", default="key_value", help="Table name") + parser.add_argument("--min-partitions-count", default=6, type=int) + parser.add_argument("--max-partitions-count", default=1000, type=int) + parser.add_argument("--partition-size", default=100, type=int, help="Partition size [mb]") + parser.add_argument( + "--initial-data-count", + default=1000, type=int, - help="Graceful shutdown time in seconds", + help="Number of rows to pre-fill", ) - topic_parser.add_argument( - "--otlp-endpoint", - default="http://localhost:9090/api/v1/otlp/v1/metrics", - type=str, - help="Full Prometheus OTLP metrics endpoint (e.g. http://ydb-prometheus:9090/api/v1/otlp/v1/metrics). 
Empty to disable.", - ) - topic_parser.add_argument("--report-period", default=1000, type=int, help="Prometheus push period in [ms]") - - -def make_topic_cleanup_parser(subparsers): - topic_cleanup_parser = subparsers.add_parser("topic-cleanup", help="Drop topic") - add_common_options(topic_cleanup_parser) - - topic_cleanup_parser.add_argument("--path", default="/local/slo_topic", type=str, help="Topic path") - - -def get_root_parser(): - parser = argparse.ArgumentParser( - formatter_class=argparse.RawDescriptionHelpFormatter, - description="YDB Python SLO application", - ) - - subparsers = parser.add_subparsers( - title="subcommands", - dest="subcommand", - help="List of subcommands", - ) - - make_table_create_parser(subparsers) - make_table_run_parser(subparsers) - make_table_cleanup_parser(subparsers) - - make_topic_create_parser(subparsers) - make_topic_run_parser(subparsers) - make_topic_cleanup_parser(subparsers) - - return parser - - -def parse_options(): - parser = get_root_parser() - return parser.parse_args() + parser.add_argument("--batch-size", default=100, type=int, help="Rows per insert batch") + parser.add_argument("--threads", default=10, type=int, help="Threads for initial data fill") + + # Run params + parser.add_argument("--read-rps", default=100, type=int, help="Read RPS limit") + parser.add_argument("--read-timeout", default=10000, type=int, help="Read timeout [ms]") + parser.add_argument("--write-rps", default=10, type=int, help="Write RPS limit") + parser.add_argument("--write-timeout", default=20000, type=int, help="Write timeout [ms]") + parser.add_argument("--read-threads", default=8, type=int, help="Read worker threads") + parser.add_argument("--write-threads", default=4, type=int, help="Write worker threads") + parser.add_argument("--shutdown-time", default=10, type=int, help="Graceful shutdown time [s]") + parser.add_argument("--report-period", default=1000, type=int, help="Metrics push period [ms]") + + # Topic params (used when 
--workload-name is 'topic') + parser.add_argument("--topic-path", default="/local/slo_topic", help="Topic path") + parser.add_argument("--topic-consumer", default="slo_consumer", help="Topic consumer name") + parser.add_argument("--topic-partitions", default=1, type=int, help="Topic partition count") + parser.add_argument("--message-size", default=100, type=int, help="Topic message size [bytes]") + + args = parser.parse_args() + + # Aliases used by topic runner + args.path = args.topic_path + args.consumer = args.topic_consumer + args.partitions_count = args.topic_partitions + + return args diff --git a/tests/slo/src/pyrightconfig.json b/tests/slo/src/pyrightconfig.json new file mode 100644 index 00000000..e2cad99c --- /dev/null +++ b/tests/slo/src/pyrightconfig.json @@ -0,0 +1,4 @@ +{ + "pythonVersion": "3.9", + "extraPaths": ["."] +} diff --git a/tests/slo/src/root_runner.py b/tests/slo/src/root_runner.py index 20589c14..7a8e2060 100644 --- a/tests/slo/src/root_runner.py +++ b/tests/slo/src/root_runner.py @@ -1,94 +1,53 @@ -import asyncio -import ydb -import ydb.aio import logging -from typing import Dict -from runners.topic_runner import TopicRunner from runners.table_runner import TableRunner -from runners.base import BaseRunner - -logger = logging.getLogger(__name__) +from runners.topic_runner import TopicRunner +import ydb +import ydb.aio -class SLORunner: - def __init__(self): - self.runners: Dict[str, type(BaseRunner)] = {} +logger = logging.getLogger(__name__) - def register_runner(self, prefix: str, runner_cls: type(BaseRunner)): - self.runners[prefix] = runner_cls +_RUNNERS = { + "sync-table": TableRunner, + "sync-query": TableRunner, + "topic": TopicRunner, +} - def run_command(self, args): - subcommand_parts = args.subcommand.split("-", 1) - if len(subcommand_parts) < 2: - raise ValueError(f"Invalid subcommand format: {args.subcommand}. 
Expected 'prefix-command'") - prefix, command = subcommand_parts - if prefix not in self.runners: - raise ValueError(f"Unknown prefix: {prefix}. Available: {list(self.runners.keys())}") +def _get_runner(workload_name: str): + runner_cls = _RUNNERS.get(workload_name) + if runner_cls is None: + raise ValueError(f"Unknown workload_name: {workload_name!r}. Known: {list(_RUNNERS)}") + return runner_cls() - runner_instance = self.runners[prefix]() - # Check if async mode is requested and command is 'run' - if getattr(args, "async", False) and command == "run": - asyncio.run(self._run_async_command(args, runner_instance, command)) - else: - self._run_sync_command(args, runner_instance, command) +def run_all(args): + """Create infrastructure, run the workload, then clean up — all in one go.""" + workload_name = args.workload_name + runner = _get_runner(workload_name) - def _run_sync_command(self, args, runner_instance, command): - """Run command in synchronous mode""" - driver_config = ydb.DriverConfig( - args.endpoint, - database=args.db, - grpc_keep_alive_timeout=5000, - ) + driver_config = ydb.DriverConfig( + args.endpoint, + database=args.db, + grpc_keep_alive_timeout=5000, + ) - with ydb.Driver(driver_config) as driver: - driver.wait(timeout=300) - try: - runner_instance.set_driver(driver) - if command == "create": - runner_instance.create(args) - elif command == "run": - runner_instance.run(args) - elif command == "cleanup": - runner_instance.cleanup(args) - else: - raise RuntimeError(f"Unknown command {command} for prefix {runner_instance.prefix}") - except BaseException: - logger.exception("Something went wrong") - raise - finally: - driver.stop(timeout=getattr(args, "shutdown_time", 10)) + with ydb.Driver(driver_config) as driver: + driver.wait(timeout=300) + runner.set_driver(driver) - async def _run_async_command(self, args, runner_instance, command): - """Run command in asynchronous mode""" - driver_config = ydb.DriverConfig( - args.endpoint, - 
database=args.db, - grpc_keep_alive_timeout=5000, - ) + try: + logger.info("[%s] Creating resources", workload_name) + runner.create(args) - async with ydb.aio.Driver(driver_config) as driver: - await driver.wait(timeout=300) + logger.info("[%s] Running workload for %d s", workload_name, args.time) + runner.run(args) + finally: + logger.info("[%s] Cleaning up resources", workload_name) try: - runner_instance.set_driver(driver) - if command == "run": - await runner_instance.run_async(args) - else: - raise RuntimeError(f"Async mode only supports 'run' command, got '{command}'") - except BaseException: - logger.exception("Something went wrong in async mode") - raise - - -def create_runner() -> SLORunner: - runner = SLORunner() - runner.register_runner("table", TableRunner) - runner.register_runner("topic", TopicRunner) - return runner - + runner.cleanup(args) + except Exception: + logger.exception("Cleanup failed — ignoring") -def run_from_args(args): - runner = create_runner() - runner.run_command(args) + driver.stop(timeout=args.shutdown_time) diff --git a/tests/slo/src/runners/table_runner.py b/tests/slo/src/runners/table_runner.py index fb6f00dc..628c4802 100644 --- a/tests/slo/src/runners/table_runner.py +++ b/tests/slo/src/runners/table_runner.py @@ -88,7 +88,7 @@ def transaction(session: ydb.table.Session): self.logger.info("Table creation completed") def run(self, args): - metrics = create_metrics(args.otlp_endpoint) + metrics = create_metrics(args) self.logger.info("Starting table SLO tests") diff --git a/tests/slo/src/runners/topic_runner.py b/tests/slo/src/runners/topic_runner.py index c9a8bdaa..a2800b42 100644 --- a/tests/slo/src/runners/topic_runner.py +++ b/tests/slo/src/runners/topic_runner.py @@ -70,7 +70,7 @@ def create(self, args): def run(self, args): assert self.driver is not None, "Driver is not initialized. Call set_driver() before run()." 
- metrics = create_metrics(args.otlp_endpoint) + metrics = create_metrics(args) self.logger.info("Starting topic SLO tests") @@ -85,7 +85,7 @@ def run(self, args): async def run_async(self, args): """Async version of topic SLO tests using ydb.aio.Driver""" assert self.driver is not None, "Driver is not initialized. Call set_driver() before run_async()." - metrics = create_metrics(args.otlp_endpoint) + metrics = create_metrics(args) self.logger.info("Starting async topic SLO tests")