
<feature>[thread]: support coalesce queue for batch dhcp (ZSTAC-83039) #3447

Open
zstack-robot-1 wants to merge 1 commit into 5.5.12 from
sync/jin.ma/fix/ZSTAC-83039

Conversation

@zstack-robot-1 (Collaborator)

Summary

  • Adds a CoalesceQueue framework that automatically merges requests sharing the same syncSignature into a single batch execution
  • FlatDhcpBackend uses CoalesceQueue to merge multiple DHCP apply requests on the same host into one batch call
  • Cherry-picked from 3.10.33 (37efe64)

Jira

ZSTAC-83039

sync from gitlab !9305

@coderabbitai

coderabbitai bot commented Mar 10, 2026

Walkthrough

Adds a generic queue implementation that coalesces requests by signature, plus two adapters (with and without return values); FlatDhcpBackend now merges DHCP apply requests per host; adds the corresponding unit and integration tests; removes the static initialization of the private static DatabaseFacade field in AbstractZone.

Changes

Cohort / File(s) Summary
Coalesce queue core
core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java, core/src/main/java/org/zstack/core/thread/CoalesceQueue.java, core/src/main/java/org/zstack/core/thread/ReturnValueCoalesceQueue.java
Adds a generic signature-coalescing queue implementation and two adapters (without/with return values), including PendingRequest, SignatureQueue, batch submission, success/failure notification logic, and extension points.
DHCP backend integration
plugin/flatNetworkProvider/src/main/java/org/zstack/network/service/flat/FlatDhcpBackend.java
Adds DhcpApplyRequest and DhcpApplyQueue (based on CoalesceQueue); applyDhcpService now submits to the coalescing queue, merging DhcpInfo entries by hostUuid and aggregating the rebuild flag.
Integration tests: coalesce queue
test/src/test/groovy/org/zstack/test/integration/core/taskqueue/CoalesceQueueCase.groovy
Adds CoalesceQueueCase, covering multi-signature isolation, batch merging, batch failure propagation, return-value mapping, result-computation exceptions, sequential batches, and high-concurrency scenarios.
Integration tests: DHCP batching verification
test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/VerifyPrepareDhcpWhenReconnectHostCase.groovy
Adds testBatchStartVmApplyDhcp(), which starts VMs concurrently/in stages to verify batched DHCP delivery, token and hostname consistency, and other concurrency assertions.
Test helper
testlib/src/main/java/org/zstack/testlib/FailCoalesceQueue.java
Adds FailCoalesceQueue for testing; its executeBatch always throws to exercise the failure path.
Code cleanup
compute/src/main/java/org/zstack/compute/zone/AbstractZone.java
Removes the static initialization of the private static field DatabaseFacade dbf (the assignment obtained via Platform.getComponentLoader()).
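The coalescing behavior summarized above can be sketched as a minimal, single-threaded Java stand-in. All names here (CoalesceSketch, demo) are hypothetical, and drain() is a simplified stand-in for the real implementation's serialized ChainTask execution:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Simplified, single-threaded sketch of a signature-coalescing queue.
// The real AbstractCoalesceQueue submits a ChainTask per signature with
// syncLevel=1; here drain() stands in for that serialized batch execution.
public class CoalesceSketch {
    private final Map<String, List<Integer>> pending = new HashMap<>();
    private final List<List<Integer>> batches = new ArrayList<>();

    // Requests with the same signature accumulate until the next drain.
    public void submit(String signature, int item) {
        pending.computeIfAbsent(signature, k -> new ArrayList<>()).add(item);
    }

    // Stand-in for the ChainTask run(): take all pending items for a
    // signature and execute them as one batch.
    public void drain(String signature, Consumer<List<Integer>> executeBatch) {
        List<Integer> items = pending.remove(signature);
        if (items != null && !items.isEmpty()) {
            batches.add(items);
            executeBatch.accept(items);
        }
    }

    public List<List<Integer>> getBatches() {
        return batches;
    }

    public static int demo() {
        CoalesceSketch q = new CoalesceSketch();
        q.submit("host-1", 1);
        q.submit("host-1", 2);
        q.submit("host-2", 3);
        q.drain("host-1", batch -> {});
        q.drain("host-2", batch -> {});
        // host-1's two requests coalesced into one batch; host-2 got its own.
        return q.getBatches().size();
    }
}
```

The key property is that signatures isolate batches from each other, which is what the DHCP backend relies on when it uses hostUuid as the signature.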

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant CoalesceQueue as CoalesceQueue/ReturnValueCoalesceQueue
    participant SignatureQueue as SignatureQueue
    participant ThreadFacade
    participant BatchExecutor as executeBatch()
    participant Completion

    Client->>CoalesceQueue: submitRequest(signature, item, completion)
    CoalesceQueue->>SignatureQueue: enqueue(item, completion)
    alt first submission for this signature
        SignatureQueue->>ThreadFacade: submit ChainTask
    end
    ThreadFacade->>BatchExecutor: trigger batch task
    BatchExecutor->>SignatureQueue: takeAll items
    BatchExecutor->>BatchExecutor: create batchCompletion and call executeBatch(items, batchCompletion)
    alt batch succeeds
        BatchExecutor->>BatchExecutor: per-item calculateResult -> invoke each request's success callback
    else batch fails or throws
        BatchExecutor->>Completion: fail(errorCode) -> notify all requests of failure
    end
    Completion->>Client: each request's Completion/ReturnValueCompletion is called back

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Poem

🐰 Ears up, counting requests, one signature gathers a queue,
Batches run light and steady, success and failure each get their due.
DHCP crosses the threshold in order, the tests keep watch, never confused,
A little rabbit smiles at the changing code, a new chapter quietly debuts.

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 9.52% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title fully complies with the convention: it follows the [scope]: format, includes the feature prefix and JIRA key, and is 70 characters long, within the 72-character limit.
Description check ✅ Passed The description clearly relates to the changeset, detailing the introduction of the CoalesceQueue framework and its use in FlatDhcpBackend, including the source information and JIRA key.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 ast-grep (0.41.0)
plugin/flatNetworkProvider/src/main/java/org/zstack/network/service/flat/FlatDhcpBackend.java

[]


Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (4)
core/src/main/java/org/zstack/core/thread/ReturnValueCoalesceQueue.java (1)

24-27: Missing @SuppressWarnings annotation

The cast (ReturnValueCompletion<R>) batchCompletion on line 26 produces a compiler warning. Although the cast is safe (createBatchCompletion returns a ReturnValueCompletion<R>), adding @SuppressWarnings("unchecked") would make the intent explicit and silence the warning.

♻️ Suggested annotation
 `@Override`
+@SuppressWarnings("unchecked")
 protected final void executeBatch(List<T> items, AbstractCompletion batchCompletion) {
     executeBatch(items, (ReturnValueCompletion<R>) batchCompletion);
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/src/main/java/org/zstack/core/thread/ReturnValueCoalesceQueue.java`
around lines 24 - 27, The unchecked cast in
ReturnValueCoalesceQueue.executeBatch(List<T> items, AbstractCompletion
batchCompletion) causes a compiler warning; add `@SuppressWarnings`("unchecked")
to that method (or immediately above the cast) to document and suppress the safe
cast to (ReturnValueCompletion<R>) batchCompletion so the warning is removed;
reference the executeBatch override and the cast to ReturnValueCompletion<R> in
your change.
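The pattern the comment describes can be illustrated with a self-contained sketch; the interface and class names below (BaseCompletion, UncheckedCastDemo) are simplified stand-ins, not the actual ZStack types:

```java
import java.util.List;

// Illustration of the unchecked-cast pattern the comment refers to.
// Names here are simplified stand-ins, not the actual ZStack classes.
public class UncheckedCastDemo {
    interface BaseCompletion {}

    interface ReturnValueCompletion<R> extends BaseCompletion {
        void success(R result);
    }

    // The base class only sees BaseCompletion, but the subclass created the
    // completion itself, so the downcast is safe yet still "unchecked":
    // the annotation documents that and suppresses the warning.
    @SuppressWarnings("unchecked")
    static <R> void executeBatch(List<String> items, BaseCompletion batchCompletion) {
        ((ReturnValueCompletion<R>) batchCompletion).success(null);
    }

    static boolean demo() {
        boolean[] called = {false};
        ReturnValueCompletion<String> c = result -> called[0] = true;
        UncheckedCastDemo.<String>executeBatch(List.of("a"), c);
        return called[0]; // the cast succeeded and the callback fired
    }
}
```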
test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/VerifyPrepareDhcpWhenReconnectHostCase.groovy (2)

26-27: Unused import

AtomicInteger is not used in this file and can be removed.

♻️ Suggested removal of the unused import
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicInteger
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/VerifyPrepareDhcpWhenReconnectHostCase.groovy`
around lines 26 - 27, Remove the unused import
"java.util.concurrent.atomic.AtomicInteger" from
VerifyPrepareDhcpWhenReconnectHostCase.groovy (delete the import line), verify
there are no remaining references to AtomicInteger in that file, and re-run
tests/build to ensure no compile errors.

287-292: Redundant duplicate assertion

Lines 287-292 run two consecutive retryInSecs checks on the same condition (batchCmds.size() == 2), which is unnecessary duplication.

♻️ Suggested removal of the redundant assertion
-       retryInSecs(5) {
-           assert batchCmds.size() == 2
-       }
-       retryInSecs(2) {
+       retryInSecs(5) {
            assert batchCmds.size() == 2
        }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/VerifyPrepareDhcpWhenReconnectHostCase.groovy`
around lines 287 - 292, Remove the redundant repeated assertion block: there are
two consecutive retryInSecs { assert batchCmds.size() == 2 } checks; keep a
single retryInSecs(...) { assert batchCmds.size() == 2 } and delete the
duplicate to avoid unnecessary repetition (look for the retryInSecs closure
referencing batchCmds.size() in VerifyPrepareDhcpWhenReconnectHostCase).
core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java (1)

70-91: Redundant synchronization in SignatureQueue

pendingList is wrapped with Collections.synchronizedList, yet SignatureQueue's methods (takeAll, add, isEmpty) are all marked synchronized as well. This double synchronization does not affect correctness, but it adds unnecessary overhead.

Pick one: either drop Collections.synchronizedList, or drop the synchronized keyword on the methods.

♻️ Suggested removal of Collections.synchronizedList
 private class SignatureQueue {
     final String syncSignature;
-    List<PendingRequest> pendingList = Collections.synchronizedList(new ArrayList<>());
+    List<PendingRequest> pendingList = new ArrayList<>();

     SignatureQueue(String syncSignature) {
         this.syncSignature = syncSignature;
     }

     synchronized List<PendingRequest> takeAll() {
         List<PendingRequest> toProcess = pendingList;
-        pendingList = Collections.synchronizedList(new ArrayList<>());
+        pendingList = new ArrayList<>();
         return toProcess;
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java` around
lines 70 - 91, SignatureQueue currently double-synchronizes pendingList by
wrapping it with Collections.synchronizedList while also synchronizing its
methods (takeAll, add, isEmpty); remove the redundant wrapper by initializing
pendingList as a plain ArrayList (e.g., new ArrayList<>()) and keep the
method-level synchronized declarations on takeAll, add and isEmpty so the queue
synchronization is handled only by the instance monitor; ensure takeAll still
swaps pendingList with a new ArrayList when returning the old list.
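The suggested single-lock swap pattern can be shown in isolation; SwapQueue below is a hypothetical, simplified stand-in for the real SignatureQueue:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the single-lock swap pattern suggested above: the instance
// monitor guards pendingList, so no Collections.synchronizedList wrapper
// is needed. Simplified stand-in for the real SignatureQueue.
public class SwapQueue {
    private List<String> pendingList = new ArrayList<>();

    public synchronized void add(String item) {
        pendingList.add(item);
    }

    public synchronized boolean isEmpty() {
        return pendingList.isEmpty();
    }

    // Swap out the whole list under the lock; the caller now owns the
    // returned list exclusively and can iterate it without further locking.
    public synchronized List<String> takeAll() {
        List<String> toProcess = pendingList;
        pendingList = new ArrayList<>();
        return toProcess;
    }

    static int demo() {
        SwapQueue q = new SwapQueue();
        q.add("a");
        q.add("b");
        List<String> taken = q.takeAll();
        // after takeAll the queue is empty and the caller holds both items
        return q.isEmpty() ? taken.size() : -1;
    }
}
```

Because every access goes through a synchronized method on the same monitor, the list itself never needs its own internal locking.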

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: http://open.zstack.ai:20001/code-reviews/zstack-cloud.yaml (via .coderabbit.yaml)

Review profile: CHILL

Plan: Pro

Run ID: df7e394d-8861-4473-bdb0-825595098a71

📥 Commits

Reviewing files that changed from the base of the PR and between 3a6ad0f and 48e190a.

⛔ Files ignored due to path filters (4)
  • .omc/state/hud-state.json is excluded by !**/*.json
  • .omc/state/last-tool-error.json is excluded by !**/*.json
  • test/.omc/state/hud-state.json is excluded by !**/*.json
  • testlib/pom.xml is excluded by !**/*.xml
📒 Files selected for processing (8)
  • compute/src/main/java/org/zstack/compute/zone/AbstractZone.java
  • core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java
  • core/src/main/java/org/zstack/core/thread/CoalesceQueue.java
  • core/src/main/java/org/zstack/core/thread/ReturnValueCoalesceQueue.java
  • plugin/flatNetworkProvider/src/main/java/org/zstack/network/service/flat/FlatDhcpBackend.java
  • test/src/test/groovy/org/zstack/test/integration/core/taskqueue/CoalesceQueueCase.groovy
  • test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/VerifyPrepareDhcpWhenReconnectHostCase.groovy
  • testlib/src/main/java/org/zstack/testlib/FailCoalesceQueue.java
💤 Files with no reviewable changes (1)
  • compute/src/main/java/org/zstack/compute/zone/AbstractZone.java

Comment on lines +126 to +136
private static class DhcpApplyRequest {
    final String hostUuid;
    final List<DhcpInfo> dhcpInfos;
    final boolean rebuild;

    DhcpApplyRequest(String hostUuid, List<DhcpInfo> dhcpInfos, boolean rebuild) {
        this.hostUuid = hostUuid;
        this.dhcpInfos = dhcpInfos;
        this.rebuild = rebuild;
    }
}

⚠️ Potential issue | 🟡 Minor

DhcpApplyRequest should defensively copy dhcpInfos

DhcpApplyRequest holds a direct reference to the dhcpInfos list passed in. If the caller mutates the list after submission, the batch execution may see inconsistent data. Make a defensive copy in the constructor.

🛡️ Suggested fix
 private static class DhcpApplyRequest {
     final String hostUuid;
     final List<DhcpInfo> dhcpInfos;
     final boolean rebuild;

     DhcpApplyRequest(String hostUuid, List<DhcpInfo> dhcpInfos, boolean rebuild) {
         this.hostUuid = hostUuid;
-        this.dhcpInfos = dhcpInfos;
+        this.dhcpInfos = new ArrayList<>(dhcpInfos);
         this.rebuild = rebuild;
     }
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@plugin/flatNetworkProvider/src/main/java/org/zstack/network/service/flat/FlatDhcpBackend.java`
around lines 126 - 136, DhcpApplyRequest currently stores the incoming dhcpInfos
reference directly which can be mutated by callers after construction; modify
the DhcpApplyRequest constructor to defensively copy the list (e.g., create a
new ArrayList from the passed dhcpInfos and assign that to this.dhcpInfos) and
consider wrapping it with Collections.unmodifiableList if immutability is
desired so the internal state of the DhcpApplyRequest (class DhcpApplyRequest,
constructor DhcpApplyRequest(String hostUuid, List<DhcpInfo> dhcpInfos, boolean
rebuild)) cannot be changed by external code after creation.
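The defensive-copy idea is small enough to demonstrate end to end; DefensiveCopyDemo below is a hypothetical stand-in with the element type reduced to String:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Defensive-copy pattern from the review comment, on a simplified
// stand-in for DhcpApplyRequest (element type reduced to String).
public class DefensiveCopyDemo {
    static class Request {
        final List<String> infos;

        Request(List<String> infos) {
            // Copy so later mutation of the caller's list cannot change this
            // request; unmodifiableList additionally blocks internal mutation.
            this.infos = Collections.unmodifiableList(new ArrayList<>(infos));
        }
    }

    static int demo() {
        List<String> caller = new ArrayList<>(List.of("ip-1"));
        Request r = new Request(caller);
        caller.add("ip-2");     // caller mutates the list after construction
        return r.infos.size();  // the request's copy is unaffected
    }
}
```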

Comment on lines +343 to +431
void testSequentialBatches() {
    def firstBatchStart = new CountDownLatch(1)
    def firstBatchContinue = new CountDownLatch(1)
    def secondBatchStart = new CountDownLatch(1)
    def secondBatchContinue = new CountDownLatch(1)
    def allComplete = new CountDownLatch(6)
    def batches = Collections.synchronizedList(new ArrayList<List<Integer>>())

    def queue = new CoalesceQueue<Integer>() {
        @Override
        protected String getName() {
            return "test-sequential"
        }

        @Override
        protected void executeBatch(List<Integer> items, Completion completion) {
            batches.add(new ArrayList<>(items))

            if (batches.size() == 1) {
                firstBatchStart.countDown()
                try {
                    firstBatchContinue.await(5, TimeUnit.SECONDS)
                } catch (InterruptedException ignored) {
                }
            } else if (batches.size() == 2) {
                secondBatchStart.countDown()
                try {
                    secondBatchContinue.await(5, TimeUnit.SECONDS)
                } catch (InterruptedException ignored) {
                }
            }

            completion.success()
        }
    }

    def signature = "host-seq"
    queue.submit(signature, 0, new Completion(null) {
        @Override
        void success() {
            allComplete.countDown()
        }

        @Override
        void fail(ErrorCode errorCode) {
            allComplete.countDown()
        }
    })

    assert firstBatchStart.await(5, TimeUnit.SECONDS)

    (1..<4).each { idx ->
        queue.submit(signature, idx, new Completion(null) {
            @Override
            void success() {
                allComplete.countDown()
            }

            @Override
            void fail(ErrorCode errorCode) {
                allComplete.countDown()
            }
        })
    }

    assert secondBatchStart.await(5, TimeUnit.SECONDS)

    (4..<6).each { idx ->
        queue.submit(signature, idx, new Completion(null) {
            @Override
            void success() {
                allComplete.countDown()
            }

            @Override
            void fail(ErrorCode errorCode) {
                allComplete.countDown()
            }
        })
    }

    firstBatchContinue.countDown()
    secondBatchContinue.countDown()
    assert allComplete.await(10, TimeUnit.SECONDS)
    assert batches.size() == 3
    assert batches.get(0) == [0]
    assert batches.get(1).containsAll([1, 2, 3])
    assert batches.get(2).containsAll([4, 5])
}

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find the full path of the test class and related configuration
fd -t f "CoalesceQueueCase.groovy" --exec head -50 {}

# Check the ChainTask syncLevel implementation
ast-grep --pattern 'protected int getSyncLevel() {
  $$$
}'

Repository: MatheMatrix/zstack

Length of output: 1855


🏁 Script executed:

# Inspect the CoalesceQueue implementation
fd -t f "CoalesceQueue.java" --exec cat {} \;

# Check whether the test file contains non-ASCII (Chinese) text
grep -n "[^\x00-\x7F]" test/src/test/groovy/org/zstack/test/integration/core/taskqueue/CoalesceQueueCase.groovy

Repository: MatheMatrix/zstack

Length of output: 28233


🏁 Script executed:

# Double-check the timing by viewing the exact test flow around line 408
sed -n '392,425p' test/src/test/groovy/org/zstack/test/integration/core/taskqueue/CoalesceQueueCase.groovy | cat -n

Repository: MatheMatrix/zstack

Length of output: 1255


testSequentialBatches has a critical ordering deadlock that makes the test time out at line 408

The assert secondBatchStart.await(5, TimeUnit.SECONDS) at line 408 waits indefinitely, because:

  1. Batch 1 blocks inside executeBatch at firstBatchContinue.await() (line 364)
  2. ChainTask uses syncLevel=1, so tasks for the same signature run serially
  3. Batch 2's executeBatch cannot start because batch 1's ChainTask has not finished (chain.next() has not been called)
  4. chain.next() is only called in handleSuccess, which fires only after executeBatch returns
  5. But executeBatch is waiting for firstBatchContinue.countDown() at line 424

Line 408 executes before line 424, so the assertion times out after 5 seconds. Reorder the test to release firstBatchContinue before line 408, or redesign the test to support truly parallel batch execution.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@test/src/test/groovy/org/zstack/test/integration/core/taskqueue/CoalesceQueueCase.groovy`
around lines 343 - 431, The test deadlocks because the first batch blocks in
executeBatch (firstBatchContinue.await()) preventing the second batch from
starting; fix by releasing firstBatchContinue earlier so the first ChainTask can
complete and allow the second batch to begin: after submitting the (1..<4) items
to queue.submit(signature, idx, ...) call firstBatchContinue.countDown() before
asserting secondBatchStart.await(5, TimeUnit.SECONDS); keep the rest of the
synchronization (secondBatchContinue, allComplete, and assertions on batches)
unchanged.
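The ordering rule in the fix (release the first batch's latch before awaiting the second) can be reproduced in a minimal two-thread sketch; LatchOrderDemo and its single worker thread are hypothetical stand-ins for the serialized ChainTask behavior:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Minimal reproduction of the ordering rule in the fix: a serialized
// worker cannot start batch 2 until batch 1's latch is released, so the
// release must happen BEFORE awaiting batch 2. Simplified stand-in for
// the ChainTask/syncLevel=1 behavior described in the review.
public class LatchOrderDemo {
    static boolean demo() {
        CountDownLatch firstBatchContinue = new CountDownLatch(1);
        CountDownLatch secondBatchStart = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            try {
                firstBatchContinue.await();   // batch 1 blocked here
                secondBatchStart.countDown(); // batch 2 can start only now
            } catch (InterruptedException ignored) {
            }
        });
        worker.start();

        try {
            // Correct order: release batch 1 first, then await batch 2.
            // Awaiting first and releasing later would time out, which is
            // exactly the deadlock the review describes.
            firstBatchContinue.countDown();
            return secondBatchStart.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            return false;
        }
    }
}
```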

@MatheMatrix force-pushed the sync/jin.ma/fix/ZSTAC-83039 branch from 48e190a to 7078a37 on March 10, 2026 07:01
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (3)
test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/VerifyPrepareDhcpWhenReconnectHostCase.groovy (3)

245-256: The blocking wait in the simulator hook needs timeout handling

Line 253 blocks inside the simulator callback with releaseFirstBatch.await(10, TimeUnit.SECONDS). While this is a valid way to synchronize the test, if releaseFirstBatch.countDown() is never called it introduces a 10-second timeout delay. Confirm that every code path releases the latch correctly, or add a comment documenting this design.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/VerifyPrepareDhcpWhenReconnectHostCase.groovy`
around lines 245 - 256, The simulator callback registered via
env.afterSimulator(FlatDhcpBackend.BATCH_APPLY_DHCP_PATH) currently blocks on
releaseFirstBatch.await(10, TimeUnit.SECONDS) which can introduce a hard 10s
delay if releaseFirstBatch.countDown() is never invoked; change the callback to
handle timeout safely by replacing the blind await with a try/catch that checks
the boolean result of await and logs/continues on timeout (or uses a shorter
timeout), and ensure every test path that uses
firstBatchArrived/releaseFirstBatch explicitly calls
releaseFirstBatch.countDown() in success and teardown paths (or moves the latch
release into a finally) so releaseFirstBatch cannot be left unreleased.

287-292: Duplicate retryInSecs assertions

These two retryInSecs blocks check the same assertion (batchCmds.size() == 2): the first waits up to 5 seconds, the second another 2, which looks redundant. If this is meant to ensure stability, merge them into a single assertion with a longer timeout, or add a comment explaining the purpose of the two checks.

♻️ Suggested simplification
-        retryInSecs(5) {
-            assert batchCmds.size() == 2
-        }
-        retryInSecs(2) {
+        // Wait for both batches to complete
+        retryInSecs(5) {
             assert batchCmds.size() == 2
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/VerifyPrepareDhcpWhenReconnectHostCase.groovy`
around lines 287 - 292, The two retryInSecs blocks both assert batchCmds.size()
== 2 and are redundant; either remove the second retryInSecs block or merge them
into a single retryInSecs with a longer timeout (e.g., retryInSecs(7) { assert
batchCmds.size() == 2 }) in the VerifyPrepareDhcpWhenReconnectHostCase test, or
if both checks are intentional add a clarifying comment above the two
retryInSecs explaining why two sequential waits are required; update the block
containing retryInSecs and the assertion on batchCmds.size() accordingly.

279-282: Verifying via an internal API may be brittle

thdf.getChainTaskInfo() is an internal API; if the ThreadFacade implementation changes, this assertion may break. For testing the coalesce queue's internal behavior, however, this is an acceptable trade-off.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/VerifyPrepareDhcpWhenReconnectHostCase.groovy`
around lines 279 - 282, The test currently depends on the internal
ThreadFacade.getChainTaskInfo API (thdf.getChainTaskInfo(...)) which is fragile;
either replace the assertion with a public API that exposes the coalesce queue
size or, if no suitable public API exists, add a short justification comment in
VerifyPrepareDhcpWhenReconnectHostCase next to the thdf/getChainTaskInfo usage
explaining why the internal API is used and that this is an acceptable trade-off
for verifying the merge queue behavior; ensure the assertion still checks the
pendingTask.size() == 3 and keep the retryInSecs wrapper.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: http://open.zstack.ai:20001/code-reviews/zstack-cloud.yaml (via .coderabbit.yaml)

Review profile: CHILL

Plan: Pro

Run ID: 19222c43-1a30-4509-a128-a894246554ff

📥 Commits

Reviewing files that changed from the base of the PR and between 48e190a and 7078a37.

⛔ Files ignored due to path filters (1)
  • testlib/pom.xml is excluded by !**/*.xml
📒 Files selected for processing (8)
  • compute/src/main/java/org/zstack/compute/zone/AbstractZone.java
  • core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java
  • core/src/main/java/org/zstack/core/thread/CoalesceQueue.java
  • core/src/main/java/org/zstack/core/thread/ReturnValueCoalesceQueue.java
  • plugin/flatNetworkProvider/src/main/java/org/zstack/network/service/flat/FlatDhcpBackend.java
  • test/src/test/groovy/org/zstack/test/integration/core/taskqueue/CoalesceQueueCase.groovy
  • test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/VerifyPrepareDhcpWhenReconnectHostCase.groovy
  • testlib/src/main/java/org/zstack/testlib/FailCoalesceQueue.java
💤 Files with no reviewable changes (1)
  • compute/src/main/java/org/zstack/compute/zone/AbstractZone.java
🚧 Files skipped from review as they are similar to previous changes (2)
  • testlib/src/main/java/org/zstack/testlib/FailCoalesceQueue.java
  • core/src/main/java/org/zstack/core/thread/ReturnValueCoalesceQueue.java

def requestCount = 5
def completionLatch = new CountDownLatch(requestCount)
def failureCount = new AtomicInteger(0)
def testError = org.zstack.core.Platform.operr("test error")

⚠️ Potential issue | 🟡 Minor

Remove the unused variable testError

The variable is defined but never used; it is dead code.

🧹 Suggested fix
     def requestCount = 5
     def completionLatch = new CountDownLatch(requestCount)
     def failureCount = new AtomicInteger(0)
-    def testError = org.zstack.core.Platform.operr("test error")
     def completedTokens = Collections.synchronizedSet(new LinkedHashSet<String>())
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def testError = org.zstack.core.Platform.operr("test error")
def requestCount = 5
def completionLatch = new CountDownLatch(requestCount)
def failureCount = new AtomicInteger(0)
def completedTokens = Collections.synchronizedSet(new LinkedHashSet<String>())
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@test/src/test/groovy/org/zstack/test/integration/core/taskqueue/CoalesceQueueCase.groovy`
at line 198, Remove the dead local variable declaration "testError" in
CoalesceQueueCase (the unused def testError =
org.zstack.core.Platform.operr("test error")); either delete that line or
replace it with actual usage if an error object was intended to be asserted/used
in the test, ensuring no unused local remains.

@MatheMatrix force-pushed the sync/jin.ma/fix/ZSTAC-83039 branch from 7078a37 to 2672df6 on March 10, 2026 07:05
@coderabbitai coderabbitai bot left a comment

♻️ Duplicate comments (1)
test/src/test/groovy/org/zstack/test/integration/core/taskqueue/CoalesceQueueCase.groovy (1)

343-431: ⚠️ Potential issue | 🔴 Critical

testSequentialBatches has an ordering deadlock

The test logic deadlocks:

  1. Batch 1 blocks at firstBatchContinue.await() (line 364)
  2. ChainTask uses syncLevel=1, so tasks for the same signature run serially
  3. Batch 2's executeBatch cannot start because batch 1's chain.next() has not been called
  4. secondBatchStart.await() at line 408 times out after 5 seconds

Release firstBatchContinue before the assertion at line 408.

🔧 Suggested fix
     (1..<4).each { idx ->
         queue.submit(signature, idx, new Completion(null) {
             // ...
         })
     }

+    firstBatchContinue.countDown()
     assert secondBatchStart.await(5, TimeUnit.SECONDS)

     (4..<6).each { idx ->
         // ...
     }

-    firstBatchContinue.countDown()
     secondBatchContinue.countDown()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@test/src/test/groovy/org/zstack/test/integration/core/taskqueue/CoalesceQueueCase.groovy`
around lines 343 - 431, The testSequentialBatches test deadlocks because the
first batch stays blocked on firstBatchContinue.await(), preventing the chain
from advancing to the second batch; to fix, release firstBatchContinue before
waiting for secondBatchStart — i.e., after submitting the (1..<4) tasks to the
CoalesceQueue in testSequentialBatches (refer to the variables
firstBatchContinue, secondBatchStart and method executeBatch in the anonymous
CoalesceQueue), call firstBatchContinue.countDown() so the first batch can
complete and allow the second batch’s executeBatch to start, then assert
secondBatchStart.await(...).
🧹 Nitpick comments (1)
core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java (1)

70-91: A thread-safety consideration in SignatureQueue.takeAll()

The takeAll() implementation:

synchronized List<PendingRequest> takeAll() {
    List<PendingRequest> toProcess = pendingList;
    pendingList = Collections.synchronizedList(new ArrayList<>());
    return toProcess;
}

Although the method itself is synchronized, the returned toProcess list (the old pendingList) is no longer protected after it is returned. Since ChainTask with syncLevel=1 guarantees serial execution per signature, and the caller processes the returned list immediately in run(), this is safe in practice.

If stricter immutability guarantees are ever needed, consider returning Collections.unmodifiableList(toProcess) or new ArrayList<>(toProcess).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java` around
lines 70 - 91, SignatureQueue.takeAll currently swaps out the shared pendingList
and returns the old list instance which remains mutable and accessible outside
the synchronized block; to make this thread-safe/immutable, replace the returned
mutable list with an immutable or defensive copy: in SignatureQueue.takeAll()
(class SignatureQueue, method takeAll) after swapping pendingList, return either
Collections.unmodifiableList(toProcess) or new ArrayList<>(toProcess) so callers
cannot mutate the internal state after it is released.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: http://open.zstack.ai:20001/code-reviews/zstack-cloud.yaml (via .coderabbit.yaml)

Review profile: CHILL

Plan: Pro

Run ID: af4192c3-1bbb-43c7-b321-a89c50ca3fcb

📥 Commits

Reviewing files that changed from the base of the PR and between 7078a37 and 2672df6.

⛔ Files ignored due to path filters (1)
  • testlib/pom.xml is excluded by !**/*.xml
📒 Files selected for processing (8)
  • compute/src/main/java/org/zstack/compute/zone/AbstractZone.java
  • core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java
  • core/src/main/java/org/zstack/core/thread/CoalesceQueue.java
  • core/src/main/java/org/zstack/core/thread/ReturnValueCoalesceQueue.java
  • plugin/flatNetworkProvider/src/main/java/org/zstack/network/service/flat/FlatDhcpBackend.java
  • test/src/test/groovy/org/zstack/test/integration/core/taskqueue/CoalesceQueueCase.groovy
  • test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/VerifyPrepareDhcpWhenReconnectHostCase.groovy
  • testlib/src/main/java/org/zstack/testlib/FailCoalesceQueue.java
💤 Files with no reviewable changes (1)
  • compute/src/main/java/org/zstack/compute/zone/AbstractZone.java

Resolves: ZSTAC-83039

Change-Id: I737574686f6e6b69726e79736c6279616e66706b
@MatheMatrix force-pushed the sync/jin.ma/fix/ZSTAC-83039 branch from 2672df6 to ddc02b0 on March 10, 2026 at 07:13
@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (2)
core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java (1)

70-91: Redundant synchronization in SignatureQueue

pendingList is already wrapped with Collections.synchronizedList, yet the add(), takeAll(), and isEmpty() methods are also declared synchronized. This double synchronization is redundant.

Pick one of the two synchronization mechanisms:

  1. Keep the synchronized methods and switch to a plain ArrayList
  2. Keep the synchronizedList wrapper and drop the synchronized keyword from the methods

Since takeAll() must swap the list reference atomically, keeping the synchronized methods with a plain ArrayList is recommended.

♻️ Suggested change
 private class SignatureQueue {
     final String syncSignature;
-    List<PendingRequest> pendingList = Collections.synchronizedList(new ArrayList<>());
+    List<PendingRequest> pendingList = new ArrayList<>();

     SignatureQueue(String syncSignature) {
         this.syncSignature = syncSignature;
     }

     synchronized List<PendingRequest> takeAll() {
         List<PendingRequest> toProcess = pendingList;
-        pendingList = Collections.synchronizedList(new ArrayList<>());
+        pendingList = new ArrayList<>();
         return toProcess;
     }

     synchronized void add(PendingRequest request) {
         pendingList.add(request);
     }

     synchronized boolean isEmpty() {
         return pendingList.isEmpty();
     }
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java` around
lines 70 - 91, SignatureQueue currently double-synchronizes pendingList (wrapped
with Collections.synchronizedList) while also marking its methods synchronized;
remove the Collections.synchronizedList wrapper and make pendingList a plain
ArrayList so that synchronized methods (takeAll(), add(), isEmpty()) provide the
necessary atomicity (especially for the swap in takeAll()); keep the
synchronized keywords on those methods and only change the pendingList
initialization to new ArrayList<>() within the SignatureQueue class.
test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/VerifyPrepareDhcpWhenReconnectHostCase.groovy (1)

287-292: Redundant retryInSecs assertion

Two consecutive retryInSecs blocks check the same condition (batchCmds.size() == 2); the second check is redundant.

♻️ Suggested change
         retryInSecs(5) {
             assert batchCmds.size() == 2
         }
-        retryInSecs(2) {
-            assert batchCmds.size() == 2
-        }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/VerifyPrepareDhcpWhenReconnectHostCase.groovy`
around lines 287 - 292, The two consecutive retryInSecs blocks both assert the
same condition (batchCmds.size() == 2) and the second is redundant; remove the
second retryInSecs { assert batchCmds.size() == 2 } block in
VerifyPrepareDhcpWhenReconnectHostCase.groovy (leave the first retryInSecs(5)
assertion) or, if a longer wait is required, replace the redundant block by
increasing the timeout on the single retryInSecs call (referencing retryInSecs
and the batchCmds.size() assertion).

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: http://open.zstack.ai:20001/code-reviews/zstack-cloud.yaml (via .coderabbit.yaml)

Review profile: CHILL

Plan: Pro

Run ID: a9a35dc1-f49c-49a8-9eea-fac0951ac082

📥 Commits

Reviewing files that changed from the base of the PR and between 2672df6 and ddc02b0.

⛔ Files ignored due to path filters (1)
  • testlib/pom.xml is excluded by !**/*.xml
📒 Files selected for processing (8)
  • compute/src/main/java/org/zstack/compute/zone/AbstractZone.java
  • core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java
  • core/src/main/java/org/zstack/core/thread/CoalesceQueue.java
  • core/src/main/java/org/zstack/core/thread/ReturnValueCoalesceQueue.java
  • plugin/flatNetworkProvider/src/main/java/org/zstack/network/service/flat/FlatDhcpBackend.java
  • test/src/test/groovy/org/zstack/test/integration/core/taskqueue/CoalesceQueueCase.groovy
  • test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/VerifyPrepareDhcpWhenReconnectHostCase.groovy
  • testlib/src/main/java/org/zstack/testlib/FailCoalesceQueue.java
💤 Files with no reviewable changes (1)
  • compute/src/main/java/org/zstack/compute/zone/AbstractZone.java
🚧 Files skipped from review as they are similar to previous changes (1)
  • core/src/main/java/org/zstack/core/thread/CoalesceQueue.java

Comment on lines +176 to +183
private void cleanup(String syncSignature) {
    signatureQueues.computeIfPresent(syncSignature, (k, queue) -> {
        if (queue.isEmpty()) {
            return null;
        }
        return queue;
    });
}

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check whether a similar queue-cleanup pattern exists anywhere else
rg -n "computeIfPresent.*isEmpty" --type java

Repository: MatheMatrix/zstack

Length of output: 44


🏁 Script executed:

# First, let's find and read the AbstractCoalesceQueue.java file
fd "AbstractCoalesceQueue.java" -type f

Repository: MatheMatrix/zstack

Length of output: 232


🏁 Script executed:

# Get an overview of the file structure and key methods
wc -l core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java

Repository: MatheMatrix/zstack

Length of output: 134


🏁 Script executed:

# Read the file to understand the class structure, focusing on cleanup, doSubmit, and synchronization
cat -n core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java

Repository: MatheMatrix/zstack

Length of output: 8011


🏁 Script executed:

# Let's check how ChainTask and queue lifecycle work - examine the queue usage more carefully
rg -n "queue\." core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java -A 2 -B 2

Repository: MatheMatrix/zstack

Length of output: 757


🏁 Script executed:

# Check if there are tests or other usages of AbstractCoalesceQueue that might show the issue
fd "Test.*CoalesceQueue\|.*CoalesceQueue.*Test" -type f

Repository: MatheMatrix/zstack

Length of output: 232


🏁 Script executed:

# Let's trace through the execution flow to understand when cleanup is called and how queue references work
rg -n "cleanup" core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java -B 3 -A 1

Repository: MatheMatrix/zstack

Length of output: 614


🏁 Script executed:

# Check if there are implementations of AbstractCoalesceQueue to see real usage patterns
fd -e java -type f | head -20 | xargs grep -l "extends AbstractCoalesceQueue" 2>/dev/null | head -5

Repository: MatheMatrix/zstack

Length of output: 232


🏁 Script executed:

# Let's understand the synchronization flow better - check if cleanup is called before or after queue.takeAll()
cat -n core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java | sed -n '136,174p'

Repository: MatheMatrix/zstack

Length of output: 1794


🏁 Script executed:

# Search for any synchronization blocks or additional locking mechanisms in the class
rg -n "synchronized|lock" core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java

Repository: MatheMatrix/zstack

Length of output: 394


🏁 Script executed:

# Let's verify the exact timing by checking when cleanup is called relative to queue processing
cat -n core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java | sed -n '102,122p'

Repository: MatheMatrix/zstack

Length of output: 1171


🏁 Script executed:

# Let me trace through the actual scenario to understand request loss
# Check if there are any wait conditions or other protections I'm missing
rg -n "ChainTask\|getSyncSignature\|chainSubmit" core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java

Repository: MatheMatrix/zstack

Length of output: 44


🏁 Script executed:

# Check the ChainTask interface to understand synchronization guarantees
fd "ChainTask.java" -type f

Repository: MatheMatrix/zstack

Length of output: 232


🏁 Script executed:

# Let's check if there are other implementations or tests that show if this is an actual problem
find core -name "*.java" -type f -exec grep -l "extends AbstractCoalesceQueue" {} \; 2>/dev/null | head -3

Repository: MatheMatrix/zstack

Length of output: 194


🏁 Script executed:

# Look for any potential issues in how items could be lost or duplicated
rg -n "queue\|cleanup" core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java | head -20

Repository: MatheMatrix/zstack

Length of output: 44


🏁 Script executed:

# Let's examine one implementation to see how this is used in practice
cat core/src/main/java/org/zstack/core/thread/CoalesceQueue.java | head -50

Repository: MatheMatrix/zstack

Length of output: 1729


🏁 Script executed:

# Let's understand better - when new items arrive during/after batch processing
# Check the full lifecycle in one implementation
cat core/src/main/java/org/zstack/core/thread/CoalesceQueue.java

Repository: MatheMatrix/zstack

Length of output: 1999


Race condition in cleanup() can drop requests

In the computeIfPresent lambda, there is a race window between queue.isEmpty() and returning null. Another thread may obtain the same queue object inside doSubmit and call queue.add(); cleanup() then removes that queue from signatureQueues, and the newly added request is lost permanently.

Concrete scenario:

  1. ChainTask A runs queue.takeAll(), draining the queue and processing its requests
  2. ChainTask A calls cleanup(syncSignature); the lambda's queue.isEmpty() returns true
  3. Meanwhile, thread B calls doSubmit (same syncSignature, request), obtains the existing queue object via computeIfAbsent(), and calls queue.add()
  4. The cleanup() lambda returns null, removing the queue from signatureQueues
  5. The request now sits in a queue that signatureQueues no longer references, so it can never be processed

Suggested fix: provide an atomic "takeAllAndCheckEmpty" operation on SignatureQueue so that the isEmpty() check and the queue removal happen inside the same synchronized section, or add explicit synchronization in cleanup() to coordinate the computeIfPresent execution with concurrent doSubmit() calls.
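The race described above can be sketched with a "refuse add after close" variant of the fix. This is a minimal, self-contained illustration; the class and method names (CoalesceSketch, closeIfEmpty, and so on) are hypothetical and are not part of the actual ZStack code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

class CoalesceSketch {
    static class SignatureQueue {
        private List<Integer> pending = new ArrayList<>();
        private boolean closed = false;

        // Refuses the request if this queue was already closed by cleanup.
        synchronized boolean add(int req) {
            if (closed) return false;
            pending.add(req);
            return true;
        }

        // Swaps out the pending list atomically, as in the real takeAll().
        synchronized List<Integer> takeAll() {
            List<Integer> out = pending;
            pending = new ArrayList<>();
            return out;
        }

        // Atomic check-and-close: only succeeds while nothing is pending.
        synchronized boolean closeIfEmpty() {
            if (pending.isEmpty()) {
                closed = true;
                return true;
            }
            return false;
        }
    }

    final ConcurrentHashMap<String, SignatureQueue> queues = new ConcurrentHashMap<>();

    void submit(String sig, int req) {
        while (true) {
            SignatureQueue q = queues.computeIfAbsent(sig, k -> new SignatureQueue());
            if (q.add(req)) return; // raced with cleanup: retry with a fresh queue
        }
    }

    void cleanup(String sig) {
        // The empty-check and the removal decision are made while holding
        // the queue's monitor inside closeIfEmpty(), closing the race window.
        queues.computeIfPresent(sig, (k, q) -> q.closeIfEmpty() ? null : q);
    }
}
```

The key design point is that the closed flag is flipped under the same monitor that guards pending, so a submitter that raced with cleanup observes the flag, gets false from add(), and retries against a fresh map entry instead of writing into a dropped queue.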

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java` around
lines 176 - 183, The cleanup method has a race where
signatureQueues.computeIfPresent can remove a SignatureQueue after another
thread's doSubmit has added to it; modify cleanup(String syncSignature) to
perform the empty-check atomically with removal by either adding a new atomic
operation on SignatureQueue (e.g., implement a takeAllAndCheckEmpty() or
isEmptyAfterTake() that does takeAll and returns whether the queue is empty
under the same lock) and call that from cleanup, or synchronize cleanup and
doSubmit on the same SignatureQueue instance so computeIfPresent only returns
null when the queue truly remained empty during the guarded operation; update
cleanup to use the new SignatureQueue atomic method (or explicit
synchronization) instead of calling queue.isEmpty() outside the queue's lock to
prevent dropping newly submitted requests.

Comment on lines +279 to +282
ThreadFacade thdf = bean(ThreadFacade.class)
retryInSecs {
    assert thdf.getChainTaskInfo(String.format("coalesce-queue-flat-dhcp-apply-%s", vms[0].hostUuid)).pendingTask.size() == 3
}

⚠️ Potential issue | 🟡 Minor

Test depends on internal implementation details

The test uses the signature string format coalesce-queue-flat-dhcp-apply-%s directly to fetch chain-task info. If the signature format in AbstractCoalesceQueue ever changes, this test will break.

Suggestions:

  1. Expose a constant or helper method in AbstractCoalesceQueue for building the signature
  2. Or verify the batching behavior some other way, rather than inspecting the internal queue state directly
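Suggestion 1 could look like the following sketch. The class and method names are illustrative only, and the format string is copied from the test as it currently stands:

```java
// Hypothetical helper: a single shared definition of the coalesce-queue
// signature, so production code and tests cannot drift apart.
final class FlatDhcpCoalesceSignature {
    static final String FORMAT = "coalesce-queue-flat-dhcp-apply-%s";

    static String of(String hostUuid) {
        return String.format(FORMAT, hostUuid);
    }
}
```

The test would then call FlatDhcpCoalesceSignature.of(vms[0].hostUuid) instead of hardcoding the format, so a future rename touches only one definition.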
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/VerifyPrepareDhcpWhenReconnectHostCase.groovy`
around lines 279 - 282, The test is depending on an internal signature string
"coalesce-queue-flat-dhcp-apply-%s" when calling ThreadFacade.getChainTaskInfo,
which couples the spec to AbstractCoalesceQueue implementation; change the test
to avoid hardcoding that format by either (A) adding a public constant or helper
method in AbstractCoalesceQueue (e.g., getCoalesceSignature(hostUuid) or
COALESCE_SIGNATURE_FORMAT) and use that from the test, or (B) replace the direct
queue-inspection assertion on ThreadFacade.getChainTaskInfo(...) with a
behavioral assertion that verifies batching occurred (for example assert the
expected number of apply operations were executed or that DHCP apply was invoked
N times for vms[0].hostUuid via the provider/mocked handler), referencing
ThreadFacade, getChainTaskInfo, AbstractCoalesceQueue and vms[0].hostUuid to
locate the code to change.
