Skip to content

Commit 48e190a

Browse files
committed
<feature>[thread]: support coalesce queue for batch dhcp
Resolves: TIC-4930 Change-Id: I737574686f6e6b69726e79736c6279616e66706b
1 parent 3a6ad0f commit 48e190a

12 files changed

Lines changed: 1041 additions & 4 deletions

File tree

.omc/state/hud-state.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"timestamp": "2026-03-09T03:35:04.494Z",
3+
"backgroundTasks": [],
4+
"sessionStartTimestamp": "2026-03-06T09:49:28.602Z",
5+
"sessionId": "9c273cff63511584"
6+
}

.omc/state/last-tool-error.json

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
{
2+
"tool_name": "Bash",
3+
"tool_input_preview": "{\"command\":\"git cherry-pick --abort && git cherry-pick 37efe64e1740a1a29f9d0c12aaa34f3bba3871d2 2>&1\",\"description\":\"Abort old cherry-pick and cherry-pick only the CoalesceQueue commit\"}",
4+
"error": "Exit code 1\nAuto-merging plugin/flatNetworkProvider/src/main/java/org/zstack/network/service/flat/FlatDhcpBackend.java\nCONFLICT (content): Merge conflict in plugin/flatNetworkProvider/src/main/java/org/zstack/network/service/flat/FlatDhcpBackend.java\nCONFLICT (file location): test/src/test/groovy/org/zstack/test/integration/core/chaintask/CoalesceQueueCase.groovy added in 37efe64e17 (<feature>[thread]: support coalesce queue for batch dhcp) inside a directory that was renamed in HEAD, suggesting...",
5+
"timestamp": "2026-03-10T06:19:08.294Z",
6+
"retry_count": 1
7+
}

compute/src/main/java/org/zstack/compute/zone/AbstractZone.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import org.zstack.header.zone.ZoneStateEvent;
99

