Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions conf/springConfigXml/sugonSdnController.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,7 @@
</zstack:plugin>
</bean>

<bean id="TfZstackPortSync" class="org.zstack.sugonSdnController.network.TfZstackPortSync">
<zstack:plugin>
<zstack:extension interface="org.zstack.header.managementnode.ManagementNodeReadyExtensionPoint" />
</zstack:plugin>
</bean>
<bean id="TfZstackPortSync" class="org.zstack.sugonSdnController.network.TfZstackPortSync" />

<bean id="SugonApiInterceptor" class="org.zstack.sugonSdnController.header.SugonApiInterceptor">
<zstack:plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Configurable;
import org.zstack.core.cloudbus.CloudBus;
import org.zstack.core.cloudbus.MessageSafe;
import org.zstack.core.db.DatabaseFacade;
import org.zstack.core.db.Q;
import org.zstack.header.core.Completion;
Expand All @@ -22,10 +23,13 @@
import org.zstack.network.l3.L3NetworkSystemTags;
import org.zstack.sdnController.SdnController;
import org.zstack.sdnController.SdnControllerL2;
import org.zstack.sdnController.SdnControllerPingMsg;
import org.zstack.sdnController.SdnControllerPingReply;
import org.zstack.sdnController.header.*;
import org.zstack.sugonSdnController.controller.api.*;
import org.zstack.sugonSdnController.controller.api.types.*;
import org.zstack.sugonSdnController.header.APICreateL2TfNetworkMsg;
import org.zstack.sugonSdnController.network.TfZstackPortSync;
import org.zstack.utils.StringDSL;
import org.zstack.utils.Utils;
import org.zstack.utils.logging.CLogger;
Expand All @@ -44,6 +48,8 @@ public class SugonSdnController implements TfSdnController, SdnController, SdnCo
CloudBus bus;
@Autowired
DatabaseFacade dbf;
@Autowired
TfZstackPortSync tfZstackPortSync;

private SdnControllerVO sdnControllerVO;
private TfHttpClient client;
Expand All @@ -55,8 +61,30 @@ public SugonSdnController(SdnControllerVO vo) {
}

