From c755f86a91b16deba39a286513607f8762a4e467 Mon Sep 17 00:00:00 2001 From: Greg Magolan Date: Wed, 20 May 2026 20:51:03 -0700 Subject: [PATCH 1/5] fix(github-status-comments): merge entries across concurrent workflows --- .../install-prebuilt-aspect-cli/action.yaml | 26 + .github/workflows/ci-github-runners.yaml | 95 ++++ .../{ci.yaml => ci-workflows-runners.yaml} | 4 +- .../aspect/feature/github_status_comments.axl | 450 ++++++++++++++---- .../feature/github_status_comments_test.axl | 322 +++++++++++++ .../src/builtins/aspect/lib/github.axl | 11 + 6 files changed, 822 insertions(+), 86 deletions(-) create mode 100644 .github/actions/install-prebuilt-aspect-cli/action.yaml create mode 100644 .github/workflows/ci-github-runners.yaml rename .github/workflows/{ci.yaml => ci-workflows-runners.yaml} (99%) diff --git a/.github/actions/install-prebuilt-aspect-cli/action.yaml b/.github/actions/install-prebuilt-aspect-cli/action.yaml new file mode 100644 index 000000000..8943c994b --- /dev/null +++ b/.github/actions/install-prebuilt-aspect-cli/action.yaml @@ -0,0 +1,26 @@ +name: Install prebuilt aspect-cli (from in-run artifact) +description: | + Internal-only — NOT a general-purpose aspect-cli installer. + + Downloads the `aspect-cli` artifact uploaded earlier in this workflow + run by the `pre-build` job and installs it on PATH as `aspect`. Used + by GitHub-hosted ephemeral runners that don't have remote-cache + access to rebuild the CLI from source efficiently. + + To install aspect-cli normally, see https://docs.aspect.build/cli. + +runs: + using: composite + steps: + - name: Download aspect-cli binary + uses: actions/download-artifact@v4 + with: + name: aspect-cli + path: ${{ runner.temp }}/aspect-bin + - name: Install on PATH + shell: bash + run: | + chmod +x "${RUNNER_TEMP}/aspect-bin/aspect-cli" + mkdir -p "$HOME/.local/bin" + mv "${RUNNER_TEMP}/aspect-bin/aspect-cli" "$HOME/.local/bin/aspect" + echo "$HOME/.local/bin" >> "$GITHUB_PATH" diff --git a/.github/workflows/ci-github-runners.yaml b/.github/workflows/ci-github-runners.yaml new file mode 100644 index 000000000..c67b16907 --- /dev/null +++ b/.github/workflows/ci-github-runners.yaml @@ -0,0 +1,95 @@ +name: CI - Aspect Workflows (GitHub runners) + +on: + push: + branches: [main] + pull_request: + branches: [main] + +permissions: + id-token: write + +concurrency: + group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} + cancel-in-progress: true + +# The Dash0 OTLP endpoint and bearer token feed `.aspect/config.axl`'s `ctx.telemetry.exporters.add(...)` +# call, which dogfoods the trace.event/trace.log pipeline against a real backend. +# Both are no-ops if a secret isn't configured (unset env var = empty string, +# config.axl skips registration). +env: + ASPECT_API_TOKEN: ${{ secrets.ASPECT_API_TOKEN }} + DASH0_ENDPOINT: ${{ secrets.DASH0_ENDPOINT }} + DASH0_TOKEN: ${{ secrets.DASH0_TOKEN }} + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + +jobs: + pre-build: + runs-on: [self-hosted, aspect-workflows, aspect-default] + timeout-minutes: 10 + steps: + - uses: actions/checkout@v6 + - name: Pre-build CLI + run: | + echo "Build and upload CLI to remote cache" + .aspect/bootstrap.sh + # Surface the just-built binary to the ephemeral (GitHub-hosted) jobs + # below. Those runners are small and don't have remote-cache access yet, + # so we re-use the binary that the self-hosted runner already built rather + # than rebuilding from source per ephemeral job. + - name: Upload aspect-cli binary + uses: actions/upload-artifact@v4 + with: + name: aspect-cli + path: target/ci/aspect-cli + retention-days: 1 + if-no-files-found: error + + # Dedicated buildifier-only step that runs in parallel with format-task. + # Models the legacy Rosetta `buildifier` task using a Starlark-only + # `format_multirun` target — see https://docs.aspect.build/cli/migration/buildifier. + buildifier-task: + runs-on: ubuntu-latest + needs: [pre-build] + timeout-minutes: 10 + steps: + - uses: actions/checkout@v6 + with: + fetch-depth: 2 + - uses: ./.github/actions/install-prebuilt-aspect-cli + - name: Buildifier Task + run: aspect format --task-key buildifier-gha-ephemeral --formatter-target=//tools/format:format-starlark + + format-task: + runs-on: ubuntu-latest + needs: [pre-build] + timeout-minutes: 10 + steps: + - uses: actions/checkout@v6 + with: + fetch-depth: 2 + - uses: ./.github/actions/install-prebuilt-aspect-cli + # Starlark patterns are handled by buildifier-task; ignore them here so + # the two steps don't do duplicate work. + - name: Format Task + run: | + aspect format --task-key format-gha-ephemeral \ + --ignore-pattern='**/*.axl' \ + --ignore-pattern='**/*.bzl' \ + --ignore-pattern='**/BUILD' \ + --ignore-pattern='**/BUILD.bazel' \ + --ignore-pattern='**/MODULE.bazel' \ + --ignore-pattern='**/WORKSPACE' \ + --ignore-pattern='**/WORKSPACE.bazel' + + gazelle-task: + runs-on: ubuntu-latest + needs: [pre-build] + timeout-minutes: 10 + steps: + - uses: actions/checkout@v6 + with: + fetch-depth: 2 + - uses: ./.github/actions/install-prebuilt-aspect-cli + - name: Gazelle Task + run: aspect gazelle --task-key gazelle-gha-ephemeral diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci-workflows-runners.yaml similarity index 99% rename from .github/workflows/ci.yaml rename to .github/workflows/ci-workflows-runners.yaml index d166b9f00..78212d767 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci-workflows-runners.yaml @@ -1,4 +1,5 @@ -name: CI +name: CI - Aspect Workflows (Workflows runners) + on: push: branches: [main] @@ -93,6 +94,7 @@ jobs: buildifier-task: runs-on: [self-hosted, aspect-workflows, aspect-default] needs: [pre-build] + timeout-minutes: 10 steps: - uses: actions/checkout@v6 with: diff --git a/crates/aspect-cli/src/builtins/aspect/feature/github_status_comments.axl b/crates/aspect-cli/src/builtins/aspect/feature/github_status_comments.axl index 35fb82d78..0924863aa 100644 --- a/crates/aspect-cli/src/builtins/aspect/feature/github_status_comments.axl +++ b/crates/aspect-cli/src/builtins/aspect/feature/github_status_comments.axl @@ -30,6 +30,7 @@ Body-template variables: entries — list of per-task entry dicts, alphabetical by (job, name, key). """ +load("@std//base64.axl", "base64") load("@std//time.axl", "monotonic", "time") load( "../lib/bazel_results.axl", @@ -71,6 +72,18 @@ load( _ARTIFACT_PREFIX = "aspect-ci-status-" _MARKER_FMT = "" +# Cross-workflow state block embedded in the rendered comment. +# +# The PR comment is the canonical source of truth across concurrent GitHub +# Actions workflows on the same PR — each workflow's artifact swarm can +# only see its own run_id's artifacts, so the comment carries the +# cross-workflow `workflow_runs` slot list, per-task `entries` snapshot, +# `head_sha`, and `last_upsert_epoch_s` as a base64-encoded JSON payload +# inside an HTML comment. base64 keeps the literal `-->` (HTML-comment +# terminator) out of the payload regardless of entry content. +_STATE_BLOCK_OPEN = "" + # Used by `_should_quiet_delete_log` — see that helper's docstring. _EVENTUAL_CONSISTENCY_WINDOW_S = 15 @@ -711,7 +724,7 @@ def _collect_tip_rows(ctx, entries): out.append(decorated) return out -def _render_body(ctx, marker, entries, started_at, body_template, status_badges, sections, last_updated_at = "", api_usage = ""): +def _render_body(ctx, marker, entries, started_at, body_template, status_badges, sections, last_updated_at = "", api_usage = "", state = None): """Render the comment body via the (possibly user-overridden) Jinja2 template. `entries` is expected to be the output of `_prepare_for_render` — @@ -748,7 +761,14 @@ def _render_body(ctx, marker, entries, started_at, body_template, status_badges, `tip_rows` is the deduped + attributed + severity-sorted output of `_collect_tip_rows`, with the cap applied so suggestion / info tips can't drown the section (important + warning always pass - through in full).""" + through in full). + + `state` — when set, an HTML-comment state block carrying the + cross-workflow state (head_sha, last_upsert_epoch_s, workflow_runs, + entries) is appended after the rendered body. The block is the + canonical cross-workflow source of truth read by other workflows + on the same PR. Pass `None` to omit (used by snapshot tests and + callers that don't need cross-workflow merge).""" repro_rows, fix_rows = _collect_repro_fix_rows(sections) tip_rows = _collect_tip_rows(ctx, entries) body = ctx.template.jinja2(body_template, data = { @@ -766,8 +786,151 @@ def _render_body(ctx, marker, entries, started_at, body_template, status_badges, }) if not body.endswith("\n"): body = body + "\n" + if state != None: + body = body + _serialize_state(state) + "\n" return body +def _serialize_state(state): + """Encode the cross-workflow state as a single-line HTML comment. + + Wraps a base64-encoded JSON payload so the literal `-->` HTML-comment + terminator can never appear inside the body — regardless of entry + string content. Round-tripped via `_parse_state`. + + `state` is a dict with `head_sha`, `last_upsert_epoch_s`, + `workflow_runs` (`[{run_id, workflow_name}, ...]`), and `entries` + (per-task snapshots used as TTL/race fallback). + """ + encoded = base64.encode(json.encode(state)) + return _STATE_BLOCK_OPEN + encoded + _STATE_BLOCK_CLOSE + +_BASE64_ALPHABET = {c: True for c in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=".elems()} + +def _is_standard_base64(s): + """Cheap pre-flight check: every char in the standard base64 alphabet + (A-Z, a-z, 0-9, +, /, =) and the length is a multiple of 4. Lets + `_parse_state` return `None` for a malformed payload rather than + have `base64.decode` raise into the writer.""" + if not s or len(s) % 4 != 0: + return False + for c in s.elems(): + if c not in _BASE64_ALPHABET: + return False + return True + +def _parse_state(body): + """Extract the cross-workflow state block from a comment body, if present. + + Returns a dict with `head_sha`, `last_upsert_epoch_s`, + `workflow_runs`, and `entries` on success. Returns `None` when the + block is absent or malformed — the caller falls back to + current-workflow-only behavior in that case. + + Each field is shape-checked before use: a payload that's syntactically + valid base64-JSON but carries wrong types (e.g. `last_upsert_epoch_s` + as a string after a manual comment edit, or `entries` as a dict + instead of a list) is normalized to defaults rather than raising into + the caller. List fields are filtered to drop non-dict elements so + downstream merge/render can assume dict shape per row. + """ + if not body: + return None + start = body.find(_STATE_BLOCK_OPEN) + if start < 0: + return None + payload_start = start + len(_STATE_BLOCK_OPEN) + end = body.find(_STATE_BLOCK_CLOSE, payload_start) + if end < 0: + return None + encoded = body[payload_start:end] + if not _is_standard_base64(encoded): + return None + decoded = json.try_decode(str(base64.decode(encoded)), None) + if type(decoded) != "dict": + return None + head_sha = decoded.get("head_sha") + epoch = decoded.get("last_upsert_epoch_s") + workflow_runs = decoded.get("workflow_runs") + entries = decoded.get("entries") + return { + "head_sha": head_sha if type(head_sha) == "string" else "", + "last_upsert_epoch_s": epoch if type(epoch) == "int" else 0, + "workflow_runs": [r for r in workflow_runs if type(r) == "dict"] if type(workflow_runs) == "list" else [], + "entries": [e for e in entries if type(e) == "dict"] if type(entries) == "list" else [], + } + +def _merge_state(existing_state, head_sha, my_run_id, my_workflow_name, my_entries, refreshed_other_entries): + """Build the new state block to write to the comment. + + The hybrid cross-workflow model: + * `workflow_runs` is the list of `{run_id, workflow_name}` slots — + one per workflow active on this PR. Each writer overlays its own + slot, preserving sibling-workflow slots unchanged. Writes are + slot-independent → no cross-workflow data race. + * `entries` is the full per-task snapshot, kept for two reasons: + (a) fallback when a sibling workflow's artifacts are unreachable + (TTL expiry, transient API error); (b) terminal-state + persistence past the artifact retention window. + + Args: + existing_state: parsed state from the current comment body, or + `None` for a first-writer comment. + head_sha: PR head SHA for THIS push. A mismatch in + `existing_state.head_sha` discards everything — a new push + starts the comment fresh. + my_run_id, my_workflow_name: this writer's identity. + my_entries: this writer's swarm's authoritative entries. + refreshed_other_entries: `{run_id_str: [entry, ...]}` for sibling + workflows whose artifacts we successfully refreshed this cycle. + Missing run_ids fall back to the snapshot in `existing_state`. + + Returns a dict ready to feed to `_serialize_state` (caller stamps + `last_upsert_epoch_s`). + """ + + # Drop everything from a prior push. + if existing_state and existing_state.get("head_sha") != head_sha: + existing_state = None + + existing_runs = (existing_state or {}).get("workflow_runs", []) or [] + other_runs = [r for r in existing_runs if str(r.get("run_id", "")) != str(my_run_id)] + workflow_runs = other_runs + [{"run_id": str(my_run_id), "workflow_name": my_workflow_name}] + + # Snapshot fallback table for sibling workflows we couldn't refresh. + snapshot_by_run = {} + for e in ((existing_state or {}).get("entries", []) or []): + rid = str(e.get("workflow_run_id", "")) + snapshot_by_run.setdefault(rid, []).append(e) + + entries = list(my_entries) + for r in other_runs: + rid = str(r.get("run_id", "")) + refreshed = refreshed_other_entries.get(rid) + if refreshed != None: + entries.extend(refreshed) + else: + entries.extend(snapshot_by_run.get(rid, [])) + + # Deterministic ordering so the serialized state block is byte-stable + # across writers. Without this, publish ownership alternating between + # workflows would flip the order on each cycle (`others + [mine]` + # depends on which writer "mine" is), and `stable_body`'s + # unchanged-content short-circuit in `_publish` would never fire. + workflow_runs = sorted(workflow_runs, key = lambda r: str(r.get("run_id", ""))) + entries = sorted(entries, key = lambda e: ( + str(e.get("workflow_run_id", "")), + str(e.get("job", "")), + str(e.get("key", "")), + str(e.get("name", "")), + str(e.get("run_attempt", "")), + )) + + return { + "head_sha": head_sha, + "workflow_runs": workflow_runs, + "entries": entries, + } + def _github_status_comments(ctx: FeatureContext): lifecycle = ctx.traits[TaskLifecycleTrait] @@ -796,10 +959,12 @@ def _github_status_comments(ctx: FeatureContext): owner = pr["owner"] repo = pr["repo"] pr_number = pr["pr_number"] + head_sha = pr.get("sha") or "" marker = _MARKER_FMT % pr_number job_name = ctx.std.env.var("GITHUB_JOB") or "unknown-job" run_id = ctx.std.env.var("GITHUB_RUN_ID") or "" run_attempt = ctx.std.env.var("GITHUB_RUN_ATTEMPT") or "1" + workflow_name = ctx.std.env.var("GITHUB_WORKFLOW") or "" if not run_id: _LOG.info(ctx.std, "Missing GITHUB_RUN_ID; skipping.") return @@ -868,6 +1033,14 @@ def _github_status_comments(ctx: FeatureContext): "tips": [tip_to_payload(t) for t in collect_tips_sorted(ctx)], "rate_limit_observations": export_observations(ctx), "last_pr_comment_upsert_epoch_s": _state.get("_last_pr_comment_upsert_epoch_s", 0), + # Cross-workflow merge tags. The PR comment carries entries + # from every concurrent workflow on the same head_sha; these + # fields let `_merge_entries` distinguish our rows from those + # of a sibling workflow's swarm and drop rows from a prior + # push (when head_sha changes). + "workflow_run_id": run_id, + "workflow_name": workflow_name, + "head_sha": head_sha, } payload.update(_state.get("_results_subset") or _empty_results_subset()) @@ -910,13 +1083,15 @@ def _github_status_comments(ctx: FeatureContext): _state["_last_upload_at"] = monotonic() return True - def _collect_sibling_entries(ctx, token): - """List sibling artifacts and return one entry dict per logical task. + def _list_and_download_workflow_artifacts(ctx, token, target_run_id, is_own_swarm): + """Shared artifact-listing + download logic for both own-swarm and + cross-workflow discovery. - Per-artifact downloads are cached by artifact id; only new ids - cost a download. The artifact id changes on every replacement - (delete + upload), so a stable id means the producer hasn't - pushed a new payload since the last poll. + Per-artifact downloads are cached by artifact id under a + per-run_id cache slot; only new ids cost a download. Artifact + ids change on every replacement (delete + upload), so a stable + id means the producer hasn't pushed a new payload since the + last poll. Two duplicate-handling paths converge here: @@ -924,9 +1099,9 @@ def _github_status_comments(ctx: FeatureContext): artifact API is backed by Azure storage with eventual- consistency reads, so a listing can briefly include a previously-deleted artifact alongside the freshly-uploaded - one. We sort newest-first (by artifact id; ids are + one. Sort newest-first (by artifact id; ids are monotonically increasing per repo) and skip names already - in `seen`, so the newest payload always wins. + in `seen` — the newest payload always wins. - **Same logical `(name, key, job)` across run attempts.** The artifacts endpoint doesn't accept an `attempt_number` @@ -935,27 +1110,31 @@ def _github_status_comments(ctx: FeatureContext): `_dedup_by_run_attempt` collapses them to the latest attempt before returning. - Eager same-cycle re-cleanup of duplicates was considered and - rejected: the same eventual-consistency window means a - delete-after-upload may not see the just-uploaded artifact, - leaving a fresh ghost AND a stale one. The natural per-cycle - `delete_artifact + upload_artifact` in `_publish_self` - converges over 1-2 cycles once the consistency window closes; - the two dedup paths above ensure correct rendering in the - meantime. + When `is_own_swarm=True` the function additionally: + * substitutes our own payload from `_state` (avoids a + self-download round-trip), + * GCs cache entries whose artifacts were replaced or deleted, + * ingests sibling rate-limit observations into our merged + ring buffer (swarm-only — cross-workflow siblings share the + App-bucket quota by definition, but the swarm-coordination + model is workflow-scoped). Returns `None` on transport failure listing artifacts. """ - listed = github.actions.list_run_artifacts(ctx, token, owner, repo, run_id, name_prefix = _ARTIFACT_PREFIX) + listed = github.actions.list_run_artifacts(ctx, token, owner, repo, target_run_id, name_prefix = _ARTIFACT_PREFIX) if not listed.get("success"): - _LOG.warn(ctx.std, "List artifacts failed: %s" % listed.get("error", "unknown")) + _LOG.warn(ctx.std, "List artifacts failed (run_id=%s): %s" % (target_run_id, listed.get("error", "unknown"))) return None - cache = _state.setdefault("_entry_cache", {}) # artifact_id (str) -> entry dict - own_name = _state.get("_my_artifact_name", "") + # Per-run_id cache slot — different runs have disjoint artifact + # id spaces, so a shared cache would mis-GC cross-workflow + # entries against the own-swarm listing. + cache_key = "_entry_cache:" + str(target_run_id) + cache = _state.setdefault(cache_key, {}) + own_name = _state.get("_my_artifact_name", "") if is_own_swarm else "" workdir = _state["_workdir"] - seen = {} # artifact name -> entry - current_ids = {} # current ids in this listing → bool, used to GC cache + seen = {} + current_ids = {} sorted_artifacts = sorted(listed["artifacts"], key = lambda a: -(a.get("id") or 0)) for a in sorted_artifacts: @@ -966,8 +1145,6 @@ def _github_status_comments(ctx: FeatureContext): if aname in seen: continue - # We have authoritative data for our own task in _state — skip the - # download and use it directly. Avoids reading our own zip back. if aname == own_name: seen[aname] = _own_payload(ctx) continue @@ -994,31 +1171,54 @@ def _github_status_comments(ctx: FeatureContext): cache[aid] = entry seen[aname] = entry - # Swarm-share: fold this sibling's recent GitHub-API - # observations into our merged ring buffer. The artifact - # name (stable per `(job, task_key, run_attempt)`) is the - # `sibling_id`, so a sibling that re-uploads under the - # same logical identity overwrites its prior slice - # instead of accumulating. Producers that predate the - # share protocol leave the field absent → ingest no-ops. - ingest_sibling_observations( - ctx, - aname, - entry.get("rate_limit_observations", {}) or {}, - ) - - # GC cache entries whose artifacts were replaced or deleted. - for aid in list(cache.keys()): - if aid not in current_ids: - cache.pop(aid) + if is_own_swarm: + # The artifact name (stable per `(job, task_key, + # run_attempt)`) is the `sibling_id`, so a sibling that + # re-uploads under the same logical identity overwrites + # its prior slice instead of accumulating. Producers + # that predate the share protocol leave the field absent + # → ingest no-ops. + ingest_sibling_observations( + ctx, + aname, + entry.get("rate_limit_observations", {}) or {}, + ) + + if is_own_swarm: + for aid in list(cache.keys()): + if aid not in current_ids: + cache.pop(aid) return _dedup_by_run_attempt(list(seen.values())) + def _collect_sibling_entries(ctx, token): + """Discover entries from our own workflow run's artifacts. Thin + wrapper around `_list_and_download_workflow_artifacts` with the + own-swarm flag set.""" + return _list_and_download_workflow_artifacts(ctx, token, run_id, is_own_swarm = True) + + def _collect_other_workflow_entries(ctx, token, other_run_id): + """Discover entries from a sibling workflow's artifacts on the + same PR. Used to refresh data for workflows whose run_ids we + learned from the comment state block — eliminates the + read-then-write race that would otherwise let workflows clobber + each other's data. + + Returns `None` on API failure or empty listing (e.g. artifact + TTL expiry). Callers should fall back to the comment's snapshot + in that case so a finished workflow's terminal state survives + past its artifact retention window.""" + entries = _list_and_download_workflow_artifacts(ctx, token, other_run_id, is_own_swarm = False) + if entries == None: + return None + return entries if entries else None + def _upsert_comment(ctx, token, body): - """PATCH the existing comment if known/findable, else POST and - dedupe duplicates. The unchanged-content short-circuit lives - in `_publish` (gated on a timestamp-stripped diff); by the - time we reach here we're committing to an upsert. + """PATCH the existing comment when its id is known (set by + `_fetch_existing_comment_body`), else POST a fresh one. The + unchanged-content short-circuit lives in `_publish` (gated on a + timestamp-stripped diff); by the time we reach here we're + committing to an upsert. Returns `True` when the upsert hit GitHub successfully (so the caller can stamp `_last_pr_comment_upsert_epoch_s` and let the @@ -1032,22 +1232,6 @@ def _github_status_comments(ctx: FeatureContext): return False return True - listed = github.issues.list_comments(ctx, token, owner, repo, pr_number) - if not listed.get("success"): - _LOG.warn(ctx.std, "List comments failed: %s" % listed.get("error", "unknown")) - return False - for c in listed["comments"]: - if marker in (c.get("body") or ""): - _state["_comment_id"] = c["id"] - break - - if "_comment_id" in _state: - res = github.issues.update_comment(ctx, token, owner, repo, _state["_comment_id"], body) - if not res.get("success"): - _LOG.warn(ctx.std, "Update failed: %s" % res.get("error", "unknown")) - return False - return True - res = github.issues.create_comment(ctx, token, owner, repo, pr_number, body) if not res.get("success"): _LOG.warn(ctx.std, "Create failed: %s" % res.get("error", "unknown")) @@ -1069,6 +1253,41 @@ def _github_status_comments(ctx: FeatureContext): github.issues.delete_comment(ctx, token, owner, repo, d["id"]) return True + def _fetch_existing_comment_body(ctx, token): + """Read the PR's marker comment body so the upcoming render can + merge entries written by other concurrent workflows. + + Returns the body string (empty string when no marker comment + exists yet). Returns `None` on transient API failure so the caller + can skip this cycle rather than clobber the comment with a + merge-less render. + + Sets `_state["_comment_id"]` when the marker comment is found, so + `_upsert_comment`'s follow-on PATCH knows the id without re-listing. + """ + if "_comment_id" in _state: + res = github.issues.get_comment(ctx, token, owner, repo, _state["_comment_id"]) + if res.get("success"): + return res.get("body", "") or "" + + # If the comment was deleted out from under us, fall back to + # list_comments and re-establish. + if res.get("status_code") == 404: + _state.pop("_comment_id", None) + else: + _LOG.warn(ctx.std, "Get comment failed: %s" % res.get("error", "unknown")) + return None + listed = github.issues.list_comments(ctx, token, owner, repo, pr_number) + if not listed.get("success"): + _LOG.warn(ctx.std, "List comments failed: %s" % listed.get("error", "unknown")) + return None + for c in listed["comments"]: + body = c.get("body") or "" + if marker in body: + _state["_comment_id"] = c["id"] + return body + return "" + def _publish(ctx, final = False): token = _state.get("_github_token") if not token: @@ -1078,7 +1297,6 @@ def _github_status_comments(ctx: FeatureContext): raw_entries = _collect_sibling_entries(ctx, token) if raw_entries == None: return - prepared = _prepare_for_render(raw_entries) # Terminal-settle delay: when we're terminal and look likely to # be the last or second-to-last finisher, wait out the artifact @@ -1093,14 +1311,38 @@ def _github_status_comments(ctx: FeatureContext): # terminals skip the wait (the actual last task will catch up). if ( final and - len(prepared) > 1 and - _running_sibling_count(prepared) <= 1 + len(raw_entries) > 1 and + _running_sibling_count(_prepare_for_render(raw_entries)) <= 1 ): time.sleep(_TERMINAL_SETTLE_DELAY_MS) raw_entries = _collect_sibling_entries(ctx, token) if raw_entries == None: return - prepared = _prepare_for_render(raw_entries) + + # Cross-workflow merge — see `_merge_state` for the contract. + # Each writer fetches the current comment, refreshes every + # sibling workflow's entries directly from that workflow's + # artifacts, and overlays its own slot. Writes are + # slot-independent so workflow A's PATCH cannot clobber + # workflow B's data. + existing_body = _fetch_existing_comment_body(ctx, token) + if existing_body == None: + return + parsed = _parse_state(existing_body) + cross_workflow_lease_epoch = parsed["last_upsert_epoch_s"] if parsed else 0 + + refreshed_others = {} + if parsed and parsed.get("head_sha") == head_sha: + for r in parsed.get("workflow_runs", []) or []: + other_run_id = str(r.get("run_id") or "") + if not other_run_id or other_run_id == str(run_id): + continue + other_entries = _collect_other_workflow_entries(ctx, token, other_run_id) + if other_entries != None: + refreshed_others[other_run_id] = other_entries + + new_state = _merge_state(parsed, head_sha, run_id, workflow_name, raw_entries, refreshed_others) + prepared = _prepare_for_render(new_state["entries"]) sections = _bucket_entries(prepared) # Earliest start across all sibling tasks — formatted as the @@ -1112,11 +1354,18 @@ def _github_status_comments(ctx: FeatureContext): # Swarm coordination: every task uploads its own status # artifact (cheap, Twirp), but the quota-burning PR-comment # upsert (REST PATCH on the App bucket) is gated on a lease - # that any swarm member can hold. `lease_window_s` is fed - # `swarm_elapsed_s` (time since the earliest sibling started) - # and the merged-view quota factor, so every member computes - # the same window from swarm-shared inputs and converges on - # the same skip-or-publish answer. + # that any swarm member — across every workflow on the PR — can + # hold. `lease_window_s` is fed `swarm_elapsed_s` (time since + # the earliest sibling started) and the merged-view quota + # factor, so every member computes the same window from + # swarm-shared inputs and converges on the same skip-or-publish + # answer. + # + # The lease epoch is `max(state.last_upsert_epoch_s, swarm-local + # last_upsert_epoch_s)`: the state block carries the most-recent + # upsert from any workflow, while the swarm-local term covers + # the bootstrap window before any writer has stamped a state + # block on this PR. # # Terminal updates (`final=True`) bypass the lease: if this is # the last task to finish and a sibling published recently with @@ -1130,17 +1379,24 @@ def _github_status_comments(ctx: FeatureContext): swarm_elapsed_s = max(0, now_epoch - swarm_started_epoch) factor = effective_throttle_factor(ctx, BUCKET_APP, swarm_elapsed_s, min_poll_interval_seconds) lease_window_s = min_poll_interval_seconds * factor - if not final and _lease_held(now_epoch, _max_recent_upsert_epoch(prepared), lease_window_s): + last_publish_epoch = max(cross_workflow_lease_epoch, _max_recent_upsert_epoch(prepared)) + if not final and _lease_held(now_epoch, last_publish_epoch, lease_window_s): return # Two renders per cycle: - # 1. `stable_body` — `last_updated_at` and `api_usage` zeroed - # out — used solely for the unchanged-content short-circuit. - # Without this, the always-fresh `Last updated` timestamp - # and continually-burning quota numbers would diff on every - # poll and force a PATCH even when no task state changed. - # 2. `final_body` — the same content with live timestamp and - # quota footer, used for the actual upsert. + # 1. `stable_body` — `last_updated_at`, `api_usage`, and the + # state block's `last_upsert_epoch_s` zeroed out — used + # solely for the unchanged-content short-circuit. Without + # this, the always-fresh `Last updated` timestamp, quota + # footer, and lease epoch would diff every poll and force a + # PATCH even when no task state changed. The state block's + # `entries` and `workflow_runs` ARE included in the diff so a + # new sibling workflow's row appearing properly invalidates + # it. + # 2. `final_body` — the same content with live timestamp, + # quota footer, and lease-stamped state block. + stable_state = dict(new_state) + stable_state["last_upsert_epoch_s"] = 0 stable_body = _render_body( ctx, marker, @@ -1149,9 +1405,12 @@ def _github_status_comments(ctx: FeatureContext): body_template, status_badges, sections, + state = stable_state, ) if _state.get("_last_stable_body") == stable_body: return + final_state = dict(new_state) + final_state["last_upsert_epoch_s"] = now_epoch final_body = _render_body( ctx, marker, @@ -1162,6 +1421,7 @@ def _github_status_comments(ctx: FeatureContext): sections, last_updated_at = format_run_date(ctx, now_ms(ctx)), api_usage = usage_footer(ctx), + state = final_state, ) if not _upsert_comment(ctx, token, final_body): # Skip stamping the lease — the swarm should retry rather @@ -1170,9 +1430,8 @@ def _github_status_comments(ctx: FeatureContext): _state["_last_stable_body"] = stable_body # Siblings see this epoch on our next `_publish_self` upload - # (one outer cycle ≈ 10s later). Within that window a - # fast-waking sibling may double-publish; cheaper than paying - # for an extra Twirp upload to propagate sooner. + # (one outer cycle ≈ 10s later). Cross-workflow siblings see it + # immediately via the state block we just PATCHed. _state["_last_pr_comment_upsert_epoch_s"] = now_epoch def _task_started(ctx, subject): @@ -1329,9 +1588,30 @@ def should_quiet_delete_log(last_upload_at, now): """Public test hook; see `_should_quiet_delete_log`.""" return _should_quiet_delete_log(last_upload_at, now) -def render_body(ctx, marker, entries, started_at, body_template, status_badges, sections): +def render_body(ctx, marker, entries, started_at, body_template, status_badges, sections, state = None): """Public test hook; see `_render_body`.""" - return _render_body(ctx, marker, entries, started_at, body_template, status_badges, sections) + return _render_body( + ctx, + marker, + entries, + started_at, + body_template, + status_badges, + sections, + state = state, + ) + +def serialize_state(state): + """Public test hook; see `_serialize_state`.""" + return _serialize_state(state) + +def parse_state(body): + """Public test hook; see `_parse_state`.""" + return _parse_state(body) + +def merge_state(existing_state, head_sha, my_run_id, my_workflow_name, my_entries, refreshed_other_entries): + """Public test hook; see `_merge_state`.""" + return _merge_state(existing_state, head_sha, my_run_id, my_workflow_name, my_entries, refreshed_other_entries) def lease_held(now_epoch, last_upsert_epoch, lease_window_s): """Public test hook; see `_lease_held`.""" diff --git a/crates/aspect-cli/src/builtins/aspect/feature/github_status_comments_test.axl b/crates/aspect-cli/src/builtins/aspect/feature/github_status_comments_test.axl index febe20b15..9850fdc15 100644 --- a/crates/aspect-cli/src/builtins/aspect/feature/github_status_comments_test.axl +++ b/crates/aspect-cli/src/builtins/aspect/feature/github_status_comments_test.axl @@ -33,9 +33,12 @@ load( "dedup_by_run_attempt", "lease_held", "max_recent_upsert_epoch", + "merge_state", + "parse_state", "prepare_for_render", "render_body", "results_subset", + "serialize_state", "should_quiet_delete_log", ) load( @@ -600,6 +603,309 @@ def _check_max_recent_upsert_epoch(): 300, ) +def _check_state_serialize_round_trip(): + """serialize_state → parse_state round-trips head_sha, lease epoch, + workflow_runs, and entries.""" + state = { + "head_sha": "abc", + "last_upsert_epoch_s": 1700000000, + "workflow_runs": [ + {"run_id": "100", "workflow_name": "ci-gha"}, + {"run_id": "200", "workflow_name": "ci-workflows"}, + ], + "entries": [ + {"name": "build", "key": "build", "workflow_run_id": "100"}, + {"name": "lint", "key": "lint", "workflow_run_id": "200"}, + ], + } + block = serialize_state(state) + parsed = parse_state("some body content\n\n" + block + "\n") + if parsed == None: + fail("parse_state returned None for a round-tripped block") + _eq("round-trip head_sha", parsed["head_sha"], "abc") + _eq("round-trip last_upsert_epoch_s", parsed["last_upsert_epoch_s"], 1700000000) + _eq("round-trip workflow_runs", parsed["workflow_runs"], state["workflow_runs"]) + _eq("round-trip entries", parsed["entries"], state["entries"]) + +def _check_parse_state_absent(): + """parse_state returns None when the body has no state block — + callers fall back to swarm-only behavior.""" + _eq("parse_state — empty body", parse_state(""), None) + _eq("parse_state — body without marker", parse_state("# heading\n\nsome content\n"), None) + +def _check_parse_state_malformed(): + """parse_state returns None on a malformed block — payload not + valid base64-of-JSON. We never want to crash the writer over a + corrupted comment body.""" + bad = "" + _eq("parse_state — malformed payload", parse_state(bad), None) + +def _check_parse_state_missing_terminator(): + """Block-open token without the matching close → no parse, no crash.""" + bad = "