1010
abstract class AbstractZone implements Zone {
11-
private static DatabaseFacade dbf = Platform.getComponentLoader().getComponent(DatabaseFacade.class);
1211
private final static StateMachine<ZoneState, ZoneStateEvent> stateMachine;
1312

1413
static {
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
package org.zstack.core.thread;
2+
3+
import org.springframework.beans.factory.annotation.Autowire;
4+
import org.springframework.beans.factory.annotation.Autowired;
5+
import org.springframework.beans.factory.annotation.Configurable;
6+
import org.zstack.header.core.AbstractCompletion;
7+
import org.zstack.header.core.Completion;
8+
import org.zstack.header.core.ReturnValueCompletion;
9+
import org.zstack.header.errorcode.ErrorCode;
10+
import org.zstack.utils.Utils;
11+
import org.zstack.utils.logging.CLogger;
12+
13+
import java.util.ArrayList;
14+
import java.util.Collections;
15+
import java.util.List;
16+
import java.util.concurrent.ConcurrentHashMap;
17+
import java.util.stream.Collectors;
18+
19+
/**
20+
* Base implementation for coalesce queues.
21+
*
22+
* @param <T> Request Item Type
23+
* @param <R> Batch Execution Result Type
24+
* @param <V> Single Request Result Type
25+
*/
26+
@Configurable(preConstruction = true, autowire = Autowire.BY_TYPE)
27+
public abstract class AbstractCoalesceQueue<T, R, V> {
28+
private static final CLogger logger = Utils.getLogger(AbstractCoalesceQueue.class);
29+
30+
@Autowired
31+
private ThreadFacade thdf;
32+
33+
private final ConcurrentHashMap<String, SignatureQueue> signatureQueues = new ConcurrentHashMap<>();
34+
35+
protected class PendingRequest {
36+
final T item;
37+
final AbstractCompletion completion;
38+
39+
PendingRequest(T item, AbstractCompletion completion) {
40+
this.item = item;
41+
this.completion = completion;
42+
}
43+
44+
@SuppressWarnings("unchecked")
45+
void notifySuccess(V result) {
46+
if (completion == null) {
47+
return;
48+
}
49+
50+
if (completion instanceof ReturnValueCompletion) {
51+
((ReturnValueCompletion<V>) completion).success(result);
52+
} else if (completion instanceof Completion) {
53+
((Completion) completion).success();
54+
}
55+
}
56+
57+
void notifyFailure(ErrorCode errorCode) {
58+
if (completion == null) {
59+
return;
60+
}
61+
62+
if (completion instanceof ReturnValueCompletion) {
63+
((ReturnValueCompletion<V>) completion).fail(errorCode);
64+
} else if (completion instanceof Completion) {
65+
((Completion) completion).fail(errorCode);
66+
}
67+
}
68+
}
69+
70+
private class SignatureQueue {
71+
final String syncSignature;
72+
List<PendingRequest> pendingList = Collections.synchronizedList(new ArrayList<>());
73+
74+
SignatureQueue(String syncSignature) {
75+
this.syncSignature = syncSignature;
76+
}
77+
78+
synchronized List<PendingRequest> takeAll() {
79+
List<PendingRequest> toProcess = pendingList;
80+
pendingList = Collections.synchronizedList(new ArrayList<>());
81+
return toProcess;
82+
}
83+
84+
synchronized void add(PendingRequest request) {
85+
pendingList.add(request);
86+
}
87+
88+
synchronized boolean isEmpty() {
89+
return pendingList.isEmpty();
90+
}
91+
}
92+
93+
protected abstract String getName();
94+
95+
// Changed to take AbstractCompletion, subclasses cast it to specific type
96+
protected abstract void executeBatch(List<T> items, AbstractCompletion completion);
97+
98+
protected abstract AbstractCompletion createBatchCompletion(String syncSignature, List<PendingRequest> requests, SyncTaskChain chain);
99+
100+
protected abstract V calculateResult(T item, R batchResult);
101+
102+
protected final void handleSuccess(String syncSignature, List<PendingRequest> requests, R batchResult, SyncTaskChain chain) {
103+
for (PendingRequest req : requests) {
104+
try {
105+
V singleResult = calculateResult(req.item, batchResult);
106+
req.notifySuccess(singleResult);
107+
} catch (Throwable t) {
108+
logger.warn(String.format("[%s] failed to calculate result for item %s", getName(), req.item), t);
109+
req.notifyFailure(org.zstack.core.Platform.operr("failed to calculate result: %s", t.getMessage()));
110+
}
111+
}
112+
cleanup(syncSignature);
113+
chain.next();
114+
}
115+
116+
protected final void handleFailure(String syncSignature, List<PendingRequest> requests, ErrorCode errorCode, SyncTaskChain chain) {
117+
for (PendingRequest req : requests) {
118+
req.notifyFailure(errorCode);
119+
}
120+
cleanup(syncSignature);
121+
chain.next();
122+
}
123+
124+
void setThreadFacade(ThreadFacade thdf) {
125+
this.thdf = thdf;
126+
}
127+
128+
protected final void submitRequest(String syncSignature, T item, AbstractCompletion completion) {
129+
doSubmit(syncSignature, new PendingRequest(item, completion));
130+
}
131+
132+
private void doSubmit(String syncSignature, PendingRequest request) {
133+
SignatureQueue queue = signatureQueues.computeIfAbsent(syncSignature, SignatureQueue::new);
134+
queue.add(request);
135+
136+
thdf.chainSubmit(new ChainTask(null) {
137+
@Override
138+
public String getSyncSignature() {
139+
return String.format("coalesce-queue-%s-%s", AbstractCoalesceQueue.this.getName(), syncSignature);
140+
}
141+
142+
@Override
143+
public void run(SyncTaskChain chain) {
144+
List<PendingRequest> requests = queue.takeAll();
145+
146+
if (requests.isEmpty()) {
147+
chain.next();
148+
return;
149+
}
150+
151+
String name = getName();
152+
logger.debug(String.format("[%s] coalescing %d requests for signature[%s]",
153+
name, requests.size(), syncSignature));
154+
155+
156+
// Create the specific completion type (Completion or ReturnValueCompletion)
157+
AbstractCompletion batchCompletion = createBatchCompletion(syncSignature, requests, chain);
158+
159+
// Execute batch with the direct completion object
160+
List<T> items = requests.stream().map(req -> req.item).collect(Collectors.toList());
161+
executeBatch(items, batchCompletion);
162+
}
163+
164+
@Override
165+
public String getName() {
166+
return String.format("%s-coalesced-batch-%s", AbstractCoalesceQueue.this.getName(), syncSignature);
167+
}
168+
169+
@Override
170+
protected int getSyncLevel() {
171+
return 1;
172+
}
173+
});
174+
}
175+
176+
private void cleanup(String syncSignature) {
177+
signatureQueues.computeIfPresent(syncSignature, (k, queue) -> {
178+
if (queue.isEmpty()) {
179+
return null;
180+
}
181+
return queue;
182+
});
183+
}
184+
185+
// For testing
186+
int getActiveQueueCount() {
187+
return signatureQueues.size();
188+
}
189+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package org.zstack.core.thread;
2+
3+
import org.zstack.header.core.AbstractCompletion;
4+
import org.zstack.header.core.Completion;
5+
import org.zstack.header.errorcode.ErrorCode;
6+
7+
import java.util.List;
8+
9+
/**
10+
* A coalesce queue for requests that do NOT expect a return value.
11+
*
12+
* @param <T> Request Item Type
13+
*/
14+
public abstract class CoalesceQueue<T> extends AbstractCoalesceQueue<T, Void, Void> {
15+
16+
/**
17+
* Submit a request.
18+
*
19+
* @param syncSignature the sync signature; requests with the same signature will be coalesced
20+
* @param item the request item
21+
* @param completion the completion callback
22+
*/
23+
public void submit(String syncSignature, T item, Completion completion) {
24+
submitRequest(syncSignature, item, completion);
25+
}
26+
27+
/**
28+
* Executes the batched requests.
29+
* <p>
30+
* Subclasses must implement this method to process the coalesced items.
31+
*
32+
* @param items the list of coalesced request items
33+
* @param completion the completion callback for the batch execution
34+
*/
35+
protected abstract void executeBatch(List<T> items, Completion completion);
36+
37+
@Override
38+
protected final void executeBatch(List<T> items, AbstractCompletion batchCompletion) {
39+
executeBatch(items, (Completion) batchCompletion);
40+
}
41+
42+
@Override
43+
protected final AbstractCompletion createBatchCompletion(String syncSignature, List<PendingRequest> requests, SyncTaskChain chain) {
44+
return new Completion(chain) {
45+
@Override
46+
public void success() {
47+
handleSuccess(syncSignature, requests, null, chain);
48+
}
49+
50+
@Override
51+
public void fail(ErrorCode errorCode) {
52+
handleFailure(syncSignature, requests, errorCode, chain);
53+
}
54+
};
55+
}
56+
57+
@Override
58+
protected final Void calculateResult(T item, Void batchResult) {
59+
return null;
60+
}
61+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package org.zstack.core.thread;
2+
3+
import org.zstack.header.core.AbstractCompletion;
4+
import org.zstack.header.core.ReturnValueCompletion;
5+
import org.zstack.header.errorcode.ErrorCode;
6+
7+
import java.util.List;
8+
9+
/**
10+
* A coalesce queue for requests that expect a return value.
11+
*
12+
* @param <T> Request Item Type
13+
* @param <R> Batch Execution Result Type
14+
* @param <V> Single Request Result Type
15+
*/
16+
public abstract class ReturnValueCoalesceQueue<T, R, V> extends AbstractCoalesceQueue<T, R, V> {
17+
18+
public void submit(String syncSignature, T item, ReturnValueCompletion<V> completion) {
19+
submitRequest(syncSignature, item, completion);
20+
}
21+
22+
protected abstract void executeBatch(List<T> items, ReturnValueCompletion<R> completion);
23+
24+
@Override
25+
protected final void executeBatch(List<T> items, AbstractCompletion batchCompletion) {
26+
executeBatch(items, (ReturnValueCompletion<R>) batchCompletion);
27+
}
28+
29+
@Override
30+
protected final AbstractCompletion createBatchCompletion(String syncSignature, List<PendingRequest> requests, SyncTaskChain chain) {
31+
return new ReturnValueCompletion<R>(null) {
32+
@Override
33+
public void success(R batchResult) {
34+
handleSuccess(syncSignature, requests, batchResult, chain);
35+
}
36+
37+
@Override
38+
public void fail(ErrorCode errorCode) {
39+
handleFailure(syncSignature, requests, errorCode, chain);
40+
}
41+
};
42+
}
43+
}

plugin/flatNetworkProvider/src/main/java/org/zstack/network/service/flat/FlatDhcpBackend.java

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@
1818
import org.zstack.core.db.SQL;
1919
import org.zstack.core.defer.Defer;
2020
import org.zstack.core.defer.Deferred;
21-
import org.zstack.core.thread.SyncTask;
22-
import org.zstack.core.thread.ThreadFacade;
21+
import org.zstack.core.thread.*;
2322
import org.zstack.core.upgrade.GrayVersion;
2423
import org.zstack.core.workflow.SimpleFlowChain;
2524
import org.zstack.header.AbstractService;
@@ -121,6 +120,54 @@ public class FlatDhcpBackend extends AbstractService implements NetworkServiceDh
121120

122121
private Map<String, L3NetworkGetIpStatisticExtensionPoint> getIpStatisticExts = new HashMap<>();
123122

123+
/**
124+
* Request wrapper for DHCP apply coalescing.
125+
*/
126+
private static class DhcpApplyRequest {
127+
final String hostUuid;
128+
final List<DhcpInfo> dhcpInfos;
129+
final boolean rebuild;
130+
131+
DhcpApplyRequest(String hostUuid, List<DhcpInfo> dhcpInfos, boolean rebuild) {
132+
this.hostUuid = hostUuid;
133+
this.dhcpInfos = dhcpInfos;
134+
this.rebuild = rebuild;
135+
}
136+
}
137+
138+
private class DhcpApplyQueue extends CoalesceQueue<DhcpApplyRequest> {
139+
@Override
140+
protected String getName() {
141+
return "flat-dhcp-apply";
142+
}
143+
144+
@Override
145+
protected void executeBatch(List<DhcpApplyRequest> requests, Completion completion) {
146+
if (requests.isEmpty()) {
147+
completion.success();
148+
return;
149+
}
150+
151+
// All requests in the same batch have the same hostUuid
152+
String hostUuid = requests.get(0).hostUuid;
153+
154+
// Merge all DhcpInfo from all requests, grouped by L3 network
155+
// TODO: unify DHCP apply logic and switch to merged/batch flow everywhere
156+
boolean anyRebuild = false;
157+
List<DhcpInfo> mergedInfos = new ArrayList<>();
158+
for (DhcpApplyRequest req : requests) {
159+
anyRebuild = anyRebuild || req.rebuild;
160+
mergedInfos.addAll(req.dhcpInfos);
161+
}
162+
163+
logger.debug(String.format("Coalesced %d DHCP apply requests for host[uuid:%s]", requests.size(), hostUuid));
164+
165+
applyDhcpToHosts(mergedInfos, hostUuid, anyRebuild, completion);
166+
}
167+
}
168+
169+
private final DhcpApplyQueue dhcpApplyCoalesceQueue = new DhcpApplyQueue();
170+
124171
public static final String APPLY_DHCP_PATH = "/flatnetworkprovider/dhcp/apply";
125172
public static final String BATCH_APPLY_DHCP_PATH = "/flatnetworkprovider/dhcp/batchApply";
126173
public static final String PREPARE_DHCP_PATH = "/flatnetworkprovider/dhcp/prepare";
@@ -2074,7 +2121,10 @@ public void applyDhcpService(List<DhcpStruct> dhcpStructList, VmInstanceSpec spe
20742121
return;
20752122
}
20762123

2077-
applyDhcpToHosts(toDhcpInfo(dhcpStructList), spec.getDestHost().getUuid(), false, completion);
2124+
String hostUuid = spec.getDestHost().getUuid();
2125+
DhcpApplyRequest request = new DhcpApplyRequest(hostUuid, toDhcpInfo(dhcpStructList), false);
2126+
// Use coalesce queue: requests to the same host will be merged into a single batch
2127+
dhcpApplyCoalesceQueue.submit(hostUuid, request, completion);
20782128
}
20792129

20802130
private void releaseDhcpService(List<DhcpInfo> info, final String vmUuid, final String hostUuid, final NoErrorCompletion completion) {

0 commit comments

Comments
 (0)