diff --git a/compute/src/main/java/org/zstack/compute/zone/AbstractZone.java b/compute/src/main/java/org/zstack/compute/zone/AbstractZone.java index dd5e7f1c2fc..e327cd7f367 100755 --- a/compute/src/main/java/org/zstack/compute/zone/AbstractZone.java +++ b/compute/src/main/java/org/zstack/compute/zone/AbstractZone.java @@ -8,7 +8,6 @@ import org.zstack.header.zone.ZoneStateEvent; abstract class AbstractZone implements Zone { - private static DatabaseFacade dbf = Platform.getComponentLoader().getComponent(DatabaseFacade.class); private final static StateMachine stateMachine; static { diff --git a/core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java b/core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java new file mode 100644 index 00000000000..2706e7945eb --- /dev/null +++ b/core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java @@ -0,0 +1,189 @@ +package org.zstack.core.thread; + +import org.springframework.beans.factory.annotation.Autowire; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Configurable; +import org.zstack.header.core.AbstractCompletion; +import org.zstack.header.core.Completion; +import org.zstack.header.core.ReturnValueCompletion; +import org.zstack.header.errorcode.ErrorCode; +import org.zstack.utils.Utils; +import org.zstack.utils.logging.CLogger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * Base implementation for coalesce queues. + * + * @param Request Item Type + * @param Batch Execution Result Type + * @param Single Request Result Type + */ +@Configurable(preConstruction = true, autowire = Autowire.BY_TYPE) +public abstract class AbstractCoalesceQueue { + private static final CLogger logger = Utils.getLogger(AbstractCoalesceQueue.class); + + @Autowired + private ThreadFacade thdf; + + private final ConcurrentHashMap signatureQueues = new ConcurrentHashMap<>(); + + protected class PendingRequest { + final T item; + final AbstractCompletion completion; + + PendingRequest(T item, AbstractCompletion completion) { + this.item = item; + this.completion = completion; + } + + @SuppressWarnings("unchecked") + void notifySuccess(V result) { + if (completion == null) { + return; + } + + if (completion instanceof ReturnValueCompletion) { + ((ReturnValueCompletion) completion).success(result); + } else if (completion instanceof Completion) { + ((Completion) completion).success(); + } + } + + void notifyFailure(ErrorCode errorCode) { + if (completion == null) { + return; + } + + if (completion instanceof ReturnValueCompletion) { + ((ReturnValueCompletion) completion).fail(errorCode); + } else if (completion instanceof Completion) { + ((Completion) completion).fail(errorCode); + } + } + } + + private class SignatureQueue { + final String syncSignature; + List pendingList = Collections.synchronizedList(new ArrayList<>()); + + SignatureQueue(String syncSignature) { + this.syncSignature = syncSignature; + } + + synchronized List takeAll() { + List toProcess = pendingList; + pendingList = Collections.synchronizedList(new ArrayList<>()); + return toProcess; + } + + synchronized void add(PendingRequest request) { + pendingList.add(request); + } + + synchronized boolean isEmpty() { + return pendingList.isEmpty(); + } + } + + protected abstract String getName(); + + // Changed to take AbstractCompletion, subclasses cast it to specific type + protected abstract void executeBatch(List items, AbstractCompletion completion); + + protected abstract AbstractCompletion createBatchCompletion(String syncSignature, List requests, SyncTaskChain chain); + + protected abstract V calculateResult(T item, R batchResult); + + protected final void handleSuccess(String syncSignature, List requests, R batchResult, SyncTaskChain chain) { + for (PendingRequest req : requests) { + try { + V singleResult = calculateResult(req.item, batchResult); + req.notifySuccess(singleResult); + } catch (Throwable t) { + logger.warn(String.format("[%s] failed to calculate result for item %s", getName(), req.item), t); + req.notifyFailure(org.zstack.core.Platform.operr("failed to calculate result: %s", t.getMessage())); + } + } + cleanup(syncSignature); + chain.next(); + } + + protected final void handleFailure(String syncSignature, List requests, ErrorCode errorCode, SyncTaskChain chain) { + for (PendingRequest req : requests) { + req.notifyFailure(errorCode); + } + cleanup(syncSignature); + chain.next(); + } + + void setThreadFacade(ThreadFacade thdf) { + this.thdf = thdf; + } + + protected final void submitRequest(String syncSignature, T item, AbstractCompletion completion) { + doSubmit(syncSignature, new PendingRequest(item, completion)); + } + + private void doSubmit(String syncSignature, PendingRequest request) { + SignatureQueue queue = signatureQueues.computeIfAbsent(syncSignature, SignatureQueue::new); + queue.add(request); + + thdf.chainSubmit(new ChainTask(null) { + @Override + public String getSyncSignature() { + return String.format("coalesce-queue-%s-%s", AbstractCoalesceQueue.this.getName(), syncSignature); + } + + @Override + public void run(SyncTaskChain chain) { + List requests = queue.takeAll(); + + if (requests.isEmpty()) { + chain.next(); + return; + } + + String name = getName(); + logger.debug(String.format("[%s] coalescing %d requests for signature[%s]", + name, requests.size(), syncSignature)); + + + // Create the specific completion type (Completion or ReturnValueCompletion) + AbstractCompletion batchCompletion = createBatchCompletion(syncSignature, requests, chain); + + // Execute batch with the direct completion object + List items = requests.stream().map(req -> req.item).collect(Collectors.toList()); + executeBatch(items, batchCompletion); + } + + @Override + public String getName() { + return String.format("%s-coalesced-batch-%s", AbstractCoalesceQueue.this.getName(), syncSignature); + } + + @Override + protected int getSyncLevel() { + return 1; + } + }); + } + + private void cleanup(String syncSignature) { + signatureQueues.computeIfPresent(syncSignature, (k, queue) -> { + if (queue.isEmpty()) { + return null; + } + return queue; + }); + } + + // For testing + int getActiveQueueCount() { + return signatureQueues.size(); + } +} diff --git a/core/src/main/java/org/zstack/core/thread/CoalesceQueue.java b/core/src/main/java/org/zstack/core/thread/CoalesceQueue.java new file mode 100644 index 00000000000..6fe18d189bb --- /dev/null +++ b/core/src/main/java/org/zstack/core/thread/CoalesceQueue.java @@ -0,0 +1,61 @@ +package org.zstack.core.thread; + +import org.zstack.header.core.AbstractCompletion; +import org.zstack.header.core.Completion; +import org.zstack.header.errorcode.ErrorCode; + +import java.util.List; + +/** + * A coalesce queue for requests that do NOT expect a return value. + * + * @param Request Item Type + */ +public abstract class CoalesceQueue extends AbstractCoalesceQueue { + + /** + * Submit a request. + * + * @param syncSignature the sync signature; requests with the same signature will be coalesced + * @param item the request item + * @param completion the completion callback + */ + public void submit(String syncSignature, T item, Completion completion) { + submitRequest(syncSignature, item, completion); + } + + /** + * Executes the batched requests. + *

+ * Subclasses must implement this method to process the coalesced items. + * + * @param items the list of coalesced request items + * @param completion the completion callback for the batch execution + */ + protected abstract void executeBatch(List items, Completion completion); + + @Override + protected final void executeBatch(List items, AbstractCompletion batchCompletion) { + executeBatch(items, (Completion) batchCompletion); + } + + @Override + protected final AbstractCompletion createBatchCompletion(String syncSignature, List requests, SyncTaskChain chain) { + return new Completion(chain) { + @Override + public void success() { + handleSuccess(syncSignature, requests, null, chain); + } + + @Override + public void fail(ErrorCode errorCode) { + handleFailure(syncSignature, requests, errorCode, chain); + } + }; + } + + @Override + protected final Void calculateResult(T item, Void batchResult) { + return null; + } +} diff --git a/core/src/main/java/org/zstack/core/thread/ReturnValueCoalesceQueue.java b/core/src/main/java/org/zstack/core/thread/ReturnValueCoalesceQueue.java new file mode 100644 index 00000000000..346824647fd --- /dev/null +++ b/core/src/main/java/org/zstack/core/thread/ReturnValueCoalesceQueue.java @@ -0,0 +1,43 @@ +package org.zstack.core.thread; + +import org.zstack.header.core.AbstractCompletion; +import org.zstack.header.core.ReturnValueCompletion; +import org.zstack.header.errorcode.ErrorCode; + +import java.util.List; + +/** + * A coalesce queue for requests that expect a return value. + * + * @param Request Item Type + * @param Batch Execution Result Type + * @param Single Request Result Type + */ +public abstract class ReturnValueCoalesceQueue extends AbstractCoalesceQueue { + + public void submit(String syncSignature, T item, ReturnValueCompletion completion) { + submitRequest(syncSignature, item, completion); + } + + protected abstract void executeBatch(List items, ReturnValueCompletion completion); + + @Override + protected final void executeBatch(List items, AbstractCompletion batchCompletion) { + executeBatch(items, (ReturnValueCompletion) batchCompletion); + } + + @Override + protected final AbstractCompletion createBatchCompletion(String syncSignature, List requests, SyncTaskChain chain) { + return new ReturnValueCompletion(null) { + @Override + public void success(R batchResult) { + handleSuccess(syncSignature, requests, batchResult, chain); + } + + @Override + public void fail(ErrorCode errorCode) { + handleFailure(syncSignature, requests, errorCode, chain); + } + }; + } +} diff --git a/plugin/flatNetworkProvider/src/main/java/org/zstack/network/service/flat/FlatDhcpBackend.java b/plugin/flatNetworkProvider/src/main/java/org/zstack/network/service/flat/FlatDhcpBackend.java index 962be8fe4ef..22d06c6e5f1 100755 --- a/plugin/flatNetworkProvider/src/main/java/org/zstack/network/service/flat/FlatDhcpBackend.java +++ b/plugin/flatNetworkProvider/src/main/java/org/zstack/network/service/flat/FlatDhcpBackend.java @@ -18,8 +18,7 @@ import org.zstack.core.db.SQL; import org.zstack.core.defer.Defer; import org.zstack.core.defer.Deferred; -import org.zstack.core.thread.SyncTask; -import org.zstack.core.thread.ThreadFacade; +import org.zstack.core.thread.*; import org.zstack.core.upgrade.GrayVersion; import org.zstack.core.workflow.SimpleFlowChain; import org.zstack.header.AbstractService; @@ -121,6 +120,54 @@ public class FlatDhcpBackend extends AbstractService implements NetworkServiceDh private Map getIpStatisticExts = new HashMap<>(); + /** + * Request wrapper for DHCP apply coalescing. + */ + private static class DhcpApplyRequest { + final String hostUuid; + final List dhcpInfos; + final boolean rebuild; + + DhcpApplyRequest(String hostUuid, List dhcpInfos, boolean rebuild) { + this.hostUuid = hostUuid; + this.dhcpInfos = dhcpInfos; + this.rebuild = rebuild; + } + } + + private class DhcpApplyQueue extends CoalesceQueue { + @Override + protected String getName() { + return "flat-dhcp-apply"; + } + + @Override + protected void executeBatch(List requests, Completion completion) { + if (requests.isEmpty()) { + completion.success(); + return; + } + + // All requests in the same batch have the same hostUuid + String hostUuid = requests.get(0).hostUuid; + + // Merge all DhcpInfo from all requests, grouped by L3 network + // TODO: unify DHCP apply logic and switch to merged/batch flow everywhere + boolean anyRebuild = false; + List mergedInfos = new ArrayList<>(); + for (DhcpApplyRequest req : requests) { + anyRebuild = anyRebuild || req.rebuild; + mergedInfos.addAll(req.dhcpInfos); + } + + logger.debug(String.format("Coalesced %d DHCP apply requests for host[uuid:%s]", requests.size(), hostUuid)); + + applyDhcpToHosts(mergedInfos, hostUuid, anyRebuild, completion); + } + } + + private final DhcpApplyQueue dhcpApplyCoalesceQueue = new DhcpApplyQueue(); + public static final String APPLY_DHCP_PATH = "/flatnetworkprovider/dhcp/apply"; public static final String BATCH_APPLY_DHCP_PATH = "/flatnetworkprovider/dhcp/batchApply"; public static final String PREPARE_DHCP_PATH = "/flatnetworkprovider/dhcp/prepare"; @@ -2074,7 +2121,10 @@ public void applyDhcpService(List dhcpStructList, VmInstanceSpec spe return; } - applyDhcpToHosts(toDhcpInfo(dhcpStructList), spec.getDestHost().getUuid(), false, completion); + String hostUuid = spec.getDestHost().getUuid(); + DhcpApplyRequest request = new DhcpApplyRequest(hostUuid, toDhcpInfo(dhcpStructList), false); + // Use coalesce queue: requests to the same host will be merged into a single batch + dhcpApplyCoalesceQueue.submit(hostUuid, request, completion); } private void releaseDhcpService(List info, final String vmUuid, final String hostUuid, final NoErrorCompletion completion) { diff --git a/test/src/test/groovy/org/zstack/test/integration/core/taskqueue/CoalesceQueueCase.groovy b/test/src/test/groovy/org/zstack/test/integration/core/taskqueue/CoalesceQueueCase.groovy new file mode 100644 index 00000000000..dcb797e6d2e --- /dev/null +++ b/test/src/test/groovy/org/zstack/test/integration/core/taskqueue/CoalesceQueueCase.groovy @@ -0,0 +1,479 @@ +package org.zstack.test.integration.core.chaintask + +import org.zstack.core.thread.CoalesceQueue +import org.zstack.core.thread.ReturnValueCoalesceQueue +import org.zstack.header.core.Completion +import org.zstack.header.core.ReturnValueCompletion +import org.zstack.header.errorcode.ErrorCode +import org.zstack.testlib.FailCoalesceQueue +import org.zstack.testlib.SubCase + +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger + +class CoalesceQueueCase extends SubCase { + @Override + void clean() { + } + + @Override + void setup() { + } + + @Override + void environment() { + } + + @Override + void test() { + testCoalesceMultipleRequests() + testDifferentSignaturesNotCoalesced() + testBatchFailureNotifiesAllRequests() + testBatchThrowExceptionNotifiesAllRequests() + testReturnValueCompletion() + testResultCalculationFailure() + testSequentialBatches() + testHighVolumeNoLossAcrossBatches() + } + + void testCoalesceMultipleRequests() { + def requestCount = 10 + def completionLatch = new CountDownLatch(requestCount) + def batchExecutionCount = new AtomicInteger(0) + def processedItems = Collections.synchronizedList(new ArrayList()) + def completedTokens = Collections.synchronizedSet(new LinkedHashSet()) + + def queue = new CoalesceQueue() { + @Override + protected String getName() { + return "test-coalesce" + } + + @Override + protected void executeBatch(List items, Completion completion) { + batchExecutionCount.incrementAndGet() + processedItems.addAll(items) + + new Thread({ + try { + TimeUnit.MILLISECONDS.sleep(100) + } catch (InterruptedException ignored) { + } + completion.success() + }).start() + } + } + + def signature = "host-1" + (0.. + def token = "done-${idx}" + queue.submit(signature, idx, new Completion(null) { + @Override + void success() { + completedTokens.add(token) + completionLatch.countDown() + } + + @Override + void fail(ErrorCode errorCode) { + completedTokens.add(token) + completionLatch.countDown() + } + }) + } + + assert completionLatch.await(10, TimeUnit.SECONDS) + assert processedItems.size() == requestCount + assert batchExecutionCount.get() < requestCount + assert completedTokens.size() == requestCount + (0.. + assert completedTokens.contains("done-${idx}") + } + } + + void testDifferentSignaturesNotCoalesced() { + def signaturesCount = 3 + def requestsPerSignature = 5 + def totalRequests = signaturesCount * requestsPerSignature + def completionLatch = new CountDownLatch(totalRequests) + def batchExecutionCount = new AtomicInteger(0) + def completedTokens = Collections.synchronizedSet(new LinkedHashSet()) + + def queue = new CoalesceQueue() { + @Override + protected String getName() { + return "test-multi-sig" + } + + @Override + protected void executeBatch(List items, Completion completion) { + batchExecutionCount.incrementAndGet() + completion.success() + } + } + + (0.. + def signature = "host-${sig}" + (0.. + def item = "${signature}-item-${idx}" + def token = "done-${item}" + queue.submit(signature, item, new Completion(null) { + @Override + void success() { + completedTokens.add(token) + completionLatch.countDown() + } + + @Override + void fail(ErrorCode errorCode) { + completedTokens.add(token) + completionLatch.countDown() + } + }) + } + } + + assert completionLatch.await(10, TimeUnit.SECONDS) + assert batchExecutionCount.get() >= signaturesCount + assert completedTokens.size() == totalRequests + (0.. + def signature = "host-${sig}" + (0.. + assert completedTokens.contains("done-${signature}-item-${idx}") + } + } + } + + void testBatchFailureNotifiesAllRequests() { + def requestCount = 5 + def completionLatch = new CountDownLatch(requestCount) + def failureCount = new AtomicInteger(0) + def testError = org.zstack.core.Platform.operr("test error") + def completedTokens = Collections.synchronizedSet(new LinkedHashSet()) + + def queue = new CoalesceQueue() { + @Override + protected String getName() { + return "test-failure" + } + + @Override + protected void executeBatch(List items, Completion completion) { + completion.fail(testError) + } + } + + def signature = "host-fail" + (0.. + def token = "fail-${idx}" + queue.submit(signature, idx, new Completion(null) { + @Override + void success() { + completedTokens.add(token) + completionLatch.countDown() + } + + @Override + void fail(ErrorCode errorCode) { + failureCount.incrementAndGet() + completedTokens.add(token) + completionLatch.countDown() + } + }) + } + + assert completionLatch.await(10, TimeUnit.SECONDS) + assert failureCount.get() == requestCount + assert completedTokens.size() == requestCount + (0.. + assert completedTokens.contains("fail-${idx}") + } + } + + void testBatchThrowExceptionNotifiesAllRequests() { + def requestCount = 5 + def completionLatch = new CountDownLatch(requestCount) + def failureCount = new AtomicInteger(0) + def testError = org.zstack.core.Platform.operr("test error") + def completedTokens = Collections.synchronizedSet(new LinkedHashSet()) + + def queue = new FailCoalesceQueue() + + def signature = "host-throw" + (0.. + def token = "throw-${idx}" + queue.submit(signature, idx, new Completion(null) { + @Override + void success() { + completedTokens.add(token) + completionLatch.countDown() + } + + @Override + void fail(ErrorCode errorCode) { + failureCount.incrementAndGet() + completedTokens.add(token) + completionLatch.countDown() + } + }) + } + + assert completionLatch.await(10, TimeUnit.SECONDS) + assert failureCount.get() == requestCount + assert completedTokens.size() == requestCount + (0.. + assert completedTokens.contains("throw-${idx}") + } + } + + + void testReturnValueCompletion() { + def requestCount = 5 + def completionLatch = new CountDownLatch(requestCount) + def receivedResults = Collections.synchronizedMap(new LinkedHashMap()) + def mismatches = Collections.synchronizedList(new ArrayList()) + def batchResult = "batch-success" + + def queue = new ReturnValueCoalesceQueue() { + @Override + protected String getName() { + return "test-return-value" + } + + @Override + protected void executeBatch(List items, ReturnValueCompletion completion) { + completion.success(batchResult) + } + + @Override + protected String calculateResult(Integer item, String r) { + return "${r}-item-${item}" + } + } + + def signature = "host-result" + (0.. + queue.submit(signature, idx, new ReturnValueCompletion(null) { + @Override + void success(String result) { + def expected = String.format("%s-item-%s", batchResult, idx) + if (result != expected) { + mismatches.add(String.format("item-%s=%s", idx, result)) + } + receivedResults.put(idx, result) + completionLatch.countDown() + } + + @Override + void fail(ErrorCode errorCode) { + completionLatch.countDown() + } + }) + } + + assert completionLatch.await(10, TimeUnit.SECONDS) + assert receivedResults.size() == requestCount + assert mismatches.isEmpty() + (0.. + def expected = String.format("%s-item-%s", batchResult, idx) + assert receivedResults.get(idx) == expected + } + } + + void testResultCalculationFailure() { + def completionLatch = new CountDownLatch(2) + def successCount = new AtomicInteger(0) + def failCount = new AtomicInteger(0) + + def queue = new ReturnValueCoalesceQueue() { + @Override + protected String getName() { + return "test-calc-fail" + } + + @Override + protected void executeBatch(List items, ReturnValueCompletion completion) { + completion.success(null) + } + + @Override + protected String calculateResult(Integer item, Void batchResult) { + if (item == 0) { + throw new RuntimeException("Calculation failed for item 0 (on purpose)") + } + return "success" + } + } + + def signature = "host-calc" + queue.submit(signature, 0, new ReturnValueCompletion(null) { + @Override + void success(String ret) { + successCount.incrementAndGet() + completionLatch.countDown() + } + + @Override + void fail(ErrorCode errorCode) { + failCount.incrementAndGet() + completionLatch.countDown() + } + }) + + queue.submit(signature, 1, new ReturnValueCompletion(null) { + @Override + void success(String ret) { + successCount.incrementAndGet() + completionLatch.countDown() + } + + @Override + void fail(ErrorCode errorCode) { + failCount.incrementAndGet() + completionLatch.countDown() + } + }) + + assert completionLatch.await(10, TimeUnit.SECONDS) + assert successCount.get() == 1 + assert failCount.get() == 1 + } + + void testSequentialBatches() { + def firstBatchStart = new CountDownLatch(1) + def firstBatchContinue = new CountDownLatch(1) + def secondBatchStart = new CountDownLatch(1) + def secondBatchContinue = new CountDownLatch(1) + def allComplete = new CountDownLatch(6) + def batches = Collections.synchronizedList(new ArrayList>()) + + def queue = new CoalesceQueue() { + @Override + protected String getName() { + return "test-sequential" + } + + @Override + protected void executeBatch(List items, Completion completion) { + batches.add(new ArrayList<>(items)) + + if (batches.size() == 1) { + firstBatchStart.countDown() + try { + firstBatchContinue.await(5, TimeUnit.SECONDS) + } catch (InterruptedException ignored) { + } + } else if (batches.size() == 2) { + secondBatchStart.countDown() + try { + secondBatchContinue.await(5, TimeUnit.SECONDS) + } catch (InterruptedException ignored) { + } + } + + completion.success() + } + } + + def signature = "host-seq" + queue.submit(signature, 0, new Completion(null) { + @Override + void success() { + allComplete.countDown() + } + + @Override + void fail(ErrorCode errorCode) { + allComplete.countDown() + } + }) + + assert firstBatchStart.await(5, TimeUnit.SECONDS) + + (1..<4).each { idx -> + queue.submit(signature, idx, new Completion(null) { + @Override + void success() { + allComplete.countDown() + } + + @Override + void fail(ErrorCode errorCode) { + allComplete.countDown() + } + }) + } + + assert secondBatchStart.await(5, TimeUnit.SECONDS) + + (4..<6).each { idx -> + queue.submit(signature, idx, new Completion(null) { + @Override + void success() { + allComplete.countDown() + } + + @Override + void fail(ErrorCode errorCode) { + allComplete.countDown() + } + }) + } + + firstBatchContinue.countDown() + secondBatchContinue.countDown() + assert allComplete.await(10, TimeUnit.SECONDS) + assert batches.size() == 3 + assert batches.get(0) == [0] + assert batches.get(1).containsAll([1, 2, 3]) + assert batches.get(2).containsAll([4, 5]) + } + + void testHighVolumeNoLossAcrossBatches() { + def requestCount = 300 + def completionLatch = new CountDownLatch(requestCount) + def processedItems = Collections.synchronizedSet(new LinkedHashSet()) + def batchCount = new AtomicInteger(0) + + def queue = new CoalesceQueue() { + @Override + protected String getName() { + return "test-high-volume" + } + + @Override + protected void executeBatch(List items, Completion completion) { + batchCount.incrementAndGet() + processedItems.addAll(items) + + new Thread({ + try { + TimeUnit.MILLISECONDS.sleep(3) + } catch (InterruptedException ignored) { + } + completion.success() + }).start() + } + } + + def signature = "host-high-volume" + (0.. + queue.submit(signature, idx, new Completion(null) { + @Override + void success() { + completionLatch.countDown() + } + + @Override + void fail(ErrorCode errorCode) { + completionLatch.countDown() + } + }) + } + + assert completionLatch.await(6, TimeUnit.SECONDS) + assert processedItems.size() == requestCount + assert batchCount.get() > 1 + } +} diff --git a/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/VerifyPrepareDhcpWhenReconnectHostCase.groovy b/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/VerifyPrepareDhcpWhenReconnectHostCase.groovy index 6e7d69c10eb..f3b9239e879 100644 --- a/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/VerifyPrepareDhcpWhenReconnectHostCase.groovy +++ b/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/VerifyPrepareDhcpWhenReconnectHostCase.groovy @@ -1,6 +1,7 @@ package org.zstack.test.integration.networkservice.provider.flat.dhcp import org.springframework.http.HttpEntity +import org.zstack.core.thread.ThreadFacade import org.zstack.header.network.service.NetworkServiceType import org.zstack.network.securitygroup.SecurityGroupConstant import org.zstack.network.service.eip.EipConstant @@ -9,6 +10,9 @@ import org.zstack.network.service.flat.FlatNetworkServiceConstant import org.zstack.network.service.userdata.UserdataConstant import org.zstack.network.service.virtualrouter.vyos.VyosConstants import org.zstack.sdk.HostInventory +import org.zstack.sdk.ImageInventory +import org.zstack.sdk.InstanceOfferingInventory +import org.zstack.sdk.L3NetworkInventory import org.zstack.sdk.VirtualRouterVmInventory import org.zstack.sdk.VmInstanceInventory import org.zstack.test.integration.networkservice.provider.NetworkServiceProviderTest @@ -17,6 +21,10 @@ import org.zstack.testlib.SubCase import org.zstack.utils.data.SizeUnit import org.zstack.utils.gson.JSONObjectUtil +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger + class VerifyPrepareDhcpWhenReconnectHostCase extends SubCase { EnvSpec env @Override @@ -147,12 +155,14 @@ class VerifyPrepareDhcpWhenReconnectHostCase extends SubCase { void test() { env.create { checkDhcpWork() + testBatchStartVmApplyDhcp() } } void checkDhcpWork(){ def host = queryHost {}[0] as HostInventory def vm = env.inventoryByName("vm") as VmInstanceInventory + def vmItemTokens = new LinkedHashSet() setVmHostname { uuid = vm.uuid @@ -164,6 +174,11 @@ class VerifyPrepareDhcpWhenReconnectHostCase extends SubCase { env.afterSimulator(FlatDhcpBackend.BATCH_APPLY_DHCP_PATH) { rsp, HttpEntity e1 -> cmd = JSONObjectUtil.toObject(e1.body, FlatDhcpBackend.BatchApplyDhcpCmd.class) called += 1 + cmd.dhcpInfos.each { info -> + info.dhcp.each { dhcp -> + vmItemTokens.add(String.format("%s-%s-%s", dhcp.ip, dhcp.netmask, dhcp.gateway)) + } + } return rsp } @@ -176,12 +191,17 @@ class VerifyPrepareDhcpWhenReconnectHostCase extends SubCase { assert called == 1 assert cmd.dhcpInfos.size() == 1 assert cmd.dhcpInfos.get(0).dhcp.get(0).hostname == "test-name" + def vmNic = vm.vmNics.get(0) + def expectedToken = String.format("%s-%s-%s", vmNic.ip, vmNic.netmask, vmNic.gateway) + assert vmItemTokens.contains(expectedToken) called = 0 cmd = null + vmItemTokens.clear() reconnectHost { uuid=host.uuid } assert called == 1 assert cmd.dhcpInfos.get(0).dhcp.get(0).hostname == "test-name" + assert vmItemTokens.contains(expectedToken) def vr = queryVirtualRouterVm {}[0] as VirtualRouterVmInventory assert vr != null @@ -192,6 +212,129 @@ class VerifyPrepareDhcpWhenReconnectHostCase extends SubCase { assert called == 1 } + void testBatchStartVmApplyDhcp() { + L3NetworkInventory l3 = env.inventoryByName("l3-1") as L3NetworkInventory + ImageInventory image = env.inventoryByName("image") as ImageInventory + InstanceOfferingInventory offering = env.inventoryByName("instanceOffering") as InstanceOfferingInventory + + def vmCount = 4 + def vms = new ArrayList() + def hostnameByIp = new LinkedHashMap() + (0.. + def hname = "batch-${idx}" + VmInstanceInventory inv = createVmInstance { + name = "batch-vm-${idx}" + imageUuid = image.uuid + l3NetworkUuids = [l3.uuid] + instanceOfferingUuid = offering.uuid + } as VmInstanceInventory + setVmHostname { + uuid = inv.uuid + hostname = hname + } + hostnameByIp.put(inv.vmNics.get(0).ip, hname) + vms.add(inv) + } + + vms.each { vmInv -> + stopVmInstance { + uuid = vmInv.uuid + } + } + + def batchCmds = Collections.synchronizedList(new ArrayList()) + def firstBatchArrived = new CountDownLatch(1) + def releaseFirstBatch = new CountDownLatch(1) + env.afterSimulator(FlatDhcpBackend.BATCH_APPLY_DHCP_PATH) { rsp, HttpEntity e1 -> + FlatDhcpBackend.BatchApplyDhcpCmd cmd = JSONObjectUtil.toObject(e1.body, FlatDhcpBackend.BatchApplyDhcpCmd.class) + batchCmds.add(cmd) + if (batchCmds.size() == 1) { + firstBatchArrived.countDown() + releaseFirstBatch.await(10, TimeUnit.SECONDS) + } + return rsp + } + + VmInstanceInventory blocker = vms.remove(0) + new Thread({ + startVmInstance { + uuid = blocker.uuid + } + }).start() + assert firstBatchArrived.await(10, TimeUnit.SECONDS) + + CountDownLatch doneLatch = new CountDownLatch(vms.size()) + vms.each { vmInv -> + new Thread({ + try { + startVmInstance { + uuid = vmInv.uuid + } + } finally { + doneLatch.countDown() + } + }).start() + } + + ThreadFacade thdf = bean(ThreadFacade.class) + retryInSecs { + assert thdf.getChainTaskInfo(String.format("coalesce-queue-flat-dhcp-apply-%s", vms[0].hostUuid)).pendingTask.size() == 3 + } + + releaseFirstBatch.countDown() + assert doneLatch.await(2, TimeUnit.MINUTES) + + retryInSecs(5) { + assert batchCmds.size() == 2 + } + retryInSecs(2) { + assert batchCmds.size() == 2 + } + + Closure> toTokenSet = { FlatDhcpBackend.BatchApplyDhcpCmd batch -> + def tokens = new LinkedHashSet() + batch.dhcpInfos.each { info -> + info.dhcp.each { dhcp -> + tokens.add(String.format("%s-%s-%s", dhcp.ip, dhcp.netmask, dhcp.gateway)) + } + } + return tokens + } + + Closure> toHostnameMap = { FlatDhcpBackend.BatchApplyDhcpCmd batch -> + def hostnames = new LinkedHashMap() + batch.dhcpInfos.each { info -> + info.dhcp.each { dhcp -> + hostnames.put(dhcp.ip, dhcp.hostname) + } + } + return hostnames + } + + def firstBatchTokens = toTokenSet(batchCmds.get(0)) + def secondBatchTokens = toTokenSet(batchCmds.get(1)) + def firstBatchHostnames = toHostnameMap(batchCmds.get(0)) + def secondBatchHostnames = toHostnameMap(batchCmds.get(1)) + + def blockerNic = blocker.vmNics.get(0) + def blockerToken = String.format("%s-%s-%s", blockerNic.ip, blockerNic.netmask, blockerNic.gateway) + assert firstBatchTokens.size() == 1 + assert firstBatchTokens.contains(blockerToken) + assert firstBatchHostnames.size() == 1 + assert firstBatchHostnames.get(blockerNic.ip) == hostnameByIp.get(blockerNic.ip) + + def expectedTokens = new LinkedHashSet() + def expectedHostnames = new LinkedHashMap() + vms.each { vmInv -> + def nic = vmInv.vmNics.get(0) + expectedTokens.add(String.format("%s-%s-%s", nic.ip, nic.netmask, nic.gateway)) + expectedHostnames.put(nic.ip, hostnameByIp.get(nic.ip)) + } + assert secondBatchTokens.containsAll(expectedTokens) + assert secondBatchTokens.size() == expectedTokens.size() + assert secondBatchHostnames == expectedHostnames + } + @Override void clean() { env.delete() diff --git a/testlib/pom.xml b/testlib/pom.xml index 4d1688012a6..d1927c6d4fd 100644 --- a/testlib/pom.xml +++ b/testlib/pom.xml @@ -247,6 +247,39 @@ + + org.codehaus.mojo + aspectj-maven-plugin + ${aspectj.plugin.version} + + + + compile + test-compile + + + + + ${project.java.version} + ${project.java.version} + ${project.java.version} + true + + + org.springframework + spring-aspects + + + org.zstack + core + + + org.zstack + header + + + + diff --git a/testlib/src/main/java/org/zstack/testlib/FailCoalesceQueue.java b/testlib/src/main/java/org/zstack/testlib/FailCoalesceQueue.java new file mode 100644 index 00000000000..50c8e22d1d9 --- /dev/null +++ b/testlib/src/main/java/org/zstack/testlib/FailCoalesceQueue.java @@ -0,0 +1,21 @@ +package org.zstack.testlib; + +import org.zstack.core.thread.CoalesceQueue; +import org.zstack.header.core.Completion; +import org.zstack.header.errorcode.OperationFailureException; + +import java.util.List; + +import static org.zstack.core.Platform.operr; + +public class FailCoalesceQueue extends CoalesceQueue { + @Override + protected String getName() { + return "test-failure"; + } + + @Override + protected void executeBatch(List items, Completion completion) { + throw new OperationFailureException(operr("test error")); + } +}