From c62ab98b85c3c85197f8235f5d91dbd5b93a9382 Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Fri, 22 May 2026 00:29:55 -0700 Subject: [PATCH 1/4] add /sub-issue, /parent-issue, and unlink variants to comment-commands --- .github/workflows/comment-commands.yml | 198 ++++++++++++++++++++++++- 1 file changed, 197 insertions(+), 1 deletion(-) diff --git a/.github/workflows/comment-commands.yml b/.github/workflows/comment-commands.yml index 3300db1353d..f8300380ecd 100644 --- a/.github/workflows/comment-commands.yml +++ b/.github/workflows/comment-commands.yml @@ -14,7 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -# /take, /untake, /request-review, and /unrequest-review comment commands. +# /take, /untake, /request-review, /unrequest-review, /sub-issue, +# /unsub-issue, /parent-issue, and /unparent-issue comment commands. # # Triage state is no longer materialized as a label — it is the search # filter `is:issue is:open no:assignee`. Anyone can self-claim an issue @@ -25,6 +26,14 @@ # via `/request-review @user [@user ...]` and `/unrequest-review @user # [@user ...]`. We avoid the `/review` namespace so it stays free for # future use (e.g. self-review). +# +# Sub-issue linking can be driven from either end of the relationship: +# `/sub-issue #N [#M ...]` on a parent links those issues as children; +# `/parent-issue #N` on a child sets #N as its parent. Unlinking mirrors +# this: `/unsub-issue #N [#M ...]` from the parent, `/unparent-issue` +# from the child (omit the number to auto-detect via GraphQL, or pass +# `/unparent-issue #N` to be explicit). Cross-repo links are not +# supported; references like `owner/repo#N` are ignored. name: Comment commands on: issue_comment: @@ -165,3 +174,190 @@ jobs: `${action} on #${pull_number} failed: ${e.message}`, ); } + + sub-issue: + # The sub-issue REST endpoints key off the issue's database `id`, so + # each #N reference needs a lookup before link/unlink. + if: >- + github.event_name == 'issue_comment' + && github.event.action == 'created' + && github.event.issue.pull_request == null + && github.event.comment.user.type != 'Bot' + && (startsWith(github.event.comment.body, '/sub-issue') + || startsWith(github.event.comment.body, '/unsub-issue') + || startsWith(github.event.comment.body, '/parent-issue') + || startsWith(github.event.comment.body, '/unparent-issue')) + runs-on: ubuntu-latest + steps: + - uses: actions/github-script@v8 + with: + github-token: ${{ secrets.GITHUB_TOKEN }} + script: | + const body = (context.payload.comment.body || '').trim(); + const issue_number = context.payload.issue.number; + const commenter = context.payload.comment.user.login; + const { owner, repo } = context.repo; + + // Longest alternatives first so `unsub-issue` isn't shadowed + // by `sub-issue`. + const match = body.match( + /^\/(unsub-issue|unparent-issue|sub-issue|parent-issue)\b(.*)$/s, + ); + if (!match) { + core.info(`Comment does not match exact command; skipping.`); + return; + } + const action = match[1]; + const rest = match[2]; + core.info( + `${action} candidate: ${commenter} on issue #${issue_number}; ` + + `body=${JSON.stringify(body)}`, + ); + + // Accept `#N` or bare `N`; cross-repo `owner/repo#N` is not + // supported by the sub-issue endpoint. + const refs = []; + for (const token of rest.split(/\s+/)) { + if (!token) continue; + if (token.includes('/')) { + core.warning(`Ignoring cross-repo reference '${token}'.`); + continue; + } + const m = token.match(/^#?(\d+)$/); + if (m) refs.push(Number(m[1])); + } + + async function getIssueId(number) { + const { data } = await github.rest.issues.get({ + owner, repo, issue_number: number, + }); + return data.id; + } + + async function getParentNumber(number) { + const query = ` + query($owner:String!, $name:String!, $number:Int!) { + repository(owner:$owner, name:$name) { + issue(number:$number) { parent { number } } + } + }`; + const result = await github.graphql(query, { + owner, name: repo, number, + }); + return result.repository.issue.parent?.number ?? null; + } + + async function linkChild(parent_number, child_number) { + const sub_issue_id = await getIssueId(child_number); + await github.request( + 'POST /repos/{owner}/{repo}/issues/{issue_number}/sub_issues', + { owner, repo, issue_number: parent_number, sub_issue_id }, + ); + } + + async function unlinkChild(parent_number, child_number) { + const sub_issue_id = await getIssueId(child_number); + await github.request( + 'DELETE /repos/{owner}/{repo}/issues/{issue_number}/sub_issue', + { owner, repo, issue_number: parent_number, sub_issue_id }, + ); + } + + if (action === 'sub-issue' || action === 'unsub-issue') { + if (!refs.length) { + core.warning(`No #N refs in '/${action}'; skipping.`); + return; + } + for (const n of refs) { + if (n === issue_number) { + core.warning( + `Refusing to self-link #${n}; skipping.`, + ); + continue; + } + try { + if (action === 'sub-issue') { + await linkChild(issue_number, n); + core.info( + `Linked #${n} as sub-issue of #${issue_number}`, + ); + } else { + await unlinkChild(issue_number, n); + core.info( + `Unlinked #${n} from sub-issues of #${issue_number}`, + ); + } + } catch (e) { + core.warning( + `${action} #${n} on #${issue_number} failed: ${e.message}`, + ); + } + } + return; + } + + if (action === 'parent-issue') { + if (refs.length !== 1) { + core.warning( + `/parent-issue expects exactly one #N; skipping.`, + ); + return; + } + const parent_number = refs[0]; + if (parent_number === issue_number) { + core.warning( + `Refusing to set #${issue_number} as its own parent; skipping.`, + ); + return; + } + try { + await linkChild(parent_number, issue_number); + core.info( + `Linked #${issue_number} as sub-issue of #${parent_number}`, + ); + } catch (e) { + core.warning( + `parent-issue #${parent_number} on #${issue_number} ` + + `failed: ${e.message}`, + ); + } + return; + } + + if (action === 'unparent-issue') { + if (refs.length > 1) { + core.warning( + `/unparent-issue accepts at most one #N; skipping.`, + ); + return; + } + let parent_number = refs[0]; + if (parent_number === undefined) { + try { + parent_number = await getParentNumber(issue_number); + } catch (e) { + core.warning( + `parent lookup for #${issue_number} failed: ${e.message}`, + ); + return; + } + if (parent_number == null) { + core.warning( + `#${issue_number} has no parent; skipping.`, + ); + return; + } + } + try { + await unlinkChild(parent_number, issue_number); + core.info( + `Unlinked #${issue_number} from parent #${parent_number}`, + ); + } catch (e) { + core.warning( + `unparent-issue on #${issue_number} (parent #${parent_number}) ` + + `failed: ${e.message}`, + ); + } + return; + } From 6449b50dfc1fb0aa99a2ef70493ec8842cc4209c Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Fri, 22 May 2026 01:32:38 -0700 Subject: [PATCH 2/4] drop withDefaultValue from StatisticsManager so checkpoint state round-trips --- .../worker/managers/StatisticsManager.scala | 10 +++++----- .../amber/engine/faulttolerance/CheckpointSpec.scala | 8 ++++++-- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala index 8ae0419f0a3..2d393e63254 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala @@ -30,11 +30,11 @@ import org.apache.texera.amber.engine.architecture.worker.statistics.{ import scala.collection.mutable class StatisticsManager { - // DataProcessor + // Plain maps (no withDefaultValue) so they survive Kryo round-trip. private val inputStatistics: mutable.Map[PortIdentity, (Long, Long)] = - mutable.Map.empty.withDefaultValue((0L, 0L)) + mutable.Map.empty private val outputStatistics: mutable.Map[PortIdentity, (Long, Long)] = - mutable.Map.empty.withDefaultValue((0L, 0L)) + mutable.Map.empty private var dataProcessingTime: Long = 0L private var totalExecutionTime: Long = 0L private var workerStartTime: Long = 0L @@ -82,7 +82,7 @@ class StatisticsManager { */ def increaseInputStatistics(portId: PortIdentity, size: Long): Unit = { require(size >= 0, "Tuple size must be non-negative") - val (count, totalSize) = inputStatistics(portId) + val (count, totalSize) = inputStatistics.getOrElse(portId, (0L, 0L)) inputStatistics.update(portId, (count + 1, totalSize + size)) } @@ -93,7 +93,7 @@ class StatisticsManager { */ def increaseOutputStatistics(portId: PortIdentity, size: Long): Unit = { require(size >= 0, "Tuple size must be non-negative") - val (count, totalSize) = outputStatistics(portId) + val (count, totalSize) = outputStatistics.getOrElse(portId, (0L, 0L)) outputStatistics.update(portId, (count + 1, totalSize + size)) } diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/CheckpointSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/CheckpointSpec.scala index fbc7e8044df..3d207fd23b3 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/CheckpointSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/faulttolerance/CheckpointSpec.scala @@ -63,7 +63,7 @@ class CheckpointSpec extends AnyFlatSpecLike with BeforeAndAfterAll { system.actorOf(Props[SingleNodeListener](), "cluster-info") } - "Default controller state" should "be serializable" in { + "Default controller state" should "round-trip through CheckpointState" in { val cp = new ControllerProcessor( workflow.context, @@ -73,9 +73,11 @@ class CheckpointSpec extends AnyFlatSpecLike with BeforeAndAfterAll { ) val chkpt = new CheckpointState() chkpt.save(CP_STATE_KEY, cp) + val restored: ControllerProcessor = chkpt.load(CP_STATE_KEY) + assert(restored.actorId == cp.actorId) } - "Default worker state" should "be serializable" in { + "Default worker state" should "round-trip through CheckpointState" in { val dp = new DataProcessor( SELF, msg => {}, @@ -83,6 +85,8 @@ class CheckpointSpec extends AnyFlatSpecLike with BeforeAndAfterAll { ) val chkpt = new CheckpointState() chkpt.save(DP_STATE_KEY, dp) + val restored: DataProcessor = chkpt.load(DP_STATE_KEY) + assert(restored.actorId == dp.actorId) } "CheckpointState" should "fail loudly on an unknown key" in { From aae8b1473db228afb1b277477f345aa39b04eeaa Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Fri, 22 May 2026 02:06:22 -0700 Subject: [PATCH 3/4] added test cases to complete code coverage --- .../worker/managers/StatisticsManager.scala | 12 ++++++++---- .../worker/managers/WorkerManagersSpec.scala | 14 +++++++++----- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala index 2d393e63254..c34595cf3bd 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala @@ -82,8 +82,10 @@ class StatisticsManager { */ def increaseInputStatistics(portId: PortIdentity, size: Long): Unit = { require(size >= 0, "Tuple size must be non-negative") - val (count, totalSize) = inputStatistics.getOrElse(portId, (0L, 0L)) - inputStatistics.update(portId, (count + 1, totalSize + size)) + inputStatistics.updateWith(portId) { + case Some((count, totalSize)) => Some((count + 1, totalSize + size)) + case None => Some((1L, size)) + } } /** @@ -93,8 +95,10 @@ class StatisticsManager { */ def increaseOutputStatistics(portId: PortIdentity, size: Long): Unit = { require(size >= 0, "Tuple size must be non-negative") - val (count, totalSize) = outputStatistics.getOrElse(portId, (0L, 0L)) - outputStatistics.update(portId, (count + 1, totalSize + size)) + outputStatistics.updateWith(portId) { + case Some((count, totalSize)) => Some((count + 1, totalSize + size)) + case None => Some((1L, size)) + } } /** diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/WorkerManagersSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/WorkerManagersSpec.scala index 3fbff39148c..1932823f5dc 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/WorkerManagersSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/WorkerManagersSpec.scala @@ -76,11 +76,15 @@ class WorkerManagersSpec extends AnyFlatSpec { val sm = new StatisticsManager() sm.increaseOutputStatistics(PortIdentity(0), 30) sm.increaseOutputStatistics(PortIdentity(0), 70) - assert(sm.getOutputTupleCount == 2L) - val out = sm.getStatistics(nullExec).outputTupleMetrics - assert(out.size == 1) - assert(out.head.tupleMetrics.count == 2L) - assert(out.head.tupleMetrics.size == 100L) + sm.increaseOutputStatistics(PortIdentity(1), 25) + assert(sm.getOutputTupleCount == 3L) + val byPort = sm + .getStatistics(nullExec) + .outputTupleMetrics + .map(m => m.portId -> (m.tupleMetrics.count, m.tupleMetrics.size)) + .toMap + assert(byPort(PortIdentity(0)) == (2L, 100L)) + assert(byPort(PortIdentity(1)) == (1L, 25L)) } it should "reject negative tuple sizes" in { From 322b82d1bcb694df571e0f929eff201683d8d001 Mon Sep 17 00:00:00 2001 From: "Matthew B." Date: Fri, 22 May 2026 11:42:58 -0700 Subject: [PATCH 4/4] Add comment about DataProcessor in StatisticsManager Added a comment regarding DataProcessor in StatisticsManager. Signed-off-by: Matthew B. --- .../engine/architecture/worker/managers/StatisticsManager.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala index c34595cf3bd..ab46e17654c 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala @@ -30,6 +30,7 @@ import org.apache.texera.amber.engine.architecture.worker.statistics.{ import scala.collection.mutable class StatisticsManager { + // DataProcessor // Plain maps (no withDefaultValue) so they survive Kryo round-trip. private val inputStatistics: mutable.Map[PortIdentity, (Long, Long)] = mutable.Map.empty