@Override
@MessageSafe
public void handleMessage(SdnControllerMessage msg) {
bus.dealWithUnknownMessage((Message) msg);
if (msg instanceof SdnControllerPingMsg) {
handMessage((SdnControllerPingMsg) msg);
} else {
bus.dealWithUnknownMessage((Message) msg);
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

void handMessage(SdnControllerPingMsg msg) {
SdnControllerPingReply reply = new SdnControllerPingReply();
try {
Domain domain = (Domain) client.getDomain();
if (domain == null) {
reply.setError(operr("get default domain on tf controller failed"));
} else {
tfZstackPortSync.triggerSyncIfDue(msg.getSdnControllerUuid());
reply.setSuccess(true);
}
} catch (Exception e) {
logger.warn("ping tf sdn controller failed", e);
reply.setError(operr("get default domain on tf controller failed"));
}
bus.reply(msg, reply);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,22 @@ public TfPortClient(){
if (sdn == null){
throw new RuntimeException("Can not find a tf sdn controller.");
}
init(sdn);
}

public TfPortClient(String sdnControllerUuid) {
SdnControllerVO sdn = Q.New(SdnControllerVO.class).eq(
SdnControllerVO_.uuid, sdnControllerUuid).find();
if (sdn == null) {
throw new RuntimeException(String.format("Can not find a tf sdn controller[uuid:%s].", sdnControllerUuid));
}
if (!Objects.equals(sdn.getVendorType(), SdnControllerConstant.TF_CONTROLLER)) {
throw new RuntimeException(String.format("Sdn controller[uuid:%s] is not a tf sdn controller.", sdnControllerUuid));
}
init(sdn);
}

private void init(SdnControllerVO sdn) {
client = new TfHttpClient(sdn.getIp());
tenantId = StringDSL.transToTfUuid(sdn.getAccountUuid());
}
Expand Down Expand Up @@ -509,4 +525,3 @@ public Status updateTfPort(String tfPortUUid, String accountId, String deviceId,
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ public List<VirtualMachineInterface> getTfPortsDetail() {
return tfPortClient.getVirtualMachineInterfaceDetail();
}

public List<VirtualMachineInterface> getTfPortsDetail(String sdnControllerUuid) {
TfPortClient tfPortClient = new TfPortClient(sdnControllerUuid);
return tfPortClient.getVirtualMachineInterfaceDetail();
}

public TfPortResponse createTfPort(String tfPortUUid, String l2NetworkUuid, String l3NetworkUuid, String mac, String ip) {
//invoke tf rest interface to retrieve real ip and mac and portId
TfPortClient tfPortClient = new TfPortClient();
Expand Down Expand Up @@ -115,6 +120,12 @@ public TfPortResponse deleteTfPort(String portUUid) {
return tfPortClient.deletePort(tfPortUUid);
}

public TfPortResponse deleteTfPort(String sdnControllerUuid, String portUUid) {
String tfPortUUid = StringDSL.transToTfUuid(portUUid);
TfPortClient tfPortClient = new TfPortClient(sdnControllerUuid);
return tfPortClient.deletePort(tfPortUUid);
}

public Status updateTfPort(String portUUid, String bmUuid, KeyValuePairs bindInfo) {
String accountId = StringDSL.transToTfUuid(acntMgr.getOwnerAccountUuidOfResource(bmUuid));
String tfBmUUid = StringDSL.transToTfUuid(bmUuid);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@

import org.springframework.beans.factory.annotation.Autowired;
import org.zstack.core.db.Q;
import org.zstack.core.thread.PeriodicTask;
import org.zstack.core.thread.Task;
import org.zstack.core.thread.ThreadFacade;
import org.zstack.header.managementnode.ManagementNodeReadyExtensionPoint;
import org.zstack.header.network.l2.L2NetworkVO;
import org.zstack.header.network.l2.L2NetworkVO_;
import org.zstack.header.vm.VmNicVO;
Expand All @@ -19,39 +18,57 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.Future;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class TfZstackPortSync implements ManagementNodeReadyExtensionPoint {
public class TfZstackPortSync {
private static final long SYNC_INTERVAL_MILLIS = TimeUnit.DAYS.toMillis(1);
private static final int MAX_DELETE_COUNT = 10;

@Autowired
protected ThreadFacade thdf;
private Future<Void> trackerThread = null;
@Autowired
private TfPortService tfPortService;
private final static CLogger logger = Utils.getLogger(TfZstackPortSync.class);
private final List<String> excludeTypes = new ArrayList<String>(Arrays.asList("neutron:LOADBALANCER", "VIP", "BMS"));
private final Map<String, Long> lastSyncTime = new ConcurrentHashMap<>();
private final Set<String> runningSyncs = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

@Override
public void managementNodeReady() {
if (trackerThread != null) {
trackerThread.cancel(true);
public void triggerSyncIfDue(String sdnControllerUuid) {
if (sdnControllerUuid == null) {
logger.warn("Port_Sync_Task: skip sync because sdn controller uuid is null.");
return;
}
trackerThread = thdf.submitPeriodicTask(new SyncPort());
}

private class SyncPort implements PeriodicTask {
long now = System.currentTimeMillis();
Long lastSync = lastSyncTime.get(sdnControllerUuid);
if (lastSync != null && now - lastSync < SYNC_INTERVAL_MILLIS) {
return;
}
if (!runningSyncs.add(sdnControllerUuid)) {
return;
}

@Override
public TimeUnit getTimeUnit() {
return TimeUnit.DAYS;
lastSyncTime.put(sdnControllerUuid, now);
try {
thdf.submit(new SyncPort(sdnControllerUuid));
} catch (RuntimeException e) {
lastSyncTime.remove(sdnControllerUuid);
runningSyncs.remove(sdnControllerUuid);
throw e;
}
Comment on lines +42 to 64

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🩺 Stability & Availability | 🟠 Major | ⚡ Quick win

修正同步节流状态的竞态和失败标记。

当前 lastSyncTime 在任务成功前写入;而 getPortToDelete/call 会吞掉异常,导致一次失败同步也会被 Line 50 节流一天。另外,Line 49 的读取和 Line 57 的写入不是原子操作,并发 Ping 下可能绕过 runningSyncs 重复提交。

建议调整
-    public void triggerSyncIfDue(String sdnControllerUuid) {
+    public synchronized void triggerSyncIfDue(String sdnControllerUuid) {
         if (sdnControllerUuid == null) {
             logger.warn("Port_Sync_Task: skip sync because sdn controller uuid is null.");
             return;
         }
 
@@
         if (!runningSyncs.add(sdnControllerUuid)) {
             return;
         }
 
-        lastSyncTime.put(sdnControllerUuid, now);
         try {
             thdf.submit(new SyncPort(sdnControllerUuid));
         } catch (RuntimeException e) {
-            lastSyncTime.remove(sdnControllerUuid);
             runningSyncs.remove(sdnControllerUuid);
             throw e;
         }
@@
             } catch (Exception e) {
                 logger.error(String.format("Port_Sync_Task: Fetch tf VirtualMachineInterface failed: %s.", e));
-                return new HashSet<>();
+                throw new RuntimeException("Fetch tf VirtualMachineInterface failed", e);
             }
@@
         public Void call() {
             logger.info("Port_Sync_Task: begin.");
+            boolean syncSucceeded = false;
             try {
                 HashSet<String> portsToDelete = getPortToDelete();
                 int maxDeleteCount = MAX_DELETE_COUNT;
                 for (String portUuid: portsToDelete) {
@@
                         break;
                     }
                 }
+                syncSucceeded = true;
             } catch (Exception e) {
                 logger.error(String.format("Port_Sync_Task failed: %s.", e));
             } finally {
+                if (syncSucceeded) {
+                    lastSyncTime.put(sdnControllerUuid, System.currentTimeMillis());
+                }
                 runningSyncs.remove(sdnControllerUuid);
             }
             return null;
         }

Also applies to: 104-139

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@plugin/sugonSdnController/src/main/java/org/zstack/sugonSdnController/network/TfZstackPortSync.java`
around lines 42 - 64, `TfZstackPortSync.triggerSyncIfDue` is recording
`lastSyncTime` before `SyncPort` actually succeeds, and the current
read-check-add sequence around `lastSyncTime`/`runningSyncs` is not atomic.
Update the throttling so the sync timestamp is written only after a successful
`SyncPort.call` path, and make failures in `getPortToDelete`/`call` leave the
entry eligible for retry instead of being treated as a completed sync. Also
guard the `lastSyncTime.get(...)` plus `runningSyncs.add(...)` decision with a
single atomic section (or equivalent concurrent primitive) so concurrent
triggers cannot bypass `runningSyncs` and submit duplicate syncs.

}

@Override
public long getInterval() {
return 1;
private class SyncPort implements Task<Void> {
private final String sdnControllerUuid;

private SyncPort(String sdnControllerUuid) {
this.sdnControllerUuid = sdnControllerUuid;
}

@Override
Expand All @@ -64,7 +81,10 @@ private HashSet<String> getPortToDelete() {
List<String> zstackL2NetworksUuid = Q.New(L2NetworkVO.class).select(L2NetworkVO_.uuid).listValues();
List<String> tfPortsUuid = new ArrayList<>();
try{
List<VirtualMachineInterface> tfPorts = tfPortService.getTfPortsDetail();
List<VirtualMachineInterface> tfPorts = tfPortService.getTfPortsDetail(sdnControllerUuid);
if (tfPorts == null) {
return new HashSet<>();
}
for (VirtualMachineInterface vmi : tfPorts) {
// skip port if it's network not in zstack
List<ObjectReference<ApiPropertyBase>> tfNetworks = vmi.getVirtualNetwork();
Expand All @@ -83,7 +103,7 @@ private HashSet<String> getPortToDelete() {
}
} catch (Exception e) {
logger.error(String.format("Port_Sync_Task: Fetch tf VirtualMachineInterface failed: %s.", e));
return null;
return new HashSet<>();
}
HashSet<String> result = new HashSet<>(tfPortsUuid);
result.removeAll(zstackPortsUuid);
Expand All @@ -92,13 +112,13 @@ private HashSet<String> getPortToDelete() {
}

@Override
public void run() {
public Void call() {
logger.info("Port_Sync_Task: begin.");
try {
HashSet<String> portsToDelete = getPortToDelete();
int maxDeleteCount = 10;
int maxDeleteCount = MAX_DELETE_COUNT;
for (String portUuid: portsToDelete) {
TfPortResponse response = tfPortService.deleteTfPort(portUuid);
TfPortResponse response = tfPortService.deleteTfPort(sdnControllerUuid, portUuid);
if (response.getCode() == 200) {
logger.info(String.format("Port_Sync_Task: VirtualMachineInterface: %s delete success.",
portUuid));
Expand All @@ -113,7 +133,10 @@ public void run() {
}
} catch (Exception e) {
logger.error(String.format("Port_Sync_Task failed: %s.", e));
} finally {
runningSyncs.remove(sdnControllerUuid);
}
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
package org.zstack.test.integration.network.sdnController

import org.springframework.http.HttpEntity
import org.zstack.core.db.DatabaseFacade
import org.zstack.core.cloudbus.CloudBus
import org.zstack.header.message.MessageReply
import org.zstack.header.network.sdncontroller.SdnControllerVO
import org.zstack.header.network.sdncontroller.SdnControllerConstant
import org.zstack.sdnController.SdnControllerPingMsg
import org.zstack.sdk.*
import org.zstack.sugonSdnController.controller.SugonSdnControllerConstant
import org.zstack.sugonSdnController.controller.api.types.MacAddressesType
Expand All @@ -16,6 +21,9 @@ import javax.persistence.TypedQuery;
import org.springframework.http.ResponseEntity
import org.springframework.http.HttpStatus

import javax.servlet.http.HttpServletRequest
import java.util.concurrent.atomic.AtomicInteger


class SugonSdnControllerCase extends SubCase {
EnvSpec env
Expand All @@ -37,6 +45,7 @@ class SugonSdnControllerCase extends SubCase {
void test() {
env.create {
dbf = bean(DatabaseFacade.class)
testTfPortSyncTriggeredByPingSuccess()
testTfApi()
}
}
Expand All @@ -46,6 +55,53 @@ class SugonSdnControllerCase extends SubCase {
env.delete()
}

void testTfPortSyncTriggeredByPingSuccess() {
String sql = "select sdn" +
" from SdnControllerVO sdn" +
" where sdn.vendorType = :vendorType";
TypedQuery<SdnControllerVO> q = dbf.getEntityManager().createQuery(sql, SdnControllerVO.class);
q.setParameter("vendorType", SugonSdnControllerConstant.TF_CONTROLLER);
SdnControllerVO sdn = q.getResultList().get(0);
AtomicInteger syncCount = new AtomicInteger(0)

env.simulator(TfCommands.TF_CREATE_VMI) { HttpServletRequest req, HttpEntity<String> e, EnvSpec spec ->
if (req.method == "GET") {
syncCount.incrementAndGet()
return '{"virtual-machine-interfaces":[]}'
}

VirtualMachineInterface rsp = new VirtualMachineInterface();
rsp.name = TfCommands.TEST_VMI_UUID
rsp.uuid = TfCommands.TEST_VMI_UUID
Project project = new Project();
project.name = TfCommands.TEST_PROJECT_UUID
project.uuid = TfCommands.TEST_PROJECT_UUID
project.displayName = "admin";
rsp.setParent(project)
String json = ApiSerializer.serializeObject("virtual-machine-interface", rsp);
ResponseEntity<String> response = new ResponseEntity<String>(json, HttpStatus.OK);
return response.getBody()
}

sendTfPing(sdn.uuid)
retryInSecs {
assert syncCount.get() == 1
}

sleep(500)
sendTfPing(sdn.uuid)
assert syncCount.get() == 1
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

void sendTfPing(String sdnControllerUuid) {
CloudBus bus = bean(CloudBus.class)
SdnControllerPingMsg msg = new SdnControllerPingMsg()
msg.sdnControllerUuid = sdnControllerUuid
bus.makeTargetServiceIdByResourceUuid(msg, SdnControllerConstant.SERVICE_ID, sdnControllerUuid)
MessageReply reply = bus.call(msg)
assert reply.success
}

void testTfApi() {
def zone = env.inventoryByName("zone") as ZoneInventory
def cluster1 = env.inventoryByName("cluster1") as ClusterInventory
Expand Down