Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import org.zstack.header.zone.ZoneStateEvent;

abstract class AbstractZone implements Zone {
private static DatabaseFacade dbf = Platform.getComponentLoader().getComponent(DatabaseFacade.class);
private final static StateMachine<ZoneState, ZoneStateEvent> stateMachine;

static {
Expand Down
189 changes: 189 additions & 0 deletions core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package org.zstack.core.thread;

import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Configurable;
import org.zstack.header.core.AbstractCompletion;
import org.zstack.header.core.Completion;
import org.zstack.header.core.ReturnValueCompletion;
import org.zstack.header.errorcode.ErrorCode;
import org.zstack.utils.Utils;
import org.zstack.utils.logging.CLogger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
* Base implementation for coalesce queues.
*
* @param <T> Request Item Type
* @param <R> Batch Execution Result Type
* @param <V> Single Request Result Type
*/
@Configurable(preConstruction = true, autowire = Autowire.BY_TYPE)
public abstract class AbstractCoalesceQueue<T, R, V> {
private static final CLogger logger = Utils.getLogger(AbstractCoalesceQueue.class);

@Autowired
private ThreadFacade thdf;

private final ConcurrentHashMap<String, SignatureQueue> signatureQueues = new ConcurrentHashMap<>();

protected class PendingRequest {
final T item;
final AbstractCompletion completion;

PendingRequest(T item, AbstractCompletion completion) {
this.item = item;
this.completion = completion;
}

@SuppressWarnings("unchecked")
void notifySuccess(V result) {
if (completion == null) {
return;
}

if (completion instanceof ReturnValueCompletion) {
((ReturnValueCompletion<V>) completion).success(result);
} else if (completion instanceof Completion) {
((Completion) completion).success();
}
}

void notifyFailure(ErrorCode errorCode) {
if (completion == null) {
return;
}

if (completion instanceof ReturnValueCompletion) {
((ReturnValueCompletion<V>) completion).fail(errorCode);
} else if (completion instanceof Completion) {
((Completion) completion).fail(errorCode);
}
}
}

private class SignatureQueue {
final String syncSignature;
List<PendingRequest> pendingList = Collections.synchronizedList(new ArrayList<>());

SignatureQueue(String syncSignature) {
this.syncSignature = syncSignature;
}

synchronized List<PendingRequest> takeAll() {
List<PendingRequest> toProcess = pendingList;
pendingList = Collections.synchronizedList(new ArrayList<>());
return toProcess;
}

synchronized void add(PendingRequest request) {
pendingList.add(request);
}

synchronized boolean isEmpty() {
return pendingList.isEmpty();
}
}

protected abstract String getName();

// Changed to take AbstractCompletion, subclasses cast it to specific type
protected abstract void executeBatch(List<T> items, AbstractCompletion completion);

protected abstract AbstractCompletion createBatchCompletion(String syncSignature, List<PendingRequest> requests, SyncTaskChain chain);

protected abstract V calculateResult(T item, R batchResult);

protected final void handleSuccess(String syncSignature, List<PendingRequest> requests, R batchResult, SyncTaskChain chain) {
for (PendingRequest req : requests) {
try {
V singleResult = calculateResult(req.item, batchResult);
req.notifySuccess(singleResult);
} catch (Throwable t) {
logger.warn(String.format("[%s] failed to calculate result for item %s", getName(), req.item), t);
req.notifyFailure(org.zstack.core.Platform.operr("failed to calculate result: %s", t.getMessage()));
}
}
cleanup(syncSignature);
chain.next();
}

protected final void handleFailure(String syncSignature, List<PendingRequest> requests, ErrorCode errorCode, SyncTaskChain chain) {
for (PendingRequest req : requests) {
req.notifyFailure(errorCode);
}
cleanup(syncSignature);
chain.next();
}

void setThreadFacade(ThreadFacade thdf) {
this.thdf = thdf;
}

protected final void submitRequest(String syncSignature, T item, AbstractCompletion completion) {
doSubmit(syncSignature, new PendingRequest(item, completion));
}

private void doSubmit(String syncSignature, PendingRequest request) {
SignatureQueue queue = signatureQueues.computeIfAbsent(syncSignature, SignatureQueue::new);
queue.add(request);

thdf.chainSubmit(new ChainTask(null) {
@Override
public String getSyncSignature() {
return String.format("coalesce-queue-%s-%s", AbstractCoalesceQueue.this.getName(), syncSignature);
}

@Override
public void run(SyncTaskChain chain) {
List<PendingRequest> requests = queue.takeAll();

if (requests.isEmpty()) {
chain.next();
return;
}

String name = getName();
logger.debug(String.format("[%s] coalescing %d requests for signature[%s]",
name, requests.size(), syncSignature));


// Create the specific completion type (Completion or ReturnValueCompletion)
AbstractCompletion batchCompletion = createBatchCompletion(syncSignature, requests, chain);

// Execute batch with the direct completion object
List<T> items = requests.stream().map(req -> req.item).collect(Collectors.toList());
executeBatch(items, batchCompletion);
}

@Override
public String getName() {
return String.format("%s-coalesced-batch-%s", AbstractCoalesceQueue.this.getName(), syncSignature);
}

@Override
protected int getSyncLevel() {
return 1;
}
});
}

private void cleanup(String syncSignature) {
signatureQueues.computeIfPresent(syncSignature, (k, queue) -> {
if (queue.isEmpty()) {
return null;
}
return queue;
});
}
Comment on lines +176 to +183
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# 检查是否有其他地方也存在类似的队列清理模式
rg -n "computeIfPresent.*isEmpty" --type java

Repository: MatheMatrix/zstack

Length of output: 44


🏁 Script executed:

# First, let's find and read the AbstractCoalesceQueue.java file
fd "AbstractCoalesceQueue.java" -type f

Repository: MatheMatrix/zstack

Length of output: 232


🏁 Script executed:

# Get an overview of the file structure and key methods
wc -l core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java

Repository: MatheMatrix/zstack

Length of output: 134


🏁 Script executed:

# Read the file to understand the class structure, focusing on cleanup, doSubmit, and synchronization
cat -n core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java

Repository: MatheMatrix/zstack

Length of output: 8011


🏁 Script executed:

# Let's check how ChainTask and queue lifecycle work - examine the queue usage more carefully
rg -n "queue\." core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java -A 2 -B 2

Repository: MatheMatrix/zstack

Length of output: 757


🏁 Script executed:

# Check if there are tests or other usages of AbstractCoalesceQueue that might show the issue
fd "Test.*CoalesceQueue\|.*CoalesceQueue.*Test" -type f

Repository: MatheMatrix/zstack

Length of output: 232


🏁 Script executed:

# Let's trace through the execution flow to understand when cleanup is called and how queue references work
rg -n "cleanup" core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java -B 3 -A 1

Repository: MatheMatrix/zstack

Length of output: 614


🏁 Script executed:

# Check if there are implementations of AbstractCoalesceQueue to see real usage patterns
fd -e java -type f | head -20 | xargs grep -l "extends AbstractCoalesceQueue" 2>/dev/null | head -5

Repository: MatheMatrix/zstack

Length of output: 232


🏁 Script executed:

# Let's understand the synchronization flow better - check if cleanup is called before or after queue.takeAll()
cat -n core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java | sed -n '136,174p'

Repository: MatheMatrix/zstack

Length of output: 1794


🏁 Script executed:

# Search for any synchronization blocks or additional locking mechanisms in the class
rg -n "synchronized|lock" core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java

Repository: MatheMatrix/zstack

Length of output: 394


🏁 Script executed:

# Let's verify the exact timing by checking when cleanup is called relative to queue processing
cat -n core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java | sed -n '102,122p'

Repository: MatheMatrix/zstack

Length of output: 1171


🏁 Script executed:

# Let me trace through the actual scenario to understand request loss
# Check if there are any wait conditions or other protections I'm missing
rg -n "ChainTask\|getSyncSignature\|chainSubmit" core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java

Repository: MatheMatrix/zstack

Length of output: 44


🏁 Script executed:

# Check the ChainTask interface to understand synchronization guarantees
fd "ChainTask.java" -type f

Repository: MatheMatrix/zstack

Length of output: 232


🏁 Script executed:

# Let's check if there are other implementations or tests that show if this is an actual problem
find core -name "*.java" -type f -exec grep -l "extends AbstractCoalesceQueue" {} \; 2>/dev/null | head -3

Repository: MatheMatrix/zstack

Length of output: 194


🏁 Script executed:

# Look for any potential issues in how items could be lost or duplicated
rg -n "queue\|cleanup" core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java | head -20

Repository: MatheMatrix/zstack

Length of output: 44


🏁 Script executed:

# Let's examine one implementation to see how this is used in practice
cat core/src/main/java/org/zstack/core/thread/CoalesceQueue.java | head -50

Repository: MatheMatrix/zstack

Length of output: 1729


🏁 Script executed:

# Let's understand better - when new items arrive during/after batch processing
# Check the full lifecycle in one implementation
cat core/src/main/java/org/zstack/core/thread/CoalesceQueue.java

Repository: MatheMatrix/zstack

Length of output: 1999


cleanup() 中存在竞态条件导致请求丢失

computeIfPresent 的 lambda 中,queue.isEmpty() 和返回 null 之间存在竞态窗口。另一个线程可能在 doSubmit 中获取到相同的队列对象并调用 queue.add(),之后该队列被 cleanup() 从 signatureQueues 中移除,导致新添加的请求永久丢失。

具体场景:

  1. ChainTask A 执行 queue.takeAll() 清空队列并处理请求
  2. ChainTask A 调用 cleanup(syncSignature),lambda 执行 queue.isEmpty() 返回 true
  3. 同时,线程 B 调用 doSubmit(同一 syncSignature, request),通过 computeIfAbsent() 获取已存在的队列对象,并执行 queue.add()
  4. cleanup() 的 lambda 返回 null,将队列从 signatureQueues 中移除
  5. 请求被添加到了一个不再被 signatureQueues 引用的队列中,永远无法被处理

建议通过以下方式修复:在 SignatureQueue 中提供原子性的 "takeAllAndCheckEmpty" 操作,确保 isEmpty() 检查和队列移除在同步块内进行;或在 cleanup() 中加入显式的同步机制,协调 computeIfPresent 的执行与并发的 doSubmit() 调用。

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java` around
lines 176 - 183, The cleanup method has a race where
signatureQueues.computeIfPresent can remove a SignatureQueue after another
thread's doSubmit has added to it; modify cleanup(String syncSignature) to
perform the empty-check atomically with removal by either adding a new atomic
operation on SignatureQueue (e.g., implement a takeAllAndCheckEmpty() or
isEmptyAfterTake() that does takeAll and returns whether the queue is empty
under the same lock) and call that from cleanup, or synchronize cleanup and
doSubmit on the same SignatureQueue instance so computeIfPresent only returns
null when the queue truly remained empty during the guarded operation; update
cleanup to use the new SignatureQueue atomic method (or explicit
synchronization) instead of calling queue.isEmpty() outside the queue's lock to
prevent dropping newly submitted requests.


// For testing
int getActiveQueueCount() {
return signatureQueues.size();
}
}
61 changes: 61 additions & 0 deletions core/src/main/java/org/zstack/core/thread/CoalesceQueue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package org.zstack.core.thread;

import org.zstack.header.core.AbstractCompletion;
import org.zstack.header.core.Completion;
import org.zstack.header.errorcode.ErrorCode;

import java.util.List;

/**
* A coalesce queue for requests that do NOT expect a return value.
*
* @param <T> Request Item Type
*/
public abstract class CoalesceQueue<T> extends AbstractCoalesceQueue<T, Void, Void> {

/**
* Submit a request.
*
* @param syncSignature the sync signature; requests with the same signature will be coalesced
* @param item the request item
* @param completion the completion callback
*/
public void submit(String syncSignature, T item, Completion completion) {
submitRequest(syncSignature, item, completion);
}

/**
* Executes the batched requests.
* <p>
* Subclasses must implement this method to process the coalesced items.
*
* @param items the list of coalesced request items
* @param completion the completion callback for the batch execution
*/
protected abstract void executeBatch(List<T> items, Completion completion);

@Override
protected final void executeBatch(List<T> items, AbstractCompletion batchCompletion) {
executeBatch(items, (Completion) batchCompletion);
}

@Override
protected final AbstractCompletion createBatchCompletion(String syncSignature, List<PendingRequest> requests, SyncTaskChain chain) {
return new Completion(chain) {
@Override
public void success() {
handleSuccess(syncSignature, requests, null, chain);
}

@Override
public void fail(ErrorCode errorCode) {
handleFailure(syncSignature, requests, errorCode, chain);
}
};
}

@Override
protected final Void calculateResult(T item, Void batchResult) {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.zstack.core.thread;

import org.zstack.header.core.AbstractCompletion;
import org.zstack.header.core.ReturnValueCompletion;
import org.zstack.header.errorcode.ErrorCode;

import java.util.List;

/**
* A coalesce queue for requests that expect a return value.
*
* @param <T> Request Item Type
* @param <R> Batch Execution Result Type
* @param <V> Single Request Result Type
*/
public abstract class ReturnValueCoalesceQueue<T, R, V> extends AbstractCoalesceQueue<T, R, V> {

public void submit(String syncSignature, T item, ReturnValueCompletion<V> completion) {
submitRequest(syncSignature, item, completion);
}

protected abstract void executeBatch(List<T> items, ReturnValueCompletion<R> completion);

@Override
protected final void executeBatch(List<T> items, AbstractCompletion batchCompletion) {
executeBatch(items, (ReturnValueCompletion<R>) batchCompletion);
}

@Override
protected final AbstractCompletion createBatchCompletion(String syncSignature, List<PendingRequest> requests, SyncTaskChain chain) {
return new ReturnValueCompletion<R>(null) {
@Override
public void success(R batchResult) {
handleSuccess(syncSignature, requests, batchResult, chain);
}

@Override
public void fail(ErrorCode errorCode) {
handleFailure(syncSignature, requests, errorCode, chain);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
import org.zstack.core.db.SQL;
import org.zstack.core.defer.Defer;
import org.zstack.core.defer.Deferred;
import org.zstack.core.thread.SyncTask;
import org.zstack.core.thread.ThreadFacade;
import org.zstack.core.thread.*;
import org.zstack.core.upgrade.GrayVersion;
import org.zstack.core.workflow.SimpleFlowChain;
import org.zstack.header.AbstractService;
Expand Down Expand Up @@ -121,6 +120,54 @@ public class FlatDhcpBackend extends AbstractService implements NetworkServiceDh

private Map<String, L3NetworkGetIpStatisticExtensionPoint> getIpStatisticExts = new HashMap<>();

/**
* Request wrapper for DHCP apply coalescing.
*/
private static class DhcpApplyRequest {
final String hostUuid;
final List<DhcpInfo> dhcpInfos;
final boolean rebuild;

DhcpApplyRequest(String hostUuid, List<DhcpInfo> dhcpInfos, boolean rebuild) {
this.hostUuid = hostUuid;
this.dhcpInfos = dhcpInfos;
this.rebuild = rebuild;
}
}
Comment on lines +126 to +136
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

DhcpApplyRequest 应对 dhcpInfos 进行防御性复制

DhcpApplyRequest 直接持有传入的 dhcpInfos 列表引用。如果调用方在提交后修改了该列表,可能导致批量执行时数据不一致。建议在构造函数中进行防御性复制。

🛡️ 建议的修复
 private static class DhcpApplyRequest {
     final String hostUuid;
     final List<DhcpInfo> dhcpInfos;
     final boolean rebuild;

     DhcpApplyRequest(String hostUuid, List<DhcpInfo> dhcpInfos, boolean rebuild) {
         this.hostUuid = hostUuid;
-        this.dhcpInfos = dhcpInfos;
+        this.dhcpInfos = new ArrayList<>(dhcpInfos);
         this.rebuild = rebuild;
     }
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@plugin/flatNetworkProvider/src/main/java/org/zstack/network/service/flat/FlatDhcpBackend.java`
around lines 126 - 136, DhcpApplyRequest currently stores the incoming dhcpInfos
reference directly which can be mutated by callers after construction; modify
the DhcpApplyRequest constructor to defensively copy the list (e.g., create a
new ArrayList from the passed dhcpInfos and assign that to this.dhcpInfos) and
consider wrapping it with Collections.unmodifiableList if immutability is
desired so the internal state of the DhcpApplyRequest (class DhcpApplyRequest,
constructor DhcpApplyRequest(String hostUuid, List<DhcpInfo> dhcpInfos, boolean
rebuild)) cannot be changed by external code after creation.


private class DhcpApplyQueue extends CoalesceQueue<DhcpApplyRequest> {
@Override
protected String getName() {
return "flat-dhcp-apply";
}

@Override
protected void executeBatch(List<DhcpApplyRequest> requests, Completion completion) {
if (requests.isEmpty()) {
completion.success();
return;
}

// All requests in the same batch have the same hostUuid
String hostUuid = requests.get(0).hostUuid;

// Merge all DhcpInfo from all requests, grouped by L3 network
// TODO: unify DHCP apply logic and switch to merged/batch flow everywhere
boolean anyRebuild = false;
List<DhcpInfo> mergedInfos = new ArrayList<>();
for (DhcpApplyRequest req : requests) {
anyRebuild = anyRebuild || req.rebuild;
mergedInfos.addAll(req.dhcpInfos);
}

logger.debug(String.format("Coalesced %d DHCP apply requests for host[uuid:%s]", requests.size(), hostUuid));

applyDhcpToHosts(mergedInfos, hostUuid, anyRebuild, completion);
}
}

private final DhcpApplyQueue dhcpApplyCoalesceQueue = new DhcpApplyQueue();

public static final String APPLY_DHCP_PATH = "/flatnetworkprovider/dhcp/apply";
public static final String BATCH_APPLY_DHCP_PATH = "/flatnetworkprovider/dhcp/batchApply";
public static final String PREPARE_DHCP_PATH = "/flatnetworkprovider/dhcp/prepare";
Expand Down Expand Up @@ -2074,7 +2121,10 @@ public void applyDhcpService(List<DhcpStruct> dhcpStructList, VmInstanceSpec spe
return;
}

applyDhcpToHosts(toDhcpInfo(dhcpStructList), spec.getDestHost().getUuid(), false, completion);
String hostUuid = spec.getDestHost().getUuid();
DhcpApplyRequest request = new DhcpApplyRequest(hostUuid, toDhcpInfo(dhcpStructList), false);
// Use coalesce queue: requests to the same host will be merged into a single batch
dhcpApplyCoalesceQueue.submit(hostUuid, request, completion);
}

private void releaseDhcpService(List<DhcpInfo> info, final String vmUuid, final String hostUuid, final NoErrorCompletion completion) {
Expand Down
Loading