diff --git a/conf/springConfigXml/sugonSdnController.xml b/conf/springConfigXml/sugonSdnController.xml index f0ff6ad0033..c55e6da4903 100644 --- a/conf/springConfigXml/sugonSdnController.xml +++ b/conf/springConfigXml/sugonSdnController.xml @@ -67,11 +67,7 @@ - - - - - + diff --git a/plugin/sugonSdnController/src/main/java/org/zstack/sugonSdnController/controller/SugonSdnController.java b/plugin/sugonSdnController/src/main/java/org/zstack/sugonSdnController/controller/SugonSdnController.java index 45a68537e38..e63d69927c4 100644 --- a/plugin/sugonSdnController/src/main/java/org/zstack/sugonSdnController/controller/SugonSdnController.java +++ b/plugin/sugonSdnController/src/main/java/org/zstack/sugonSdnController/controller/SugonSdnController.java @@ -6,6 +6,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Configurable; import org.zstack.core.cloudbus.CloudBus; +import org.zstack.core.cloudbus.MessageSafe; import org.zstack.core.db.DatabaseFacade; import org.zstack.core.db.Q; import org.zstack.header.core.Completion; @@ -22,10 +23,13 @@ import org.zstack.network.l3.L3NetworkSystemTags; import org.zstack.sdnController.SdnController; import org.zstack.sdnController.SdnControllerL2; +import org.zstack.sdnController.SdnControllerPingMsg; +import org.zstack.sdnController.SdnControllerPingReply; import org.zstack.sdnController.header.*; import org.zstack.sugonSdnController.controller.api.*; import org.zstack.sugonSdnController.controller.api.types.*; import org.zstack.sugonSdnController.header.APICreateL2TfNetworkMsg; +import org.zstack.sugonSdnController.network.TfZstackPortSync; import org.zstack.utils.StringDSL; import org.zstack.utils.Utils; import org.zstack.utils.logging.CLogger; @@ -44,6 +48,8 @@ public class SugonSdnController implements TfSdnController, SdnController, SdnCo CloudBus bus; @Autowired DatabaseFacade dbf; + @Autowired + TfZstackPortSync tfZstackPortSync; private SdnControllerVO sdnControllerVO; private TfHttpClient client; @@ -55,8 +61,30 @@ public SugonSdnController(SdnControllerVO vo) { } @Override + @MessageSafe public void handleMessage(SdnControllerMessage msg) { - bus.dealWithUnknownMessage((Message) msg); + if (msg instanceof SdnControllerPingMsg) { + handMessage((SdnControllerPingMsg) msg); + } else { + bus.dealWithUnknownMessage((Message) msg); + } + } + + void handMessage(SdnControllerPingMsg msg) { + SdnControllerPingReply reply = new SdnControllerPingReply(); + try { + Domain domain = (Domain) client.getDomain(); + if (domain == null) { + reply.setError(operr("get default domain on tf controller failed")); + } else { + tfZstackPortSync.triggerSyncIfDue(msg.getSdnControllerUuid()); + reply.setSuccess(true); + } + } catch (Exception e) { + logger.warn("ping tf sdn controller failed", e); + reply.setError(operr("get default domain on tf controller failed")); + } + bus.reply(msg, reply); } @Override diff --git a/plugin/sugonSdnController/src/main/java/org/zstack/sugonSdnController/controller/neutronClient/TfPortClient.java b/plugin/sugonSdnController/src/main/java/org/zstack/sugonSdnController/controller/neutronClient/TfPortClient.java index 1054f708696..2c531684d1b 100644 --- a/plugin/sugonSdnController/src/main/java/org/zstack/sugonSdnController/controller/neutronClient/TfPortClient.java +++ b/plugin/sugonSdnController/src/main/java/org/zstack/sugonSdnController/controller/neutronClient/TfPortClient.java @@ -32,6 +32,22 @@ public TfPortClient(){ if (sdn == null){ throw new RuntimeException("Can not find a tf sdn controller."); } + init(sdn); + } + + public TfPortClient(String sdnControllerUuid) { + SdnControllerVO sdn = Q.New(SdnControllerVO.class).eq( + SdnControllerVO_.uuid, sdnControllerUuid).find(); + if (sdn == null) { + throw new RuntimeException(String.format("Can not find a tf sdn controller[uuid:%s].", sdnControllerUuid)); + } + if (!Objects.equals(sdn.getVendorType(), SdnControllerConstant.TF_CONTROLLER)) { + throw new RuntimeException(String.format("Sdn controller[uuid:%s] is not a tf sdn controller.", sdnControllerUuid)); + } + init(sdn); + } + + private void init(SdnControllerVO sdn) { client = new TfHttpClient(sdn.getIp()); tenantId = StringDSL.transToTfUuid(sdn.getAccountUuid()); } @@ -509,4 +525,3 @@ public Status updateTfPort(String tfPortUUid, String accountId, String deviceId, } } - diff --git a/plugin/sugonSdnController/src/main/java/org/zstack/sugonSdnController/network/TfPortService.java b/plugin/sugonSdnController/src/main/java/org/zstack/sugonSdnController/network/TfPortService.java index c4767689bd1..e7a9f0c0419 100644 --- a/plugin/sugonSdnController/src/main/java/org/zstack/sugonSdnController/network/TfPortService.java +++ b/plugin/sugonSdnController/src/main/java/org/zstack/sugonSdnController/network/TfPortService.java @@ -33,6 +33,11 @@ public List getTfPortsDetail() { return tfPortClient.getVirtualMachineInterfaceDetail(); } + public List getTfPortsDetail(String sdnControllerUuid) { + TfPortClient tfPortClient = new TfPortClient(sdnControllerUuid); + return tfPortClient.getVirtualMachineInterfaceDetail(); + } + public TfPortResponse createTfPort(String tfPortUUid, String l2NetworkUuid, String l3NetworkUuid, String mac, String ip) { //invoke tf rest interface to retrieve real ip and mac and portId TfPortClient tfPortClient = new TfPortClient(); @@ -115,6 +120,12 @@ public TfPortResponse deleteTfPort(String portUUid) { return tfPortClient.deletePort(tfPortUUid); } + public TfPortResponse deleteTfPort(String sdnControllerUuid, String portUUid) { + String tfPortUUid = StringDSL.transToTfUuid(portUUid); + TfPortClient tfPortClient = new TfPortClient(sdnControllerUuid); + return tfPortClient.deletePort(tfPortUUid); + } + public Status updateTfPort(String portUUid, String bmUuid, KeyValuePairs bindInfo) { String accountId = StringDSL.transToTfUuid(acntMgr.getOwnerAccountUuidOfResource(bmUuid)); String tfBmUUid = StringDSL.transToTfUuid(bmUuid); diff --git a/plugin/sugonSdnController/src/main/java/org/zstack/sugonSdnController/network/TfZstackPortSync.java b/plugin/sugonSdnController/src/main/java/org/zstack/sugonSdnController/network/TfZstackPortSync.java index 184b2e721ed..710b63cdb2b 100644 --- a/plugin/sugonSdnController/src/main/java/org/zstack/sugonSdnController/network/TfZstackPortSync.java +++ b/plugin/sugonSdnController/src/main/java/org/zstack/sugonSdnController/network/TfZstackPortSync.java @@ -2,9 +2,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.zstack.core.db.Q; -import org.zstack.core.thread.PeriodicTask; +import org.zstack.core.thread.Task; import org.zstack.core.thread.ThreadFacade; -import org.zstack.header.managementnode.ManagementNodeReadyExtensionPoint; import org.zstack.header.network.l2.L2NetworkVO; import org.zstack.header.network.l2.L2NetworkVO_; import org.zstack.header.vm.VmNicVO; @@ -19,39 +18,57 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; -import java.util.concurrent.Future; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -public class TfZstackPortSync implements ManagementNodeReadyExtensionPoint { +public class TfZstackPortSync { + private static final long SYNC_INTERVAL_MILLIS = TimeUnit.DAYS.toMillis(1); + private static final int MAX_DELETE_COUNT = 10; @Autowired protected ThreadFacade thdf; - private Future trackerThread = null; @Autowired private TfPortService tfPortService; private final static CLogger logger = Utils.getLogger(TfZstackPortSync.class); private final List excludeTypes = new ArrayList(Arrays.asList("neutron:LOADBALANCER", "VIP", "BMS")); + private final Map lastSyncTime = new ConcurrentHashMap<>(); + private final Set runningSyncs = Collections.newSetFromMap(new ConcurrentHashMap()); - @Override - public void managementNodeReady() { - if (trackerThread != null) { - trackerThread.cancel(true); + public void triggerSyncIfDue(String sdnControllerUuid) { + if (sdnControllerUuid == null) { + logger.warn("Port_Sync_Task: skip sync because sdn controller uuid is null."); + return; } - trackerThread = thdf.submitPeriodicTask(new SyncPort()); - } - private class SyncPort implements PeriodicTask { + long now = System.currentTimeMillis(); + Long lastSync = lastSyncTime.get(sdnControllerUuid); + if (lastSync != null && now - lastSync < SYNC_INTERVAL_MILLIS) { + return; + } + if (!runningSyncs.add(sdnControllerUuid)) { + return; + } - @Override - public TimeUnit getTimeUnit() { - return TimeUnit.DAYS; + lastSyncTime.put(sdnControllerUuid, now); + try { + thdf.submit(new SyncPort(sdnControllerUuid)); + } catch (RuntimeException e) { + lastSyncTime.remove(sdnControllerUuid); + runningSyncs.remove(sdnControllerUuid); + throw e; } + } - @Override - public long getInterval() { - return 1; + private class SyncPort implements Task { + private final String sdnControllerUuid; + + private SyncPort(String sdnControllerUuid) { + this.sdnControllerUuid = sdnControllerUuid; } @Override @@ -64,7 +81,10 @@ private HashSet getPortToDelete() { List zstackL2NetworksUuid = Q.New(L2NetworkVO.class).select(L2NetworkVO_.uuid).listValues(); List tfPortsUuid = new ArrayList<>(); try{ - List tfPorts = tfPortService.getTfPortsDetail(); + List tfPorts = tfPortService.getTfPortsDetail(sdnControllerUuid); + if (tfPorts == null) { + return new HashSet<>(); + } for (VirtualMachineInterface vmi : tfPorts) { // skip port if it's network not in zstack List> tfNetworks = vmi.getVirtualNetwork(); @@ -83,7 +103,7 @@ private HashSet getPortToDelete() { } } catch (Exception e) { logger.error(String.format("Port_Sync_Task: Fetch tf VirtualMachineInterface failed: %s.", e)); - return null; + return new HashSet<>(); } HashSet result = new HashSet<>(tfPortsUuid); result.removeAll(zstackPortsUuid); @@ -92,13 +112,13 @@ private HashSet getPortToDelete() { } @Override - public void run() { + public Void call() { logger.info("Port_Sync_Task: begin."); try { HashSet portsToDelete = getPortToDelete(); - int maxDeleteCount = 10; + int maxDeleteCount = MAX_DELETE_COUNT; for (String portUuid: portsToDelete) { - TfPortResponse response = tfPortService.deleteTfPort(portUuid); + TfPortResponse response = tfPortService.deleteTfPort(sdnControllerUuid, portUuid); if (response.getCode() == 200) { logger.info(String.format("Port_Sync_Task: VirtualMachineInterface: %s delete success.", portUuid)); @@ -113,7 +133,10 @@ public void run() { } } catch (Exception e) { logger.error(String.format("Port_Sync_Task failed: %s.", e)); + } finally { + runningSyncs.remove(sdnControllerUuid); } + return null; } } } diff --git a/test/src/test/groovy/org/zstack/test/integration/network/sdnController/SugonSdnControllerCase.groovy b/test/src/test/groovy/org/zstack/test/integration/network/sdnController/SugonSdnControllerCase.groovy index 8e28757ba5a..e954020fa11 100644 --- a/test/src/test/groovy/org/zstack/test/integration/network/sdnController/SugonSdnControllerCase.groovy +++ b/test/src/test/groovy/org/zstack/test/integration/network/sdnController/SugonSdnControllerCase.groovy @@ -1,7 +1,12 @@ package org.zstack.test.integration.network.sdnController +import org.springframework.http.HttpEntity import org.zstack.core.db.DatabaseFacade +import org.zstack.core.cloudbus.CloudBus +import org.zstack.header.message.MessageReply import org.zstack.header.network.sdncontroller.SdnControllerVO +import org.zstack.header.network.sdncontroller.SdnControllerConstant +import org.zstack.sdnController.SdnControllerPingMsg import org.zstack.sdk.* import org.zstack.sugonSdnController.controller.SugonSdnControllerConstant import org.zstack.sugonSdnController.controller.api.types.MacAddressesType @@ -16,6 +21,9 @@ import javax.persistence.TypedQuery; import org.springframework.http.ResponseEntity import org.springframework.http.HttpStatus +import javax.servlet.http.HttpServletRequest +import java.util.concurrent.atomic.AtomicInteger + class SugonSdnControllerCase extends SubCase { EnvSpec env @@ -37,6 +45,7 @@ class SugonSdnControllerCase extends SubCase { void test() { env.create { dbf = bean(DatabaseFacade.class) + testTfPortSyncTriggeredByPingSuccess() testTfApi() } } @@ -46,6 +55,53 @@ class SugonSdnControllerCase extends SubCase { env.delete() } + void testTfPortSyncTriggeredByPingSuccess() { + String sql = "select sdn" + + " from SdnControllerVO sdn" + + " where sdn.vendorType = :vendorType"; + TypedQuery q = dbf.getEntityManager().createQuery(sql, SdnControllerVO.class); + q.setParameter("vendorType", SugonSdnControllerConstant.TF_CONTROLLER); + SdnControllerVO sdn = q.getResultList().get(0); + AtomicInteger syncCount = new AtomicInteger(0) + + env.simulator(TfCommands.TF_CREATE_VMI) { HttpServletRequest req, HttpEntity e, EnvSpec spec -> + if (req.method == "GET") { + syncCount.incrementAndGet() + return '{"virtual-machine-interfaces":[]}' + } + + VirtualMachineInterface rsp = new VirtualMachineInterface(); + rsp.name = TfCommands.TEST_VMI_UUID + rsp.uuid = TfCommands.TEST_VMI_UUID + Project project = new Project(); + project.name = TfCommands.TEST_PROJECT_UUID + project.uuid = TfCommands.TEST_PROJECT_UUID + project.displayName = "admin"; + rsp.setParent(project) + String json = ApiSerializer.serializeObject("virtual-machine-interface", rsp); + ResponseEntity response = new ResponseEntity(json, HttpStatus.OK); + return response.getBody() + } + + sendTfPing(sdn.uuid) + retryInSecs { + assert syncCount.get() == 1 + } + + sleep(500) + sendTfPing(sdn.uuid) + assert syncCount.get() == 1 + } + + void sendTfPing(String sdnControllerUuid) { + CloudBus bus = bean(CloudBus.class) + SdnControllerPingMsg msg = new SdnControllerPingMsg() + msg.sdnControllerUuid = sdnControllerUuid + bus.makeTargetServiceIdByResourceUuid(msg, SdnControllerConstant.SERVICE_ID, sdnControllerUuid) + MessageReply reply = bus.call(msg) + assert reply.success + } + void testTfApi() { def zone = env.inventoryByName("zone") as ZoneInventory def cluster1 = env.inventoryByName("cluster1") as ClusterInventory