diff --git a/compute/src/main/java/org/zstack/compute/vm/AbstractVmInstance.java b/compute/src/main/java/org/zstack/compute/vm/AbstractVmInstance.java index 6972470c964..9af3af26ee7 100755 --- a/compute/src/main/java/org/zstack/compute/vm/AbstractVmInstance.java +++ b/compute/src/main/java/org/zstack/compute/vm/AbstractVmInstance.java @@ -205,6 +205,12 @@ public abstract class AbstractVmInstance implements VmInstance { APIDestroyVmInstanceMsg.class.getName(), DestroyVmInstanceMsg.class.getName()); + // Registering state: only metadata-related reads, destroy (for cleanup/rollback), + // and ChangeVmMetaDataMsg (for state transitions during registration) are allowed. + allowedOperations.addState(VmInstanceState.Registering, + ChangeVmMetaDataMsg.class.getName(), + APIDestroyVmInstanceMsg.class.getName(), + DestroyVmInstanceMsg.class.getName()); stateChangeChecker.addState(VmInstanceStateEvent.unknown.toString(), VmInstanceState.Created.toString(), diff --git a/compute/src/main/java/org/zstack/compute/vm/VmExpungeMetadataFlow.java b/compute/src/main/java/org/zstack/compute/vm/VmExpungeMetadataFlow.java new file mode 100644 index 00000000000..946eeb3dce0 --- /dev/null +++ b/compute/src/main/java/org/zstack/compute/vm/VmExpungeMetadataFlow.java @@ -0,0 +1,91 @@ +package org.zstack.compute.vm; + +import org.springframework.beans.factory.annotation.Autowire; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Configurable; +import org.zstack.core.db.Q; +import org.zstack.header.core.Completion; +import org.zstack.header.core.workflow.FlowTrigger; +import org.zstack.header.core.workflow.NoRollbackFlow; +import org.zstack.header.errorcode.ErrorCode; +import org.zstack.header.vm.MetadataStorageHandler; +import org.zstack.header.vm.VmInstanceConstant; +import org.zstack.header.vm.VmInstanceSpec; +import org.zstack.header.volume.VolumeVO; +import org.zstack.header.volume.VolumeVO_; +import org.zstack.header.volume.VolumeType; +import org.zstack.utils.Utils; +import org.zstack.utils.logging.CLogger; + +import java.util.Map; + +/** + * VM 彻底删除(Expunge)时清理主存储上的元数据文件。 + * + *
设计要点(Part 02b §8.3):
+ *删除时机说明(Δ-5):元数据在 Expunge(物理删除)而非 Destroy(软删除) + * 阶段清理。Destroy 时 VM 可通过 Recover 恢复,过早删除会导致恢复后元数据丢失。
+ */ +@Configurable(preConstruction = true, autowire = Autowire.BY_TYPE) +public class VmExpungeMetadataFlow extends NoRollbackFlow { + private static final CLogger logger = Utils.getLogger(VmExpungeMetadataFlow.class); + + @Autowired + private MetadataStorageHandler metadataStorageHandler; + + @Override + public void run(FlowTrigger trigger, Map data) { + final VmInstanceSpec spec = (VmInstanceSpec) data.get(VmInstanceConstant.Params.VmInstanceSpec.toString()); + final String vmUuid = spec.getVmInventory().getUuid(); + + // 功能开关检查:即使功能关闭,也尝试清理已有的元数据文件(best-effort) + // 不检查 VM_METADATA 开关——Expunge 是不可逆操作,应始终尝试清理残留 + + // 通过根卷查找 PS UUID + String rootVolumeUuid = spec.getVmInventory().getRootVolumeUuid(); + if (rootVolumeUuid == null) { + // VM 处于中间状态,无根卷,跳过 + logger.debug(String.format("[MetadataExpunge] vm[uuid:%s] has no root volume, skipping metadata cleanup", vmUuid)); + trigger.next(); + return; + } + + String psUuid = Q.New(VolumeVO.class) + .eq(VolumeVO_.uuid, rootVolumeUuid) + .select(VolumeVO_.primaryStorageUuid) + .findValue(); + + if (psUuid == null) { + // 根卷已被删除或无 PS 信息,跳过 + logger.debug(String.format("[MetadataExpunge] vm[uuid:%s] root volume[uuid:%s] has no primaryStorageUuid, " + + "skipping metadata cleanup", vmUuid, rootVolumeUuid)); + trigger.next(); + return; + } + + logger.info(String.format("[MetadataExpunge] deleting metadata for vm[uuid:%s] on ps[uuid:%s]", vmUuid, psUuid)); + + metadataStorageHandler.deleteMetadata(psUuid, vmUuid, new Completion(trigger) { + @Override + public void success() { + logger.info(String.format("[MetadataExpunge] metadata deleted for vm[uuid:%s] on ps[uuid:%s]", vmUuid, psUuid)); + trigger.next(); + } + + @Override + public void fail(ErrorCode errorCode) { + // best-effort:失败不阻塞 VM 物理清除 + logger.warn(String.format("[MetadataExpunge] failed to delete metadata for vm[uuid:%s] on ps[uuid:%s], " + + "continuing expunge. Error: %s", vmUuid, psUuid, errorCode)); + trigger.next(); + } + }); + } +} diff --git a/compute/src/main/java/org/zstack/compute/vm/VmGlobalConfig.java b/compute/src/main/java/org/zstack/compute/vm/VmGlobalConfig.java index bd79900c13c..c0442dcf4c2 100755 --- a/compute/src/main/java/org/zstack/compute/vm/VmGlobalConfig.java +++ b/compute/src/main/java/org/zstack/compute/vm/VmGlobalConfig.java @@ -133,4 +133,139 @@ public class VmGlobalConfig { @GlobalConfigValidation(validValues = {"None", "AuthenticAMD"}) @BindResourceConfig(value = {VmInstanceVO.class}) public static GlobalConfig VM_CPUID_VENDOR = new GlobalConfig(CATEGORY, "vm.cpuid.vendor"); + + @GlobalConfigValidation(numberGreaterThan = 1) + public static GlobalConfig GC_INTERVAL = new GlobalConfig(CATEGORY, "deletion.gcInterval"); + + @GlobalConfigValidation(validValues = {"true", "false"}) + public static GlobalConfig VM_METADATA = new GlobalConfig(CATEGORY, "vm.metadata"); + + @GlobalConfigDef(defaultValue = "5", type = Integer.class, + description = "Max concurrent metadata writes per primary storage per MN") + @GlobalConfigValidation(numberGreaterThan = 0) + public static GlobalConfig VM_METADATA_PS_MAX_CONCURRENT = new GlobalConfig(CATEGORY, "vm.metadata.ps.maxConcurrent"); + + @GlobalConfigDef(defaultValue = "10", type = Integer.class, + description = "Max concurrent VM metadata updates globally per MN") + @GlobalConfigValidation(numberGreaterThan = 0) + public static GlobalConfig VM_METADATA_GLOBAL_MAX_CONCURRENT = new GlobalConfig(CATEGORY, "vm.metadata.global.maxConcurrent"); + + @GlobalConfigDef(defaultValue = "10", type = Integer.class, + description = "Initial GC delay in seconds after API success") + @GlobalConfigValidation(numberGreaterThan = 0) + public static GlobalConfig VM_METADATA_GC_INITIAL_DELAY_SEC = new GlobalConfig(CATEGORY, "vm.metadata.gc.initialDelaySec"); + + @GlobalConfigDef(defaultValue = "5", type = Integer.class, + description = "Max retry count before giving up metadata flush") + @GlobalConfigValidation(numberGreaterThan = 0) + public static GlobalConfig VM_METADATA_MAX_RETRY = new GlobalConfig(CATEGORY, "vm.metadata.maxRetry"); + + @GlobalConfigDef(defaultValue = "5", type = Long.class, + description = "Dirty poller interval in seconds") + @GlobalConfigValidation(numberGreaterThan = 0) + public static GlobalConfig VM_METADATA_DIRTY_POLL_INTERVAL = new GlobalConfig(CATEGORY, "vm.metadata.dirty.pollIntervalSec"); + + @GlobalConfigDef(defaultValue = "20", type = Integer.class, + description = "Max dirty rows to claim per poller cycle") + @GlobalConfigValidation(numberGreaterThan = 0) + public static GlobalConfig VM_METADATA_DIRTY_BATCH_SIZE = new GlobalConfig(CATEGORY, "vm.metadata.dirty.batchSize"); + + @GlobalConfigDef(defaultValue = "300", type = Long.class, + description = "Path fingerprint check interval in seconds") + @GlobalConfigValidation(numberGreaterThan = 0) + public static GlobalConfig VM_METADATA_PATH_CHECK_INTERVAL = new GlobalConfig(CATEGORY, "vm.metadata.pathCheck.intervalSec"); + + @GlobalConfigDef(defaultValue = "500", type = Integer.class, + description = "Path fingerprint check keyset pagination batch size") + @GlobalConfigValidation(numberGreaterThan = 0) + public static GlobalConfig VM_METADATA_PATH_CHECK_BATCH_SIZE = new GlobalConfig(CATEGORY, "vm.metadata.pathCheck.batchSize"); + + @GlobalConfigDef(defaultValue = "600", type = Long.class, + description = "Delay in seconds before full refresh after upgrade, waiting for rolling upgrade to complete") + @GlobalConfigValidation(numberGreaterThan = 0) + public static GlobalConfig VM_METADATA_UPGRADE_REFRESH_DELAY = new GlobalConfig(CATEGORY, "vm.metadata.upgrade.refreshDelaySec"); + + @GlobalConfigDef(defaultValue = "1000", type = Integer.class, + description = "Upgrade full refresh SQL batch size") + @GlobalConfigValidation(numberGreaterThan = 0) + public static GlobalConfig VM_METADATA_UPGRADE_REFRESH_BATCH_SIZE = new GlobalConfig(CATEGORY, "vm.metadata.upgrade.refreshBatchSize"); + + @GlobalConfigDef(defaultValue = "5", type = Long.class, + description = "Delay in seconds after nodeLeft before takeover, reduces zombie MN race condition") + @GlobalConfigValidation(numberGreaterThan = 0) + public static GlobalConfig VM_METADATA_NODE_LEFT_DELAY = new GlobalConfig(CATEGORY, "vm.metadata.nodeLeft.delaySec"); + + @GlobalConfigDef(defaultValue = "1800", type = Long.class, + description = "MetadataStaleRecoveryTask scan interval in seconds") + @GlobalConfigValidation(numberGreaterThan = 0) + public static GlobalConfig VM_METADATA_STALE_RECOVERY_INTERVAL = new GlobalConfig(CATEGORY, "vm.metadata.staleRecovery.intervalSec"); + + @GlobalConfigDef(defaultValue = "100", type = Integer.class, + description = "MetadataStaleRecoveryTask rows per scan batch") + @GlobalConfigValidation(numberGreaterThan = 0) + public static GlobalConfig VM_METADATA_STALE_RECOVERY_BATCH_SIZE = new GlobalConfig(CATEGORY, "vm.metadata.staleRecovery.batchSize"); + + @GlobalConfigDef(defaultValue = "10", type = Integer.class, + description = "Max consecutive stale recovery cycles per VM before circuit-break") + @GlobalConfigValidation(numberGreaterThan = 0) + public static GlobalConfig VM_METADATA_STALE_RECOVERY_MAX_CYCLES = new GlobalConfig(CATEGORY, "vm.metadata.staleRecovery.maxCycles"); + + @GlobalConfigDef(defaultValue = "45", type = Long.class, + description = "Pending API timeout cleanup threshold in minutes") + @GlobalConfigValidation(numberGreaterThan = 0) + public static GlobalConfig VM_METADATA_PENDING_API_TIMEOUT = new GlobalConfig(CATEGORY, "vm.metadata.pendingApi.timeoutMinutes"); + + @GlobalConfigDef(defaultValue = "10", type = Integer.class, + description = "Exponential backoff base delay in seconds") + @GlobalConfigValidation(numberGreaterThan = 0) + public static GlobalConfig VM_METADATA_RETRY_BASE_DELAY = new GlobalConfig(CATEGORY, "vm.metadata.retry.baseDelaySeconds"); + + @GlobalConfigDef(defaultValue = "10", type = Integer.class, + description = "Exponential backoff max exponent") + @GlobalConfigValidation(numberGreaterThan = 0) + public static GlobalConfig VM_METADATA_RETRY_MAX_EXPONENT = new GlobalConfig(CATEGORY, "vm.metadata.retry.maxExponent"); + + @GlobalConfigDef(defaultValue = "200", type = Integer.class, + description = "Batch size per round when enabling metadata (false to true init)") + @GlobalConfigValidation(numberGreaterThan = 0) + public static GlobalConfig VM_METADATA_INIT_BATCH_SIZE = new GlobalConfig(CATEGORY, "vm.metadata.init.batchSize"); + + @GlobalConfigDef(defaultValue = "5", type = Long.class, + description = "Delay in seconds between init batches to prevent IO storm") + @GlobalConfigValidation(numberGreaterThan = 0) + public static GlobalConfig VM_METADATA_INIT_BATCH_DELAY = new GlobalConfig(CATEGORY, "vm.metadata.init.batchDelaySec"); + + @GlobalConfigDef(defaultValue = "3600", type = Long.class, + description = "Orphan metadata detection interval in seconds") + @GlobalConfigValidation(numberGreaterThan = 0) + public static GlobalConfig VM_METADATA_ORPHAN_CHECK_INTERVAL = new GlobalConfig(CATEGORY, "vm.metadata.orphanCheck.intervalSec"); + + @GlobalConfigDef(defaultValue = "15", type = Long.class, + description = "Zombie claim threshold in minutes: claimed dirty rows older than this are released") + @GlobalConfigValidation(numberGreaterThan = 0) + public static GlobalConfig VM_METADATA_ZOMBIE_CLAIM_THRESHOLD = new GlobalConfig(CATEGORY, "vm.metadata.zombieClaim.thresholdMinutes"); + + @GlobalConfigDef(defaultValue = "30", type = Long.class, + description = "Stale claim threshold in minutes for background recovery task") + @GlobalConfigValidation(numberGreaterThan = 0) + public static GlobalConfig VM_METADATA_STALE_CLAIM_THRESHOLD = new GlobalConfig(CATEGORY, "vm.metadata.staleClaim.thresholdMinutes"); + + @GlobalConfigDef(defaultValue = "10", type = Long.class, + description = "Inline stale claim takeover threshold in minutes for triggerFlushForVm hot path") + @GlobalConfigValidation(numberGreaterThan = 0) + public static GlobalConfig VM_METADATA_TRIGGER_FLUSH_STALE = new GlobalConfig(CATEGORY, "vm.metadata.triggerFlush.staleMinutes"); + + @GlobalConfigDef(defaultValue = "3", type = Integer.class, + description = "Max retry count for deleteMetadata in ExpungeVmInstanceFlow") + @GlobalConfigValidation(numberGreaterThan = 0) + public static GlobalConfig VM_METADATA_DELETE_MAX_RETRY = new GlobalConfig(CATEGORY, "vm.metadata.delete.maxRetry"); + + @GlobalConfigDef(defaultValue = "30", type = Long.class, + description = "Base delay in seconds for deleteMetadata retry backoff") + @GlobalConfigValidation(numberGreaterThan = 0) + public static GlobalConfig VM_METADATA_DELETE_BASE_DELAY = new GlobalConfig(CATEGORY, "vm.metadata.delete.baseDelaySec"); + + @GlobalConfigDef(defaultValue = "", type = String.class, + description = "Last completed upgrade refresh version, prevents duplicate triggers across MNs. Internal use only") + public static GlobalConfig VM_METADATA_LAST_REFRESH_VERSION = new GlobalConfig(CATEGORY, "vm.metadata.lastRefreshVersion"); } diff --git a/compute/src/main/java/org/zstack/compute/vm/VmInstanceApiInterceptor.java b/compute/src/main/java/org/zstack/compute/vm/VmInstanceApiInterceptor.java index c17cf5d5179..0ce04419dcb 100755 --- a/compute/src/main/java/org/zstack/compute/vm/VmInstanceApiInterceptor.java +++ b/compute/src/main/java/org/zstack/compute/vm/VmInstanceApiInterceptor.java @@ -21,6 +21,7 @@ import org.zstack.header.message.APIMessage; import org.zstack.header.network.l2.*; import org.zstack.header.network.l3.*; +import org.zstack.header.storage.primary.APIRegisterVmInstanceMsg; import org.zstack.header.storage.primary.PrimaryStorageClusterRefVO; import org.zstack.header.storage.primary.PrimaryStorageClusterRefVO_; import org.zstack.header.storage.snapshot.VolumeSnapshotVO; diff --git a/compute/src/main/java/org/zstack/compute/vm/VmInstanceBase.java b/compute/src/main/java/org/zstack/compute/vm/VmInstanceBase.java index e31bc001218..7dcb5e6899a 100755 --- a/compute/src/main/java/org/zstack/compute/vm/VmInstanceBase.java +++ b/compute/src/main/java/org/zstack/compute/vm/VmInstanceBase.java @@ -45,6 +45,15 @@ import org.zstack.header.message.*; import org.zstack.header.network.l3.*; import org.zstack.header.storage.primary.*; +import org.zstack.header.storage.snapshot.*; +import org.zstack.header.storage.snapshot.group.VolumeSnapshotGroupRefVO; +import org.zstack.header.storage.snapshot.group.VolumeSnapshotGroupRefVO_; +import org.zstack.header.storage.snapshot.group.VolumeSnapshotGroupVO; +import org.zstack.header.storage.snapshot.group.VolumeSnapshotGroupVO_; +import org.zstack.header.tag.SystemTagVO; +import org.zstack.header.tag.SystemTagVO_; +import java.nio.charset.StandardCharsets; +import java.util.Base64; import org.zstack.header.vm.*; import org.zstack.header.vm.ChangeVmMetaDataMsg.AtomicHostUuid; import org.zstack.header.vm.ChangeVmMetaDataMsg.AtomicVmState; @@ -66,23 +75,19 @@ import org.zstack.network.l3.L3NetworkManager; import org.zstack.network.service.DnsUtils; import org.zstack.network.service.NetworkServiceManager; -import org.zstack.resourceconfig.ResourceConfig; -import org.zstack.resourceconfig.ResourceConfigFacade; +import org.zstack.resourceconfig.*; import org.zstack.tag.SystemTagCreator; import org.zstack.tag.SystemTagUtils; import org.zstack.tag.TagManager; -import org.zstack.utils.CollectionUtils; -import org.zstack.utils.ExceptionDSL; -import org.zstack.utils.ObjectUtils; -import org.zstack.utils.Utils; +import org.zstack.utils.*; import org.zstack.utils.function.ForEachFunction; import org.zstack.utils.function.Function; import org.zstack.utils.gson.JSONObjectUtil; import org.zstack.utils.logging.CLogger; -import org.zstack.utils.network.NicIpAddressInfo; import org.zstack.utils.network.IPv6Constants; import org.zstack.utils.network.IPv6NetworkUtils; import org.zstack.utils.network.NetworkUtils; +import org.zstack.utils.network.NicIpAddressInfo; import javax.persistence.PersistenceException; import javax.persistence.Tuple; @@ -90,6 +95,7 @@ import java.sql.Timestamp; import java.time.LocalDateTime; import java.util.*; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static java.util.Arrays.asList; @@ -140,6 +146,10 @@ public class VmInstanceBase extends AbstractVmInstance { private VmInstanceResourceMetadataManager vidm; @Autowired private NetworkServiceManager nwServiceMgr; + @Autowired + private ResourceDestinationMaker destMaker; + @Autowired + private org.zstack.compute.vm.metadata.VmMetadataBuilder vmMetadataBuilder; protected VmInstanceVO self; protected VmInstanceVO originalCopy; @@ -533,6 +543,8 @@ protected void handleLocalMessage(Message msg) { handle((CancelFlattenVmInstanceMsg) msg); } else if (msg instanceof KvmReportVmShutdownEventMsg) { handle((KvmReportVmShutdownEventMsg) msg); + } else if (msg instanceof UpdateVmInstanceMetadataMsg) { + handle((UpdateVmInstanceMetadataMsg) msg); } else { VmInstanceBaseExtensionFactory ext = vmMgr.getVmInstanceBaseExtensionFactory(msg); if (ext != null) { @@ -3184,6 +3196,17 @@ protected List通过 ChainTask 确保同一 VM 的元数据更新串行执行。 + * 该消息由 VmMetadataDirtyMarker 发送到本地 VM 服务, + * 内部从 DB 全量构建 metadata payload 后写入主存储。
+ * + *失败路径直接返回错误 reply,由 VmMetadataDirtyMarker 的 + * onFlushFailure() 统一处理重试和指数退避。
+ */ + private void handle(UpdateVmInstanceMetadataMsg msg) { + thdf.chainSubmit(new ChainTask(msg) { + @Override + public String getSyncSignature() { + return String.format("handle-update-vm-%s-metadata", msg.getUuid()); + } + + @Override + public void run(SyncTaskChain chain) { + doHandleUpdateVmInstanceMetadata(msg); + chain.next(); + } + + @Override + public String getName() { + return String.format("handle-update-vm-%s-metadata-task", msg.getUuid()); + } + }); + } + + private void doHandleUpdateVmInstanceMetadata(UpdateVmInstanceMetadataMsg msg) { + // 1. 构建 payload(通过 VmMetadataBuilder 在 @Transactional(readOnly=true) 事务内完成) + String metadata = vmMetadataBuilder.buildVmInstanceMetadata(msg.getUuid()); + + // 2. Payload 大小保护 + int payloadSize = metadata.getBytes(java.nio.charset.StandardCharsets.UTF_8).length; + if (payloadSize > org.zstack.compute.vm.metadata.VmMetadataBuilder.REJECT_THRESHOLD) { + logger.error(String.format("metadata payload too large: %d bytes for vm[uuid:%s], rejecting", + payloadSize, msg.getUuid())); + MessageReply reply = new MessageReply(); + reply.setError(Platform.operr("metadata payload too large (%d bytes, limit %d) for vm[uuid=%s]", + payloadSize, org.zstack.compute.vm.metadata.VmMetadataBuilder.REJECT_THRESHOLD, msg.getUuid())); + bus.reply(msg, reply); + return; + } + if (payloadSize > org.zstack.compute.vm.metadata.VmMetadataBuilder.WARN_THRESHOLD) { + logger.warn(String.format("metadata payload large: %d bytes for vm[uuid:%s]", + payloadSize, msg.getUuid())); + } + + // 3. 发送到主存储 + Tuple tuple = Q.New(VolumeVO.class).select(VolumeVO_.primaryStorageUuid, VolumeVO_.uuid) + .eq(VolumeVO_.vmInstanceUuid, msg.getUuid()).eq(VolumeVO_.type, VolumeType.Root).findTuple(); + String primaryStorageUuid = tuple.get(0, String.class); + String rootVolumeUuid = tuple.get(1, String.class); + + UpdateVmInstanceMetadataOnPrimaryStorageMsg umsg = new UpdateVmInstanceMetadataOnPrimaryStorageMsg(); + umsg.setMetadata(metadata); + umsg.setPrimaryStorageUuid(primaryStorageUuid); + umsg.setRootVolumeUuid(rootVolumeUuid); + umsg.setStorageStructureChange(msg.isStorageStructureChange()); + bus.makeLocalServiceId(umsg, PrimaryStorageConstant.SERVICE_ID); + bus.send(umsg, new CloudBusCallBack(msg) { + @Override + public void run(MessageReply r) { + UpdateVmInstanceMetadataOnPrimaryStorageReply reply = new UpdateVmInstanceMetadataOnPrimaryStorageReply(); + + if (!r.isSuccess()) { + reply.setError(Platform.operr("failed to update vm[uuid=%s] metadata on primary storage", + msg.getUuid()).withCause(r.getError())); + } + bus.reply(msg, reply); + } + }); + } +} diff --git a/compute/src/main/java/org/zstack/compute/vm/VmInstanceMetadataFieldProcessor.java b/compute/src/main/java/org/zstack/compute/vm/VmInstanceMetadataFieldProcessor.java new file mode 100644 index 00000000000..e2b74d42f02 --- /dev/null +++ b/compute/src/main/java/org/zstack/compute/vm/VmInstanceMetadataFieldProcessor.java @@ -0,0 +1,233 @@ +package org.zstack.compute.vm; + +import org.zstack.header.vm.VmInstanceMetadataDTO; +import org.zstack.header.vm.VmInstanceMetadataRegistrationSpec; +import org.zstack.utils.gson.JSONObjectUtil; + +import java.util.*; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +/** + * 虚拟机元数据注册时的字段处理器。 + * + *根据"注册字段处理矩阵"的规则,对反序列化后的 VO JSON 字段执行: + * 保留 / 替换 / 设 null / 重新生成 / 硬编码 等操作。
+ * + *处理采用 Map 操作方式(而非反序列化为具体 VO 类), + * 避免字段类型变更导致的兼容性问题。
+ * + * @see VmInstanceMetadataRegistrationSpec + */ +public class VmInstanceMetadataFieldProcessor { + + private VmInstanceMetadataFieldProcessor() { + } + + // ================================================================ + // VmInstanceVO + // ================================================================ + + /** + * VmInstanceVO 中注册时需要设为 null 的字段。 + */ + private static final Set处理规则: + *
处理规则: + *
处理规则: + *
注册时,仅处理属于当前存储的 volume 及其关联快照。 + * 不属于当前存储的 volume 跳过。
+ * + * @param dto 完整元数据 DTO + * @param pathIdentifier 旧存储路径标识符 + * @return 属于该存储的 volume resourceUuid 集合 + */ + public static Set覆盖绝大多数 VM 直接 API(如 APIUpdateVmInstanceMsg、APIStartVmInstanceMsg 等)。
+ */ +public class DefaultVmUuidFromApiResolver implements VmUuidFromApiResolver { + + @Override + public boolean supports(APIMessage msg) { + return msg instanceof VmInstanceMessage; + } + + @Override + public List{@code @MetadataImpact} 注解仅标注在 {@code APIMessage} 子类上, + * 通过 {@link VmMetadataUpdateInterceptor} 自动触发标脏。 + * 但系统中存在不经过 API 拦截器的级联删除操作也会修改 VM 存储拓扑, + * 例如:删除 PrimaryStorage → 级联删除 Volume → VM 失去数据卷。 + * 本扩展在级联清理阶段({@code DELETION_CLEANUP_CODE})捕获这些事件, + * 为受影响的 VM 调用 markDirty。
+ * + *+ * ... → PrimaryStorageVO → VolumeVO → VmInstanceMetadata (本扩展) + * → VolumeSnapshotVO → ... + *+ * + *
当前支持的 parentIssuer:
+ *VM 删除时元数据同步清理可能因 IO 错误失败(3 次重试后放弃), + * 或 VM 创建失败导致残留。本检测器作为安全网,周期性地扫描每个 + * 支持元数据的 PS,比对存储侧 vmUuid 列表与 DB 中实际存在的 VM, + * 发现孤儿后仅记录日志告警,不执行自动删除。
+ * + *{@code MetadataStorageHandler} 接口及其实现(SblkMetadataStorageHandler, + * LocalNfsMetadataStorageHandler)尚未创建。本类在 scanMetadataVmUuids 可用后 + * 需取消 TODO 标记并完成 Agent 调用接入。
+ */ +public class MetadataOrphanDetector implements Component, ManagementNodeReadyExtensionPoint { + private static final CLogger logger = Utils.getLogger(MetadataOrphanDetector.class); + + // TODO: 待 MetadataStorageHandler 接口创建后注入 + // @Autowired + // private ListC-02B-14: 仅报告(WARN 日志),不执行 deleteMetadata。
+ */ + private void detectOrphans() { + if (!VmGlobalConfig.VM_METADATA.value(Boolean.class)) { + return; + } + + // 查询所有已启用且支持元数据的 PS + ListTODO: 当前为骨架实现。待 MetadataStorageHandler.scanMetadataVmUuids() 接口 + * 就绪后,替换下方 TODO 块为实际 agent 调用。
+ * + * @param ps 目标 PrimaryStorageVO + * @return 检测到的孤儿数量 + */ + private int scanPsForOrphans(PrimaryStorageVO ps) { + String psUuid = ps.getUuid(); + String psType = ps.getType(); + + // =================================================================== + // TODO: 替换为 MetadataStorageHandler.scanMetadataVmUuids(psUuid) 调用 + // + // 预期调用模式: + // MetadataStorageHandler handler = findHandler(psType); + // handler.scanMetadataVmUuids(psUuid, new ReturnValueCompletion孤儿条件:
+ *C-02B-14: 仅报告(WARN 日志),不执行自动删除。
+ * + * @param psUuid 当前扫描的 PS UUID + * @param metadataVmUuids agent 扫描返回的 vmUuid 列表 + * @return 检测到的孤儿数量 + */ + int checkOrphanEntries(String psUuid, List当存储拓扑变更绕过了 {@code @MetadataImpact} 拦截器(例如底层存储迁移、 + * 手动数据库修改等),dirty mark 不会被触发,导致元数据与实际拓扑不一致。 + * 本巡检任务作为安全网,定期比对每个 VM 的当前路径快照与上次刷写时记录的 + * 路径指纹,发现漂移则调用 markDirty 触发重新刷写。
+ * + *C-02B-3: 禁止 listAll,使用 {@code vmInstanceUuid > lastUuid} 分页。 + * 因 PK 为 vmInstanceUuid(非自增 id),keyset 分页天然适用。
+ */ + private void detectPathDrift() { + if (!VmGlobalConfig.VM_METADATA.value(Boolean.class)) { + return; + } + + int batchSize = VmGlobalConfig.VM_METADATA_PATH_CHECK_BATCH_SIZE.value(Integer.class); + String lastUuid = ""; + int driftCount = 0; + int totalChecked = 0; + + while (true) { + List与 {@link MetadataPathSnapshotBuilder#buildPathJson} 使用完全相同的逻辑, + * 确保比对结果一致:
+ *同时被 {@link VmMetadataDirtyMarker#savePathFingerprint} 和 + * {@link MetadataPathDriftDetector#buildCurrentPathSnapshot} 使用, + * 确保写入时记录的指纹与巡检时构建的指纹使用完全相同的逻辑。
+ * + *
+ * {
+ * "volumes": [
+ * {"uuid": "vol-aaa", "installPath": "/dev/vg/vol-aaa"},
+ * {"uuid": "vol-bbb", "installPath": "/dev/vg/vol-bbb"}
+ * ],
+ * "snapshots": [
+ * {"uuid": "sp-001", "installPath": "/dev/vg/sp-001"},
+ * {"uuid": "sp-002", "installPath": "/dev/vg/sp-002"}
+ * ]
+ * }
+ *
+ *
+ * Gson 按字段声明顺序序列化:volumes → snapshots。
+ */ + private static class PathSnapshot { + final ListGson 按字段声明顺序序列化:uuid → installPath。
+ */ + private static class PathEntry { + final String uuid; + final String installPath; + + PathEntry(String uuid, String installPath) { + this.uuid = uuid; + this.installPath = installPath; + } + } +} diff --git a/compute/src/main/java/org/zstack/compute/vm/metadata/MetadataStaleRecoveryTask.java b/compute/src/main/java/org/zstack/compute/vm/metadata/MetadataStaleRecoveryTask.java new file mode 100644 index 00000000000..af4a89ca023 --- /dev/null +++ b/compute/src/main/java/org/zstack/compute/vm/metadata/MetadataStaleRecoveryTask.java @@ -0,0 +1,186 @@ +package org.zstack.compute.vm.metadata; + +import org.springframework.beans.factory.annotation.Autowired; +import org.zstack.compute.vm.VmGlobalConfig; +import org.zstack.core.db.SQL; +import org.zstack.core.thread.PeriodicTask; +import org.zstack.core.thread.ThreadFacade; +import org.zstack.header.Component; +import org.zstack.header.managementnode.ManagementNodeReadyExtensionPoint; +import org.zstack.header.vm.VmMetadataPathFingerprintVO; +import org.zstack.utils.Utils; +import org.zstack.utils.logging.CLogger; + +import java.util.List; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * Stale 恢复任务:为重试耗尽(lastFlushFailed=true)的 VM 重新入队 markDirty。 + * + *当 dirty 行因重试耗尽被删除后,低频 VM(长期无 {@code @MetadataImpact} API)将失去 + * 自愈机会。本任务作为独立低频扫描器,周期性地将这些 VM 重新标脏,给予全新重试机会。
+ * + *+ * lastFlushFailed=true + * → StaleRecoveryTask markDirty(retryCount=0) + * → Poller 5 次重试 + * → 若仍失败 → lastFlushFailed=true → 30min 后再来 + * → 若成功 → 正常完成 + *+ * + *
当 PS 长期不可达时,staleRecoveryCount 累加。达到上限(默认 10 ≈ 5 小时)后 + * 停止自动恢复,记 WARN 日志提示管理员手动触发。
+ * + *覆盖的 API:
+ *+ * vmNicUuid → VmNicVO.vmInstanceUuid + *+ * + *
由于不存在统一的 VmNicMessage 接口,本解析器通过反射检测消息上的 {@code getVmNicUuid()} 方法获取 nicUuid。 + * 对于 {@code APIDeleteVmNicMsg},其 uuid 字段即为 nicUuid,通过 {@code getUuid()} 获取。
+ * + *本解析器不处理 BondingMessage 类消息({@code APIAttachNicToBondingMsg}、{@code APIDetachNicFromBondingMsg}), + * 因为 bonding → VM 的解析链涉及 PCI 设备关联,复杂度过高且为边缘场景。 + * 这些消息由 {@link ReflectionBasedVmUuidFromApiResolver} 兜底处理。
+ * + *在 API 执行前(BeforeDeliveryMessageInterceptor)调用。此时 VmNicVO 仍在数据库中, + * 即使是 delete 场景也能查到关联的 vmInstanceUuid。
+ */ +public class NicBasedVmUuidFromApiResolver implements VmUuidFromApiResolver { + private static final CLogger logger = Utils.getLogger(NicBasedVmUuidFromApiResolver.class); + + @Override + public boolean supports(APIMessage msg) { + // 检测消息是否携带 vmNicUuid(通过反射,因无统一接口) + return getVmNicUuid(msg) != null; + } + + @Override + public List按优先级尝试:
+ *此解析器作为所有其他 Resolver 的兜底,处理未被显式 Resolver 覆盖但仍然携带 + * vmInstanceUuid 或 resourceUuid 的 API 消息。
+ * + *注册顺序必须排在所有显式 Resolver 之后(在 XML 中排最后)。
+ */ +public class ReflectionBasedVmUuidFromApiResolver implements VmUuidFromApiResolver { + private static final CLogger logger = Utils.getLogger(ReflectionBasedVmUuidFromApiResolver.class); + + @Override + public boolean supports(APIMessage msg) { + // 兜底:对所有消息返回 true,但 resolveVmUuids 可能返回空 + return true; + } + + @Override + public List通过 resourceType + resourceUuid 判断资源所属 VM:
+ *APIDeleteTagMsg 需要先查询 Tag 获取 resourceType/resourceUuid, + * 因此必须在 API 执行前(beforeDeliveryMessage)调用。
+ */ +public class ResourceBasedVmUuidFromApiResolver implements VmUuidFromApiResolver { + private static final CLogger logger = Utils.getLogger(ResourceBasedVmUuidFromApiResolver.class); + + @Autowired + private DatabaseFacade dbf; + + @Override + public boolean supports(APIMessage msg) { + return msg instanceof APIAbstractCreateTagMsg + || msg instanceof APIDeleteTagMsg + || msg instanceof APIUpdateResourceConfigMsg + || msg instanceof APIDeleteResourceConfigMsg; + } + + @Override + public List覆盖的 API:
+ *+ * snapshotUuid → VolumeSnapshotVO.volumeUuid → VolumeVO.vmInstanceUuid + *+ * + *
在 API 执行前(BeforeDeliveryMessageInterceptor)调用。此时快照和关联的 Volume 仍存在于数据库中, + * 因此无需 pre-capture 机制。对于 delete 场景,快照本身被删除但 Volume 仍在;对于 revert 场景, + * Volume 关联关系不变。
+ */ +public class SnapshotBasedVmUuidFromApiResolver implements VmUuidFromApiResolver { + + @Override + public boolean supports(APIMessage msg) { + return msg instanceof VolumeSnapshotMessage; + } + + @Override + public List覆盖的 API:
+ *+ * groupUuid → VolumeSnapshotGroupRefVO.volumeUuid(多个) → VolumeVO.vmInstanceUuid(去重) + *+ * + *
一个快照组可能关联多个 Volume(根盘 + 数据盘),这些 Volume 可能分属不同 VM(虽然通常同属一个 VM)。 + * 本解析器返回所有关联 VM 的 uuid 列表。
+ */ +public class SnapshotGroupBasedVmUuidFromApiResolver implements VmUuidFromApiResolver { + + @Override + public boolean supports(APIMessage msg) { + return msg instanceof VolumeSnapshotGroupMessage; + } + + @Override + public List从 VmInstanceBase 中提取出来,以获得 Spring AOP 代理的 {@code @Transactional} 支持。 + * VmInstanceBase 实例不是 Spring 单例 Bean,其内部方法调用不经过 AOP 代理, + * 因此 {@code @Transactional} 注解在 VmInstanceBase 自身方法上不生效。
+ * + *{@link #buildVmInstanceMetadata(String)} 执行 6+ 条 SELECT 查询, + * 必须在同一个 REPEATABLE READ 事务快照内完成,以保证读一致性。
+ * + * @see VmInstanceMetadataDTO + */ +public class VmMetadataBuilder { + private static final CLogger logger = Utils.getLogger(VmMetadataBuilder.class); + + /** Payload 大小预警阈值(8 MB) */ + public static final int WARN_THRESHOLD = 8 * 1024 * 1024; + + /** Payload 大小拒绝阈值(30 MB) */ + public static final int REJECT_THRESHOLD = 30 * 1024 * 1024; + + @Autowired + private DatabaseFacade dbf; + + /** + * 从 DB 全量构建指定 VM 的元数据 JSON 字符串。 + * + *使用 {@code @Transactional(readOnly = true)} 确保所有 SELECT 查询 + * 在同一个 InnoDB REPEATABLE READ 事务快照内执行,保证读一致性。
+ * + * @param vmInstanceUuid 目标虚拟机 UUID + * @return 元数据 JSON 字符串;若 VM 不符合构建条件则返回 null + */ + @Transactional(readOnly = true) + public String buildVmInstanceMetadata(String vmInstanceUuid) { + // ── 查询 VM 本体 ── + VmInstanceVO vm = Q.New(VmInstanceVO.class).eq(VmInstanceVO_.uuid, vmInstanceUuid).find(); + if (vm == null) { + logger.warn(String.format("VM[uuid:%s] not found, skip metadata build", vmInstanceUuid)); + return null; + } + + // ── UserVm 类型检查 ── + if (!VmInstanceConstant.USER_VM_TYPE.equals(vm.getType())) { + logger.debug(String.format("VM[uuid:%s] type is [%s], not UserVm, skip metadata build", + vmInstanceUuid, vm.getType())); + return null; + } + + // ── 云盘(挂载的 + 已卸载但 lastVmInstanceUuid 指向本 VM 的) ── + List按 volumeUuid 分组,再按 treeUuid 分组(双层 TreeMap 保证 ASC 排序), + * 同一 tree 内使用 {@link VolumeSnapshotTree#fromVOs(List)} + + * {@link VolumeSnapshotTree#levelOrderTraversal()} 进行 BFS 层序遍历。
+ * + * @param allSnapshots 待排序的全部快照 VO + * @param vmUuid VM UUID(仅用于日志) + * @return 拓扑排序后的快照 VO 列表 + */ + private List包含 VO JSON、SystemTag、ResourceConfig 以及 + * 该 Volume 关联的快照引用(VolumeSnapshotReferenceVO)和引用树(VolumeSnapshotReferenceTreeVO)。
+ * + * @param vol VolumeVO 对象 + * @return 填充完毕的 VolumeResourceMetadata + */ + private VolumeResourceMetadata buildVolumeResourceMetadata(VolumeVO vol) { + VolumeResourceMetadata meta = new VolumeResourceMetadata(); + meta.resourceUuid = vol.getUuid(); + meta.vo = JSONObjectUtil.toJsonString(vol); + + // SystemTag: 排序 by uuid → JSON 数组 → Base64 + ListVO 全量 JSON 明文存储;SystemTagVO 和 ResourceConfigVO 整体列表序列化为 JSON 数组后 + * 一次性 Base64 编码,以保护可能包含的密码、密钥等敏感信息。
+ * + * @param resourceUuid 资源 UUID + * @param vo 资源 VO 对象(VmInstanceVO / VmNicVO) + * @return 填充完毕的 ResourceMetadata + */ + private VmInstanceMetadataDTO.ResourceMetadata buildResourceMetadata(String resourceUuid, Object vo) { + VmInstanceMetadataDTO.ResourceMetadata meta = new VmInstanceMetadataDTO.ResourceMetadata(); + meta.resourceUuid = resourceUuid; + meta.vo = JSONObjectUtil.toJsonString(vo); + + // SystemTagVO: 排序 by uuid → JSON 数组 → Base64 + List
+ * Layer 1 — DB CAS 认领:UPDATE WHERE managementNodeUuid IS NULL → 同一行只被一个 MN 处理
+ * Layer 2 — AtomicInteger 全局限流:globalFlushInFlight(默认上限 10)
+ * Layer 3 — ChainTask 队列 "update-vm-{vmUuid}-metadata":syncLevel=1, maxPending=1
+ * Layer 4 — 主存储级队列 "update-metadata-on-ps-{psUuid}"(在 PS handler 内部实现)
+ *
+ *
+ * @see VmMetadataDirtyVO
+ * @see VmMetadataUpdateInterceptor
+ */
+public class VmMetadataDirtyMarker implements Component, ManagementNodeChangeListener, ManagementNodeReadyExtensionPoint {
+ private static final CLogger logger = Utils.getLogger(VmMetadataDirtyMarker.class);
+
+ // =====================================================================
+ // 常量
+ // =====================================================================
+
+ // 指数退避参数改为 GlobalConfig(C-RB-04),详见 onFlushFailure()。
+
+ // =====================================================================
+ // 注入
+ // =====================================================================
+
+ @Autowired
+ private CloudBus bus;
+
+ @Autowired
+ private DatabaseFacade dbf;
+
+ @Autowired
+ private ThreadFacade thdf;
+
+ // =====================================================================
+ // Poller 状态
+ // =====================================================================
+
+ private Future存储迁移期间 Poller 会将相关 dirty 行的 nextRetryTime 设为 2099-12-31 23:59:59 + * 以防止 flush 竞争。如果迁移流程崩溃(MN 宕机),这些行会卡在该时间点永远不被处理。
+ * + *本方法在 MN 重启后、Poller 启动前执行,将所有"远未来"暂停行重置为可处理状态。
+ * + * @see Part 01c §1.6 迁移暂停恢复 + */ + private void recoverStalledMigrationPauses() { + int recovered = SQL.New( + "UPDATE VmMetadataDirtyVO " + + "SET nextRetryTime = NULL, retryCount = 0 " + + "WHERE nextRetryTime = '2099-12-31 23:59:59'") + .execute(); + if (recovered > 0) { + logger.warn(String.format("[MetadataDirty] Recovered %d dirty rows with stalled migration pause (nextRetryTime far in future)", recovered)); + } + } + + // ===================================================================== + // ManagementNodeChangeListener:MN 拓扑变化处理 + // ===================================================================== + + /** Timestamp of the most recent nodeLeft event, used by §9.1 M3 recent-nodeLeft check. */ + private volatile long lastNodeLeftTimestamp = 0; + + @Override + public void nodeLeft(ManagementNodeInventory inv) { + // MN 宕机 → FK SET_NULL 已释放其认领的 dirty 行 + // C-02B-1 §7.2: 延迟 N 秒后再触发 claimAndFlush(),降低 zombie MN 并发写入概率 + long delaySec = VmGlobalConfig.VM_METADATA_NODE_LEFT_DELAY.value(Long.class); + logger.info(String.format("[MetadataDirty] node[%s] left, scheduling claim and flush after %ds delay", + inv.getUuid(), delaySec)); + + // M3 修复:记录 nodeLeft 时间戳,供 §9.1 升级刷新 recent-nodeLeft 检查使用 + lastNodeLeftTimestamp = System.currentTimeMillis(); + + thdf.submit(new org.zstack.core.thread.Task使用 INSERT IGNORE + UPDATE 两步(C-DM-01: Galera 集群兼容),保证:
+ *markDirty 后立即调用 {@link #triggerFlushForVm(String)}, + * 尝试认领并提交刷写,消除最长 N 秒的 Poller 等待延迟。
+ * + * @param vmInstanceUuid 目标虚拟机 UUID + * @param storageStructureChange 是否涉及存储结构变更 + * @return true 如果标脏成功(供 MetadataStaleRecoveryTask DP-03 使用) + */ + public boolean markDirty(String vmInstanceUuid, boolean storageStructureChange) { + // 前置检查:功能开关 + if (!VmGlobalConfig.VM_METADATA.value(Boolean.class)) { + return false; + } + + // 前置检查:仅处理 KVM 虚拟化 + UserVm 类型的 VM + // 非 KVM(如 Simulator)或非 UserVm(如 ApplianceVm)不产生元数据 + boolean isTargetVm = Q.New(VmInstanceVO.class) + .eq(VmInstanceVO_.uuid, vmInstanceUuid) + .eq(VmInstanceVO_.type, VmInstanceConstant.USER_VM_TYPE) + .eq(VmInstanceVO_.hypervisorType, VmInstanceConstant.KVM_HYPERVISOR_TYPE) + .isExists(); + if (!isTargetVm) { + logger.trace(String.format("[MetadataDirty] vm[uuid:%s] is not KVM UserVm, skipping markDirty", + vmInstanceUuid)); + return false; + } + + try { + // C-DM-01: Galera 集群兼容写法,避免 INSERT ON DUPLICATE KEY 在高并发下死锁 + // Step 1: INSERT IGNORE(新行) + int inserted = SQL.New("INSERT IGNORE INTO VmMetadataDirtyVO " + + "(vmInstanceUuid, dirtyVersion, storageStructureChange) " + + "VALUES (:vmUuid, 1, :ssc)") + .param("vmUuid", vmInstanceUuid) + .param("ssc", storageStructureChange) + .execute(); + + // Step 2: 仅在行已存在时执行 UPDATE(dirtyVersion +1, storageStructureChange OR 升级) + if (inserted == 0) { + int updated = SQL.New("UPDATE VmMetadataDirtyVO " + + "SET dirtyVersion = dirtyVersion + 1, " + + " storageStructureChange = storageStructureChange OR :ssc " + + "WHERE vmInstanceUuid = :vmUuid") + .param("vmUuid", vmInstanceUuid) + .param("ssc", storageStructureChange) + .execute(); + + // Q19 修复:INSERT IGNORE 返回 0(行已存在)但 UPDATE 也返回 0(行被并发删除) + // 竞态窗口:INSERT IGNORE → onFlushSuccess DELETE → UPDATE(行已不存在) + // 此时必须重新 INSERT,否则本次 markDirty 对应的 DB 变更将丢失 + if (updated == 0) { + SQL.New("INSERT IGNORE INTO VmMetadataDirtyVO " + + "(vmInstanceUuid, dirtyVersion, storageStructureChange) " + + "VALUES (:vmUuid, 1, :ssc)") + .param("vmUuid", vmInstanceUuid) + .param("ssc", storageStructureChange) + .execute(); + } + } + + logger.debug(String.format("[MetadataDirty] marked dirty for vm[uuid:%s], storageStructureChange=%s", + vmInstanceUuid, storageStructureChange)); + + // 立即唤醒:尝试认领并提交刷写,不等待 Poller 轮询 + triggerFlushForVm(vmInstanceUuid); + return true; + } catch (Exception e) { + logger.warn(String.format("[MetadataDirty] markDirty failed for vm[uuid:%s]: %s", + vmInstanceUuid, e.getMessage())); + return false; + } + } + + /** + * 标脏入口(便捷重载,默认 storageStructureChange=false,即 CONFIG 级别)。 + * + * @param vmInstanceUuid 目标虚拟机 UUID + * @return true 如果标脏成功 + */ + public boolean markDirty(String vmInstanceUuid) { + return markDirty(vmInstanceUuid, false); + } + + // ===================================================================== + // triggerFlushForVm — 立即唤醒(单 VM) + // ===================================================================== + + /** + * 立即尝试认领并刷写指定 VM 的 dirty 行。 + * 若行已被认领或处于退避期,跳过(Poller 安全网会处理)。 + * + *Q20 修复:findStaleClaimOwner 可能返回 null(无 stale claim)。 + * SQL 的 OR 分支使用 :staleId 参数,当 staleId=null 时 + * MySQL 会将 {@code managementNodeUuid = NULL} 解析为 FALSE(SQL 三值逻辑), + * 不会误匹配任何行。但为避免依赖此隐式行为,显式处理: + * staleId=null 时仅使用 IS NULL 分支,不包含 stale 接管条件。
+ */ + private void triggerFlushForVm(String vmUuid) { + String myId = Platform.getManagementServerId(); + long staleMinutes = VmGlobalConfig.VM_METADATA_TRIGGER_FLUSH_STALE.value(Long.class); + String staleId = findStaleClaimOwner(vmUuid, Duration.ofMinutes(staleMinutes)); + + String sql; + if (staleId != null) { + sql = "UPDATE VmMetadataDirtyVO " + + "SET managementNodeUuid = :myId, lastClaimTime = CURRENT_TIMESTAMP " + + "WHERE vmInstanceUuid = :vmUuid " + + "AND (managementNodeUuid IS NULL " + + " OR (managementNodeUuid = :staleId AND lastClaimTime < CURRENT_TIMESTAMP - INTERVAL " + staleMinutes + " MINUTE)) " + + "AND (nextRetryTime IS NULL OR nextRetryTime <= CURRENT_TIMESTAMP)"; + } else { + sql = "UPDATE VmMetadataDirtyVO " + + "SET managementNodeUuid = :myId, lastClaimTime = CURRENT_TIMESTAMP " + + "WHERE vmInstanceUuid = :vmUuid " + + "AND managementNodeUuid IS NULL " + + "AND (nextRetryTime IS NULL OR nextRetryTime <= CURRENT_TIMESTAMP)"; + } + + int claimed = SQL.New(sql) + .param("myId", myId) + .param("staleId", staleId) + .param("vmUuid", vmUuid) + .execute(); + + if (claimed == 0) { + logger.debug(String.format("[MetadataDirty] triggerFlushForVm skip claim, vmUuid=%s, " + + "reason=already-claimed-or-backoff", vmUuid)); + return; + } + + VmMetadataDirtyVO dirty = dbf.findByUuid(vmUuid, VmMetadataDirtyVO.class); + // DP-07 说明:dirty == null 是合法场景。CAS UPDATE 成功后、findByUuid 前, + // 若同 MN 上一个 running flush 的 onFlushSuccess() 恰好执行了条件 DELETE, + // 则该行已被删除。此时直接 return 即可——数据已经是最新的。 + if (dirty == null) { + return; + } + + submitFlushTask(dirty); + } + + // ===================================================================== + // Poller — 轮询安全网 + // ===================================================================== + + /** + * 内部 PeriodicTask 实现。 + * + *Poller 角色定位:markDirty 后的 triggerFlushForVm 已覆盖常规场景。 + * Poller 降级为安全网,负责处理: + *
单条 UPDATE 天然原子,无锁等待,无死锁风险。
+ *DP-05 修复:僵尸 claim 清理已提取为独立低频任务 {@link #cleanupZombieClaims()}。
+ * + * @return 认领到的 dirty 行列表 + */ + private ListΔ-1 重构:原嵌套 ChainTask(外层全局限流 + 内层 per-VM 串行) + * 改为 AtomicInteger 全局限流 + 单层 per-VM ChainTask。原因:
+ *流程:
+ *Δ-2 修复:使用 SQLBatch 替代 @Transactional,避免 self-invocation 陷阱。
+ * + *条件删除:仅当 dirtyVersion == snapshotVersion 时删除, + * 即"刷写期间没有新的 markDirty 到来"。
+ * + *如果 dirtyVersion > snapshotVersion,说明刷写期间有新变更, + * 释放认领让 triggerFlush / Poller 重新处理。
+ */ + private void onFlushSuccess(String vmUuid, long snapshotVersion) { + new SQLBatch() { + @Override + protected void scripts() { + // 条件删除:仅当 dirtyVersion == snapshotVersion 时删除 + int deleted = SQL.New(VmMetadataDirtyVO.class) + .eq(VmMetadataDirtyVO_.vmInstanceUuid, vmUuid) + .eq(VmMetadataDirtyVO_.dirtyVersion, snapshotVersion) + .delete(); + + if (deleted == 0) { + // dirtyVersion > snapshotVersion → 刷写期间有新变更 + // 释放认领,让 triggerFlush / Poller 重新处理 + // 同时重置 retryCount(本次成功说明通路正常) + SQL.New(VmMetadataDirtyVO.class) + .eq(VmMetadataDirtyVO_.vmInstanceUuid, vmUuid) + .set(VmMetadataDirtyVO_.managementNodeUuid, null) + .set(VmMetadataDirtyVO_.retryCount, 0) + .set(VmMetadataDirtyVO_.nextRetryTime, null) + .update(); + + logger.debug(String.format("[MetadataDirty] vm[uuid:%s] has new changes during flush " + + "(snapshotVersion=%d), released for re-processing", vmUuid, snapshotVersion)); + } else { + logger.debug(String.format("[MetadataDirty] vm[uuid:%s] flush completed and dirty row removed", + vmUuid)); + } + } + }.execute(); + + // Δ-9:记录路径指纹(用于 PathDriftDetector 巡检) + savePathFingerprint(vmUuid); + } + + // ===================================================================== + // onFlushFailure — 刷写失败处理(指数退避 / 放弃) + // ===================================================================== + + /** + * 刷写失败后的处理。 + * + *retryCount++ → 达到上限则标记 stale + 删除行(MetadataStaleRecoveryTask 接管); + * 未达上限则释放认领 + 指数退避。
+ * + *C-RB-04: 退避参数来自 GlobalConfig,禁止硬编码。
+ *C-SR-05: 重试耗尽时在 PathFingerprintVO 标记 lastFlushFailed=true。
+ */ + private void onFlushFailure(String vmUuid, ErrorCode error) { + VmMetadataDirtyVO dirty = dbf.findByUuid(vmUuid, VmMetadataDirtyVO.class); + if (dirty == null) { + return; // VM 已销毁,FK CASCADE 已清理 + } + + int newRetryCount = dirty.getRetryCount() + 1; + int maxRetry = VmGlobalConfig.VM_METADATA_MAX_RETRY.value(Integer.class); + int baseDelay = VmGlobalConfig.VM_METADATA_RETRY_BASE_DELAY.value(Integer.class); + int maxExponent = VmGlobalConfig.VM_METADATA_RETRY_MAX_EXPONENT.value(Integer.class); + + if (newRetryCount >= maxRetry) { + // 达到上限 → 告警 + 标记 stale(C-SR-05:不再直接删除后静默放弃) + logger.error(String.format("[MetadataDirty] metadata update for vm[uuid:%s] failed " + + "after %d retries, marking as stale. MetadataStaleRecoveryTask will retry " + + "independently. Error: %s", vmUuid, newRetryCount, error)); + + // C-SR-05: 在 PathFingerprintVO 上标记 lastFlushFailed=true + SQL.New("UPDATE VmMetadataPathFingerprintVO " + + "SET lastFlushFailed = 1 WHERE vmInstanceUuid = :vmUuid") + .param("vmUuid", vmUuid) + .execute(); + + // 删除 dirty 行(释放 Poller 资源),stale 恢复由独立任务接管 + SQL.New(VmMetadataDirtyVO.class) + .eq(VmMetadataDirtyVO_.vmInstanceUuid, vmUuid) + .delete(); + return; + } + + // 未达上限 → 释放认领 + 指数退避(C-RB-04: 参数来自 GlobalConfig) + long delaySec = baseDelay * (1L << Math.min(newRetryCount, maxExponent)); + Timestamp nextRetry = Timestamp.from(Instant.now().plusSeconds(delaySec)); + + SQL.New(VmMetadataDirtyVO.class) + .eq(VmMetadataDirtyVO_.vmInstanceUuid, vmUuid) + .set(VmMetadataDirtyVO_.managementNodeUuid, null) + .set(VmMetadataDirtyVO_.retryCount, newRetryCount) + .set(VmMetadataDirtyVO_.nextRetryTime, nextRetry) + .update(); + + logger.warn(String.format("[MetadataDirty] metadata update for vm[uuid:%s] failed " + + "(retry %d/%d), next retry at %s. Error: %s", + vmUuid, newRetryCount, maxRetry, nextRetry, error)); + } + + // ===================================================================== + // 辅助方法 + // ===================================================================== + + /** + * 释放 dirty 行的认领(managementNodeUuid 置 NULL)。 + */ + private void releaseClaim(String vmUuid) { + SQL.New(VmMetadataDirtyVO.class) + .eq(VmMetadataDirtyVO_.vmInstanceUuid, vmUuid) + .set(VmMetadataDirtyVO_.managementNodeUuid, null) + .update(); + } + + /** + * 查找指定 VM dirty 行的 stale claim owner。 + * + *若该 VM 的 dirty 行被某个 MN 认领,且 lastClaimTime 超过 staleThreshold, + * 则返回该 MN 的 UUID;否则返回 null。
+ * + * @param vmUuid 目标 VM UUID + * @param staleThreshold 认领超时阈值 + * @return stale claim owner 的 MN UUID,或 null + */ + private String findStaleClaimOwner(String vmUuid, Duration staleThreshold) { + Timestamp cutoff = Timestamp.from(Instant.now().minus(staleThreshold)); + return Q.New(VmMetadataDirtyVO.class) + .eq(VmMetadataDirtyVO_.vmInstanceUuid, vmUuid) + .notNull(VmMetadataDirtyVO_.managementNodeUuid) + .lt(VmMetadataDirtyVO_.lastClaimTime, cutoff) + .select(VmMetadataDirtyVO_.managementNodeUuid) + .findValue(); + } + + /** + * 记录 VM 的路径指纹(用于 MetadataPathDriftDetector 巡检)。 + * + *每次 flush 成功后调用,INSERT or UPDATE VmMetadataPathFingerprintVO。 + * pathSnapshot 为当前 VM 所有 Volume + Snapshot 的 installPath 列表的 JSON。
+ * + *pathSnapshot 构建使用 {@link MetadataPathSnapshotBuilder#buildPathJson}, + * 与 {@link MetadataPathDriftDetector} 巡检时使用完全相同的逻辑,确保一致性。
+ */ + private void savePathFingerprint(String vmUuid) { + // 构建当前路径快照 JSON + List从 claimDirtyRows() 提取为独立低频任务,避免每 5s Poller 周期执行不必要的 + * write-intent 扫描。覆盖的场景:
+ *C-CL-02: 阈值 15 分钟 > flush 最大超时(5min),安全余量充足。
+ */ + private void cleanupZombieClaims() { + long thresholdMinutes = VmGlobalConfig.VM_METADATA_ZOMBIE_CLAIM_THRESHOLD.value(Long.class); + int cleaned = SQL.New("UPDATE VmMetadataDirtyVO " + + "SET managementNodeUuid = NULL, lastClaimTime = NULL " + + "WHERE managementNodeUuid IS NOT NULL " + + "AND lastClaimTime < CURRENT_TIMESTAMP - INTERVAL " + thresholdMinutes + " MINUTE") + .execute(); + + if (cleaned > 0) { + logger.info(String.format("[MetadataDirty] cleanupZombieClaims released %d zombie claim(s) " + + "(threshold=%d minutes)", cleaned, thresholdMinutes)); + } + } + + // ===================================================================== + // 升级全量刷新(§9.2) + // ===================================================================== + + /** + * 升级后全量刷新:为所有 UserVm 标脏,Poller 自动处理。 + * + *§9.2: 使用 C-DM-01 兼容的 INSERT IGNORE + UPDATE 两步,keyset 分页。 + * storageStructureChange=1(C-SC-07:升级后无法判断存储拓扑是否变化)。
+ * + *lastRefreshVersion 在全量刷新完成后写入(讨论 Δ-8): + * 若刷新过程中 MN 崩溃,重启后 lastRefreshVersion 仍为旧值 → 重新触发 → 幂等安全。
+ */ + private void submitFullRefresh(String currentVersion) { + logger.info(String.format("[MetadataDirty] metadata full refresh: starting for version %s", currentVersion)); + + int batchSize = VmGlobalConfig.VM_METADATA_UPGRADE_REFRESH_BATCH_SIZE.value(Integer.class); + String lastUuid = ""; + int totalProcessed = 0; + + while (true) { + // Step 1: INSERT IGNORE — 为尚无 dirty 行的 VM 创建新行 + SQL.New( + "INSERT IGNORE INTO VmMetadataDirtyVO (vmInstanceUuid, dirtyVersion, storageStructureChange) " + + "SELECT v.uuid, 1, 1 FROM VmInstanceVO v " + + "WHERE v.type = 'UserVm' AND v.uuid > :lastUuid " + + "ORDER BY v.uuid ASC LIMIT :batchSize") + .param("lastUuid", lastUuid) + .param("batchSize", batchSize) + .execute(); + + // Step 2: UPDATE — 已有 dirty 行的 VM 递增 dirtyVersion + 升级 storageStructureChange + SQL.New( + "UPDATE VmMetadataDirtyVO d " + + "INNER JOIN VmInstanceVO v ON d.vmInstanceUuid = v.uuid " + + "SET d.dirtyVersion = d.dirtyVersion + 1, " + + " d.storageStructureChange = 1 " + + "WHERE v.type = 'UserVm' AND v.uuid > :lastUuid " + + "ORDER BY v.uuid ASC LIMIT :batchSize") + .param("lastUuid", lastUuid) + .param("batchSize", batchSize) + .execute(); + + // 更新 lastUuid 用于 keyset 分页 + List与 §9.2 升级全量刷新的区别:
+ *§9a.2 讨论 Δ-10:功能关闭期间存储拓扑可能发生变更, + * 重新启用时旧指纹与实际拓扑不一致,会导致路径巡检产生大量误报。 + * 清理采用 keyset 分页异步删除(每批 1000 行),不阻塞 GlobalConfig 变更回调。
+ */ + private void cleanupPathFingerprints() { + thdf.submit(new org.zstack.core.thread.Task§9.1: 比较 {@code dbf.getDbVersion()} 与 {@code VM_METADATA_LAST_REFRESH_VERSION}, + * 若不一致则在延迟后执行 {@link #submitFullRefresh(String)}。
+ * + *延迟原因:升级后多个 MN 同时启动,仅需一个 MN 执行全量刷新。 + * 通过 {@code VM_METADATA_UPGRADE_REFRESH_DELAY}(默认 600s)延迟 + 执行前 re-check + * 实现"最终只有一个 MN 执行"的效果(best-effort, 非 leader election)。
+ * + *M3 recent-nodeLeft 防护:延迟到期后若近 15 分钟内发生过 nodeLeft, + * 说明集群可能不稳定,递归 reschedule 以避免在 MN 重新平衡期间执行全量刷新。
+ */ + private void scheduleUpgradeRefreshIfNeeded() { + if (!VmGlobalConfig.VM_METADATA.value(Boolean.class)) { + return; + } + + String currentVersion = dbf.getDbVersion(); + String lastRefreshVersion = VmGlobalConfig.VM_METADATA_LAST_REFRESH_VERSION.value(String.class); + + if (currentVersion.equals(lastRefreshVersion)) { + logger.debug("[MetadataDirty] DB version matches lastRefreshVersion, no upgrade refresh needed"); + return; + } + + long delaySec = VmGlobalConfig.VM_METADATA_UPGRADE_REFRESH_DELAY.value(Long.class); + logger.info(String.format("[MetadataDirty] DB version %s != lastRefreshVersion %s, " + + "scheduling upgrade refresh after %ds delay", currentVersion, lastRefreshVersion, delaySec)); + + thdf.submitTimeoutTask(() -> { + // Re-check: version may have changed, or feature may be disabled + if (!VmGlobalConfig.VM_METADATA.value(Boolean.class)) { + return; + } + String recheckVersion = dbf.getDbVersion(); + if (!recheckVersion.equals(currentVersion)) { + logger.warn("[MetadataDirty] DB version changed during upgrade refresh delay, skip"); + return; + } + String recheckLastRefresh = VmGlobalConfig.VM_METADATA_LAST_REFRESH_VERSION.value(String.class); + if (recheckVersion.equals(recheckLastRefresh)) { + logger.info("[MetadataDirty] another MN already completed upgrade refresh, skip"); + return; + } + + // M3 recent-nodeLeft check: if nodeLeft within last 15 min, reschedule + long recentNodeLeftWindowMs = 15L * 60 * 1000; + if (System.currentTimeMillis() - lastNodeLeftTimestamp < recentNodeLeftWindowMs) { + logger.info("[MetadataDirty] recent nodeLeft detected, rescheduling upgrade refresh"); + scheduleUpgradeRefreshIfNeeded(); // re-enter with fresh delay + return; + } + + submitFullRefresh(recheckVersion); + }, TimeUnit.SECONDS, delaySec); + } + + /** + * 启动僵尸 claim 清理定时任务(60s 间隔)。 + */ + private synchronized void startZombieCleanupTask() { + if (zombieCleanupFuture != null) { + zombieCleanupFuture.cancel(false); + } + zombieCleanupFuture = thdf.submitPeriodicTask(new PeriodicTask() { + @Override + public TimeUnit getTimeUnit() { + return TimeUnit.SECONDS; + } + + @Override + public long getInterval() { + return 60; + } + + @Override + public String getName() { + return "vm-metadata-zombie-claim-cleanup"; + } + + @Override + public void run() { + cleanupZombieClaims(); + } + }); + logger.info("[MetadataDirty] zombie claim cleanup task started (interval=60s)"); + } + + /** + * 停止僵尸 claim 清理定时任务。 + */ + private synchronized void stopZombieCleanupTask() { + if (zombieCleanupFuture != null) { + zombieCleanupFuture.cancel(false); + zombieCleanupFuture = null; + logger.info("[MetadataDirty] zombie claim cleanup task stopped"); + } + } +} diff --git a/compute/src/main/java/org/zstack/compute/vm/metadata/VmMetadataUpdateInterceptor.java b/compute/src/main/java/org/zstack/compute/vm/metadata/VmMetadataUpdateInterceptor.java new file mode 100644 index 00000000000..efb4d67d044 --- /dev/null +++ b/compute/src/main/java/org/zstack/compute/vm/metadata/VmMetadataUpdateInterceptor.java @@ -0,0 +1,358 @@ +package org.zstack.compute.vm.metadata; + + +import org.springframework.beans.factory.annotation.Autowired; +import org.zstack.compute.vm.VmGlobalConfig; +import org.zstack.core.cloudbus.CloudBus; +import org.zstack.header.Component; +import org.zstack.header.message.*; +import org.zstack.header.vm.MetadataImpact; +import org.zstack.header.vm.VmUuidFromApiResolver; +import org.zstack.utils.BeanUtils; +import org.zstack.utils.Utils; +import org.zstack.utils.logging.CLogger; + +import org.zstack.core.thread.PeriodicTask; +import org.zstack.core.thread.ThreadFacade; +import org.zstack.header.managementnode.ManagementNodeReadyExtensionPoint; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * 拦截标注了 {@link MetadataImpact} 的 API 消息,在 API 成功后调用 markDirty 触发元数据更新。 + * + *+ * API Message 投递 Event 发布 + * │ │ + * ▼ ▼ + * BeforeDeliveryMessageInterceptor BeforePublishEventInterceptor + * │ │ + * │ 检测 @MetadataImpact │ 通过 apiId 匹配 + * │ 通过 VmUuidFromApiResolver │ 检查 API 是否成功 + * │ 解析 vmUuid 列表 │ 调用 markDirty() + * │ 缓存到 pendingApis │ 清理 pendingApis + * │ (key = apiId) │ + * ▼ ▼ + *+ * + *
通过注入的 {@link VmUuidFromApiResolver} 列表按顺序解析 vmUuid:
+ *使用 {@link VmMetadataDirtyMarker#markDirty(String, boolean)} 将 VM 标记为脏, + * INSERT ON DUPLICATE KEY UPDATE 天然去重,100 个 API 只产生 1 行 dirty 行。 + * markDirty 后立即尝试认领并刷写,Poller 作为安全网处理退避和异常场景。
+ */ +public class VmMetadataUpdateInterceptor implements Component, ManagementNodeReadyExtensionPoint { + private static final CLogger logger = Utils.getLogger(VmMetadataUpdateInterceptor.class); + + @Autowired + private CloudBus bus; + + @Autowired + private VmMetadataDirtyMarker dirtyMarker; + + @Autowired + private ThreadFacade thdf; + + /** + * VM UUID 解析器链,按注册顺序尝试(在 VmInstanceManager.xml 中注册)。 + */ + @Autowired(required = false) + private List用途:
+ *注册方式:各模块在 Component.start() 中调用 + * {@link #registerInternalMetadataMessage(Class)} 注册。
+ * + * @see VmMetadataDirtyMarker#markDirty(String, boolean) + */ + private static final Set按注册顺序遍历 resolvers,使用第一个 supports() 返回 true 的 Resolver 进行解析。 + * 如果所有 Resolver 都不支持或返回空列表,则返回空。
+ */ + private List根据 {@link MetadataImpact.Impact} 决定 OP type:
+ *pendingApis 在 BeforeDeliveryMessageInterceptor 中写入,在 BeforePublishEventInterceptor + * 中消费。若 APIEvent 因 MN 崩溃、消息丢失等原因永远不发布,pendingApis 中的条目会泄漏。 + * 本任务定期清理超过 {@code vm.metadata.pendingApi.timeoutMinutes}(默认 45 分钟)的陈旧条目, + * 并为其调用 markDirty(保守策略:超时意味着 API 结果未知,保守标脏确保最终一致)。
+ */ + private synchronized void startCleanupTask() { + if (cleanupFuture != null) { + cleanupFuture.cancel(false); + } + // 每 5 分钟检查一次 + cleanupFuture = thdf.submitPeriodicTask(new PeriodicTask() { + @Override + public TimeUnit getTimeUnit() { + return TimeUnit.MINUTES; + } + + @Override + public long getInterval() { + return 5; + } + + @Override + public String getName() { + return "vm-metadata-pending-api-cleanup"; + } + + @Override + public void run() { + cleanupStalePendingApis(); + } + }); + logger.info("[MetadataInterceptor] pendingApis cleanup task started (check every 5min, " + + "timeout={}min)", VmGlobalConfig.VM_METADATA_PENDING_API_TIMEOUT.value(Long.class)); + } + + private synchronized void stopCleanupTask() { + if (cleanupFuture != null) { + cleanupFuture.cancel(false); + cleanupFuture = null; + } + } + + /** + * 清理超时的 pendingApis 条目。 + * + *超时条目执行保守 markDirty:API 结果未知,保守标脏确保元数据最终一致。 + * 使用 STORAGE 级别(storageStructureChange=true)以覆盖最坏情况。
+ */ + private void cleanupStalePendingApis() { + if (pendingApis.isEmpty()) { + return; + } + + long timeoutMs = VmGlobalConfig.VM_METADATA_PENDING_API_TIMEOUT.value(Long.class) + * 60 * 1000; + long now = System.currentTimeMillis(); + int cleaned = 0; + + Iterator覆盖快照、云盘挂载/卸载等涉及 Volume 但不直接携带 vmInstanceUuid 的 API。
+ * + *如果消息同时实现了 {@link VmInstanceMessage},则由 {@link DefaultVmUuidFromApiResolver} 处理, + * 本解析器不参与。
+ * + *在 API 执行前调用。对于 attach 场景,VolumeVO.vmInstanceUuid 可能尚未设置, + * 此时通过反射 fallback 到 msg.getVmInstanceUuid()(如果存在)。
+ */ +public class VolumeBasedVmUuidFromApiResolver implements VmUuidFromApiResolver { + + @Override + public boolean supports(APIMessage msg) { + // 同时实现 VmInstanceMessage 的由 DefaultResolver 处理 + return msg instanceof VolumeMessage && !(msg instanceof VmInstanceMessage); + } + + @Override + public List由管理平面发送给主存储 handler,Agent 端仅读取 Header(不读 Slot), + * 返回每个 VM 的 readStatus 和 PendingOp 信息。
+ * + *路由:{@code makeLocalServiceId} → 主存储 handler → Agent HTTP 调用
+ * + * @see BatchCheckMetadataStatusReply + * @see MetadataStatusResult + */ +public class BatchCheckMetadataStatusMsg extends NeedReplyMessage { + + private String primaryStorageUuid; + + private List不标注时默认行为等同于 {@link Impact#CONFIG}。 + * 明确不影响元数据的 API 应标注 {@link Impact#NONE}。
+ * + *不涉及 VM 的 API(如 APICreateZoneMsg)即使默认 CONFIG, + * 也不会触发元数据更新——因为 {@link VmUuidFromApiResolver} 无法解析出 vmUuid, + * 不会产生 {@link UpdateVmInstanceMetadataMsg}。
+ * + * @see VmUuidFromApiResolver + * @see UpdateVmInstanceMetadataMsg + */ +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Documented +public @interface MetadataImpact { + + /** + * 影响类型。 + */ + Impact value(); + + /** + * API 失败时是否也需要更新元数据。 + * + *默认 false:仅在 API 成功后触发元数据更新。 + * 设为 true 时,API 执行失败也会触发 markDirty。 + * 适用于 API 可能部分成功、需要同步最新状态的场景。
+ */ + boolean updateOnFailure() default false; + + /** + * API 对虚拟机元数据的影响类型枚举。 + */ + enum Impact { + /** + * 不影响虚拟机元数据,明确跳过。 + * + *用于标注与 VM 无关或虽关联 VM 但不影响元数据内容的 API, + * 如 APIQueryVmInstanceMsg、APIGetVmConsoleAddressMsg 等。
+ */ + NONE, + + /** + * 影响虚拟机配置,触发元数据更新。 + * + *如修改 CPU/内存、增删 SystemTag/ResourceConfig 等。 + * 这是未标注 {@link MetadataImpact} 注解时的默认行为。
+ */ + CONFIG, + + /** + * 影响存储结构,触发元数据更新。 + * + *如存储迁移、快照操作、删除云盘等涉及存储结构变更的 API。 + * 在 sblk 场景下会设置 pending_op=2 以标记存储结构变更。
+ */ + STORAGE + } +} \ No newline at end of file diff --git a/header/src/main/java/org/zstack/header/vm/MetadataStatusResult.java b/header/src/main/java/org/zstack/header/vm/MetadataStatusResult.java new file mode 100644 index 00000000000..c26c684ac5b --- /dev/null +++ b/header/src/main/java/org/zstack/header/vm/MetadataStatusResult.java @@ -0,0 +1,65 @@ +package org.zstack.header.vm; + +import java.io.Serializable; + +/** + * 单个 VM 的元数据 Header 状态结果(用于健康巡检)。 + * + * @see BatchCheckMetadataStatusReply + */ +public class MetadataStatusResult implements Serializable { + + /** + * 读取状态:OK / NEED_REPAIR / RECOVERED / DEGRADED / + * STORAGE_CHANGE_INCOMPLETE / CORRUPTED + */ + private String readStatus; + + /** + * 可为 null。NEED_REPAIR/RECOVERED 时提示的修复动作 + * (如 "complete_phase3" / "rebuild_header" / "full_refresh")。 + */ + private String repairAction; + + /** + * 最后更新时间戳(epoch ms)。 + */ + private Long lastUpdateTime; + + /** + * 当前 PendingOp 值(0/1/2)。 + */ + private Integer pendingOp; + + public String getReadStatus() { + return readStatus; + } + + public void setReadStatus(String readStatus) { + this.readStatus = readStatus; + } + + public String getRepairAction() { + return repairAction; + } + + public void setRepairAction(String repairAction) { + this.repairAction = repairAction; + } + + public Long getLastUpdateTime() { + return lastUpdateTime; + } + + public void setLastUpdateTime(Long lastUpdateTime) { + this.lastUpdateTime = lastUpdateTime; + } + + public Integer getPendingOp() { + return pendingOp; + } + + public void setPendingOp(Integer pendingOp) { + this.pendingOp = pendingOp; + } +} diff --git a/header/src/main/java/org/zstack/header/vm/MetadataStorageHandler.java b/header/src/main/java/org/zstack/header/vm/MetadataStorageHandler.java new file mode 100644 index 00000000000..4f59e965716 --- /dev/null +++ b/header/src/main/java/org/zstack/header/vm/MetadataStorageHandler.java @@ -0,0 +1,177 @@ +package org.zstack.header.vm; + +import org.zstack.header.core.Completion; +import org.zstack.header.core.ReturnValueCompletion; + +import java.util.List; + +/** + * 元数据存储处理器接口 — 抽象不同存储类型的元数据读写操作。 + * + *VM 元数据需要持久化到 VM 根盘所在的 Primary Storage 上。不同存储类型 + * (SharedBlock、Local/NFS)使用不同的存储格式和协议,本接口统一抽象 + * 这些差异,使上层逻辑(Poller、迁移流程、注册 API)无需关心底层实现。
+ * + *所有方法通过 {@code psUuid} 参数动态路由 — 每次调用时根据 + * {@code PrimaryStorageVO.type} 查找对应 Handler 实现。支持同一迁移流程中 + * 源/目标使用不同 Handler。例如 VM 从 SharedBlock 迁移到 NFS 时, + * {@code initializeMetadata(targetPsUuid)} 路由到 {@code LocalNfsMetadataStorageHandler}, + * {@code deleteMetadata(sourcePsUuid)} 路由到 {@code SblkMetadataStorageHandler}。
+ * + *| 实现类 | 存储类型 |
|---|---|
| {@code SblkMetadataStorageHandler} | SharedBlock |
| {@code LocalNfsMetadataStorageHandler} | Local/NFS |
VM 创建或存储迁移 Step 4 时调用。对于 sblk,创建 LV 并写入 Header + Slot A; + * 对于 local/NFS,创建 {@code .zstack-vm-metadata/} 目录(若不存在)并 + * 通过 tmp+fsync+rename 原子写入 JSON 文件。
+ * + *若容器已存在则覆盖写入(幂等)。
+ * + * @param psUuid Primary Storage UUID — 用于路由到正确的存储后端 + * @param vmUuid VM UUID — 确定元数据文件/LV 名称 + * @param payloadJson 完整的元数据 JSON payload(由 {@code VmMetadataBuilder} 构建) + * @param completion 异步回调 + */ + void initializeMetadata(String psUuid, String vmUuid, String payloadJson, Completion completion); + + /** + * 删除 VM 元数据。 + * + *ExpungeVm 或存储迁移 Step 7 源端清理时调用。
+ * + *C-01C-9 幂等约束:删除不存在的元数据(LV 已删除或 JSON 文件不存在) + * 必须返回成功(不抛异常)。同时清理同名的 {@code .tmp} 和 {@code .sc.tmp} + * 残留文件(如存在),删除 tmp 失败不影响主操作成功。
+ * + * @param psUuid Primary Storage UUID + * @param vmUuid VM UUID + * @param completion 异步回调 + */ + void deleteMetadata(String psUuid, String vmUuid, Completion completion); + + /** + * 写入/更新 VM 元数据(原子操作)。 + * + *Poller flush 时调用。sblk 使用三阶段原子写入;local/NFS 使用 + * tmp+fsync+rename 原子写入。
+ * + *{@code storageStructureChange} 参数用于区分 tmp 文件后缀: + * {@code true} 使用 {@code .sc.tmp}(存储迁移写入), + * {@code false} 使用 {@code .tmp}(普通写入)。 + * 注册时若检测到 {@code .sc.tmp} 残留,说明存储迁移写入未完成, + * 该元数据文件标记为不可靠。
+ * + * @param psUuid Primary Storage UUID + * @param vmUuid VM UUID + * @param payloadJson 完整的元数据 JSON payload + * @param storageStructureChange 是否为存储结构变更写入(影响 tmp 文件后缀和 sblk OP type) + * @param completion 异步回调 + */ + void writeMetadata(String psUuid, String vmUuid, String payloadJson, + boolean storageStructureChange, Completion completion); + + /** + * 读取 VM 元数据。 + * + *存储迁移 Step 5 read-back 校验、Scan/Read API 时调用。
+ * + *返回值说明:
+ *当前支持:SharedBlock、LocalStorage、NFS。 + * 不支持的类型(ceph、zbs、vhost 等)返回 false,上层静默跳过。
+ * + * @param psType Primary Storage 类型字符串(如 "SharedBlock"、"LocalStorage"、"NFS") + * @return true 如果该存储类型支持元数据 + */ + boolean isMetadataSupported(String psType); + + /** + * 扫描指定 PS 上所有元数据条目,返回 {@link VmMetadataEntry} 列表(轻量级,不读取 payload)。 + * + *扫描方式因存储类型而异:
+ *用途:{@code MetadataOrphanDetector}(Part 2b §8.4.2)、Scan API(Part 5 §2)
+ * + *返回类型说明(讨论 Δ-7):原方案返回 {@code List
包含 vmUuid 和可选的 hostUuid 信息。对于共享存储(SharedBlock/NFS), + * hostUuid 为 null;对于 Local Storage,hostUuid 标识元数据文件所在 Host。
+ */ + class VmMetadataEntry { + private String vmUuid; + private String hostUuid; // nullable: SharedBlock/NFS 场景为 null + + public VmMetadataEntry() { + } + + public VmMetadataEntry(String vmUuid, String hostUuid) { + this.vmUuid = vmUuid; + this.hostUuid = hostUuid; + } + + public String getVmUuid() { + return vmUuid; + } + + public void setVmUuid(String vmUuid) { + this.vmUuid = vmUuid; + } + + public String getHostUuid() { + return hostUuid; + } + + public void setHostUuid(String hostUuid) { + this.hostUuid = hostUuid; + } + } +} diff --git a/header/src/main/java/org/zstack/header/vm/PreCheckItem.java b/header/src/main/java/org/zstack/header/vm/PreCheckItem.java new file mode 100644 index 00000000000..49dc2239b2d --- /dev/null +++ b/header/src/main/java/org/zstack/header/vm/PreCheckItem.java @@ -0,0 +1,33 @@ +package org.zstack.header.vm; + +import java.io.Serializable; + +public class PreCheckItem implements Serializable { + private String name; + private boolean passed; + private String message; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public boolean isPassed() { + return passed; + } + + public void setPassed(boolean passed) { + this.passed = passed; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } +} diff --git a/header/src/main/java/org/zstack/header/vm/RegisterVmFromMetadataInnerMsg.java b/header/src/main/java/org/zstack/header/vm/RegisterVmFromMetadataInnerMsg.java new file mode 100644 index 00000000000..9b7bfabe21a --- /dev/null +++ b/header/src/main/java/org/zstack/header/vm/RegisterVmFromMetadataInnerMsg.java @@ -0,0 +1,64 @@ +package org.zstack.header.vm; + +import org.zstack.header.message.NeedReplyMessage; + +/** + * Internal message for LongJob integration of VM registration from metadata. + * Mirrors fields from APIRegisterVmInstanceFromMetadataMsg. + */ +public class RegisterVmFromMetadataInnerMsg extends NeedReplyMessage { + private String metadataContent; + private String targetPrimaryStorageUuid; + private String zoneUuid; + private String clusterUuid; + private Boolean forceVersionMismatch; + private String accountUuid; + + public String getMetadataContent() { + return metadataContent; + } + + public void setMetadataContent(String metadataContent) { + this.metadataContent = metadataContent; + } + + public String getTargetPrimaryStorageUuid() { + return targetPrimaryStorageUuid; + } + + public void setTargetPrimaryStorageUuid(String targetPrimaryStorageUuid) { + this.targetPrimaryStorageUuid = targetPrimaryStorageUuid; + } + + public String getZoneUuid() { + return zoneUuid; + } + + public void setZoneUuid(String zoneUuid) { + this.zoneUuid = zoneUuid; + } + + public String getClusterUuid() { + return clusterUuid; + } + + public void setClusterUuid(String clusterUuid) { + this.clusterUuid = clusterUuid; + } + + public Boolean getForceVersionMismatch() { + return forceVersionMismatch; + } + + public void setForceVersionMismatch(Boolean forceVersionMismatch) { + this.forceVersionMismatch = forceVersionMismatch; + } + + public String getAccountUuid() { + return accountUuid; + } + + public void setAccountUuid(String accountUuid) { + this.accountUuid = accountUuid; + } +} diff --git a/header/src/main/java/org/zstack/header/vm/RegisterVmFromMetadataInnerReply.java b/header/src/main/java/org/zstack/header/vm/RegisterVmFromMetadataInnerReply.java new file mode 100644 index 00000000000..8353aa880e2 --- /dev/null +++ b/header/src/main/java/org/zstack/header/vm/RegisterVmFromMetadataInnerReply.java @@ -0,0 +1,26 @@ +package org.zstack.header.vm; + +import org.zstack.header.message.MessageReply; + +import java.util.List; + +public class RegisterVmFromMetadataInnerReply extends MessageReply { + private VmInstanceInventory inventory; + private List由管理平面发送给主存储 handler,用于完成未完成的 Phase 3、 + * 清除 PendingOp、重建 Header 或触发全量刷写。
+ * + *路由:{@code makeLocalServiceId} → 主存储 handler → Agent HTTP 调用
+ * + * @see BatchCheckMetadataStatusMsg + */ +public class RepairMetadataMsg extends NeedReplyMessage { + + private String vmUuid; + + private String primaryStorageUuid; + + /** + * 修复动作。 + * + *可选值:{@code complete_phase3} / {@code clear_pending_op} / + * {@code rebuild_header} / {@code full_refresh}
+ */ + private String repairAction; + + public String getVmUuid() { + return vmUuid; + } + + public void setVmUuid(String vmUuid) { + this.vmUuid = vmUuid; + } + + public String getPrimaryStorageUuid() { + return primaryStorageUuid; + } + + public void setPrimaryStorageUuid(String primaryStorageUuid) { + this.primaryStorageUuid = primaryStorageUuid; + } + + public String getRepairAction() { + return repairAction; + } + + public void setRepairAction(String repairAction) { + this.repairAction = repairAction; + } +} diff --git a/header/src/main/java/org/zstack/header/vm/RepairMetadataReply.java b/header/src/main/java/org/zstack/header/vm/RepairMetadataReply.java new file mode 100644 index 00000000000..16445a19fe6 --- /dev/null +++ b/header/src/main/java/org/zstack/header/vm/RepairMetadataReply.java @@ -0,0 +1,11 @@ +package org.zstack.header.vm; + +import org.zstack.header.message.MessageReply; + +/** + * {@link RepairMetadataMsg} 的回复。 + * + *成功/失败通过 {@link MessageReply} 基类的 ErrorCode 传递。
+ */ +public class RepairMetadataReply extends MessageReply { +} diff --git a/header/src/main/java/org/zstack/header/vm/UpdateVmInstanceMetadataMsg.java b/header/src/main/java/org/zstack/header/vm/UpdateVmInstanceMetadataMsg.java new file mode 100644 index 00000000000..e918ad6b075 --- /dev/null +++ b/header/src/main/java/org/zstack/header/vm/UpdateVmInstanceMetadataMsg.java @@ -0,0 +1,43 @@ +package org.zstack.header.vm; + +import org.zstack.header.message.NeedReplyMessage; + +/** + * 更新虚拟机元数据消息(MN 内部)。 + * + *调用链第 1 步:由 API 完成后的拦截器发出,路由到 VM 所在的 MN 节点。 + * 接收方从 DB 构建 {@link VmInstanceMetadataDTO},编码后发送 + * {@link UpdateVmInstanceMetadataOnPrimaryStorageMsg}。
+ * + * @see UpdateVmInstanceMetadataOnPrimaryStorageMsg + * @see UpdateVmInstanceMetadataOnHypervisorMsg + */ +public class UpdateVmInstanceMetadataMsg extends NeedReplyMessage implements VmInstanceMessage { + + private String vmInstanceUuid; + + /** + * 是否涉及存储结构变更。 + * + *对应 {@link MetadataImpact.Impact#STORAGE} 类型的操作。 + * sblk 场景下会设置 pending_op=2。
+ */ + private boolean storageStructureChange; + + @Override + public String getVmInstanceUuid() { + return vmInstanceUuid; + } + + public void setVmInstanceUuid(String vmInstanceUuid) { + this.vmInstanceUuid = vmInstanceUuid; + } + + public boolean isStorageStructureChange() { + return storageStructureChange; + } + + public void setStorageStructureChange(boolean storageStructureChange) { + this.storageStructureChange = storageStructureChange; + } +} diff --git a/header/src/main/java/org/zstack/header/vm/UpdateVmInstanceMetadataOnHypervisorMsg.java b/header/src/main/java/org/zstack/header/vm/UpdateVmInstanceMetadataOnHypervisorMsg.java new file mode 100644 index 00000000000..61e0ebde900 --- /dev/null +++ b/header/src/main/java/org/zstack/header/vm/UpdateVmInstanceMetadataOnHypervisorMsg.java @@ -0,0 +1,86 @@ +package org.zstack.header.vm; + +import org.zstack.header.host.HostMessage; +import org.zstack.header.message.NeedReplyMessage; + +/** + * 在 Hypervisor 上更新虚拟机元数据消息。 + * + *调用链第 3 步(可选):发送到 Host Agent 执行实际的存储写入。
+ * + *调用链第 2 步:发送到主存储服务,由主存储根据自身类型决定写入方式: + *
LocalStorage 通过根盘 installPath 推导元数据文件路径; + * NFS 通过根盘关联的 Host 确定转发目标。
+ */ + private String rootVolumeUuid; + + /** + * 元数据 JSON 字符串。 + * + *由 {@code VmInstanceBase.buildVmInstanceMetadata()} 从 DB 全量构建, + * 为 {@link VmInstanceMetadataDTO} 的 JSON 序列化结果。
+ */ + private String metadata; + + /** + * 是否涉及存储结构变更(sblk 场景设置 pending_op=2)。 + */ + private boolean storageStructureChange; + + @Override + public String getPrimaryStorageUuid() { + return primaryStorageUuid; + } + + public void setPrimaryStorageUuid(String primaryStorageUuid) { + this.primaryStorageUuid = primaryStorageUuid; + } + + public String getVmInstanceUuid() { + return vmInstanceUuid; + } + + public void setVmInstanceUuid(String vmInstanceUuid) { + this.vmInstanceUuid = vmInstanceUuid; + } + + public String getRootVolumeUuid() { + return rootVolumeUuid; + } + + public void setRootVolumeUuid(String rootVolumeUuid) { + this.rootVolumeUuid = rootVolumeUuid; + } + + public String getMetadata() { + return metadata; + } + + public void setMetadata(String metadata) { + this.metadata = metadata; + } + + public boolean isStorageStructureChange() { + return storageStructureChange; + } + + public void setStorageStructureChange(boolean storageStructureChange) { + this.storageStructureChange = storageStructureChange; + } +} diff --git a/header/src/main/java/org/zstack/header/vm/UpdateVmInstanceMetadataOnPrimaryStorageReply.java b/header/src/main/java/org/zstack/header/vm/UpdateVmInstanceMetadataOnPrimaryStorageReply.java new file mode 100644 index 00000000000..475855f2b67 --- /dev/null +++ b/header/src/main/java/org/zstack/header/vm/UpdateVmInstanceMetadataOnPrimaryStorageReply.java @@ -0,0 +1,9 @@ +package org.zstack.header.vm; + +import org.zstack.header.message.MessageReply; + +/** + * {@link UpdateVmInstanceMetadataOnPrimaryStorageMsg} 的回复。 + */ +public class UpdateVmInstanceMetadataOnPrimaryStorageReply extends MessageReply { +} diff --git a/header/src/main/java/org/zstack/header/vm/UpdateVmInstanceMetadataReply.java b/header/src/main/java/org/zstack/header/vm/UpdateVmInstanceMetadataReply.java new file mode 100644 index 00000000000..61a2d4dbd1b --- /dev/null +++ b/header/src/main/java/org/zstack/header/vm/UpdateVmInstanceMetadataReply.java @@ -0,0 +1,9 @@ +package org.zstack.header.vm; + +import org.zstack.header.message.MessageReply; + +/** + * {@link UpdateVmInstanceMetadataMsg} 的回复。 + */ +public class UpdateVmInstanceMetadataReply extends MessageReply { +} diff --git a/header/src/main/java/org/zstack/header/vm/VmInstanceConstant.java b/header/src/main/java/org/zstack/header/vm/VmInstanceConstant.java index 9d0efdd77f1..a2716386957 100755 --- a/header/src/main/java/org/zstack/header/vm/VmInstanceConstant.java +++ b/header/src/main/java/org/zstack/header/vm/VmInstanceConstant.java @@ -96,4 +96,6 @@ enum Capability { String VM_CDROM_OCCUPANT_ISO = "ISO"; String VM_CDROM_OCCUPANT_GUEST_TOOLS = "GuestTools"; + + String VM_META_SUFFIX = "_meta"; } diff --git a/header/src/main/java/org/zstack/header/vm/VmInstanceMetadataCodec.java b/header/src/main/java/org/zstack/header/vm/VmInstanceMetadataCodec.java new file mode 100644 index 00000000000..2153f9247cb --- /dev/null +++ b/header/src/main/java/org/zstack/header/vm/VmInstanceMetadataCodec.java @@ -0,0 +1,96 @@ +package org.zstack.header.vm; + +import org.zstack.utils.gson.JSONObjectUtil; + +import java.nio.charset.StandardCharsets; +import java.util.Base64; + +/** + * 虚拟机元数据编解码器。 + * + *负责 {@link VmInstanceMetadataDTO} 与存储介质之间的编解码: + *
+ * 序列化流程:DTO → JSON String → Base64 String → byte[](写入存储) + * 反序列化流程:byte[](读取存储) → Base64 String → JSON String → DTO + *+ * + *
单层 Base64 编码策略:DTO 内部所有字段为明文 JSON, + * 仅在写入存储时做一次 Base64 编码。
+ */ +public class VmInstanceMetadataCodec { + + private VmInstanceMetadataCodec() { + } + + /** + * 将 DTO 编码为可写入存储的字节数组。 + * + * @param dto 元数据 DTO + * @return Base64 编码后的字节数组 + */ + public static byte[] encode(VmInstanceMetadataDTO dto) { + String json = JSONObjectUtil.toJsonString(dto); + return Base64.getEncoder().encode(json.getBytes(StandardCharsets.UTF_8)); + } + + /** + * 将 DTO 编码为 Base64 字符串。 + * + * @param dto 元数据 DTO + * @return Base64 编码后的字符串 + */ + public static String encodeToString(VmInstanceMetadataDTO dto) { + String json = JSONObjectUtil.toJsonString(dto); + return Base64.getEncoder().encodeToString(json.getBytes(StandardCharsets.UTF_8)); + } + + /** + * 从存储读取的字节数组解码为 DTO。 + * + * @param data Base64 编码的字节数组 + * @return 元数据 DTO + * @throws IllegalArgumentException 如果 Base64 解码失败或 JSON 格式错误 + */ + public static VmInstanceMetadataDTO decode(byte[] data) { + byte[] jsonBytes = Base64.getDecoder().decode(data); + String json = new String(jsonBytes, StandardCharsets.UTF_8); + return JSONObjectUtil.toObject(json, VmInstanceMetadataDTO.class); + } + + /** + * 从 Base64 字符串解码为 DTO。 + * + * @param base64 Base64 编码的字符串 + * @return 元数据 DTO + * @throws IllegalArgumentException 如果 Base64 解码失败或 JSON 格式错误 + */ + public static VmInstanceMetadataDTO decodeFromString(String base64) { + byte[] jsonBytes = Base64.getDecoder().decode(base64); + String json = new String(jsonBytes, StandardCharsets.UTF_8); + return JSONObjectUtil.toObject(json, VmInstanceMetadataDTO.class); + } + + /** + * 将 DTO 序列化为 JSON 字符串(不做 Base64 编码)。 + * + *用于调试、日志、一致性检查等场景。
+ * + * @param dto 元数据 DTO + * @return JSON 字符串 + */ + public static String toJson(VmInstanceMetadataDTO dto) { + return JSONObjectUtil.toJsonString(dto); + } + + /** + * 从 JSON 字符串反序列化为 DTO(不做 Base64 解码)。 + * + *用于调试、测试等场景。
+ * + * @param json JSON 字符串 + * @return 元数据 DTO + */ + public static VmInstanceMetadataDTO fromJson(String json) { + return JSONObjectUtil.toObject(json, VmInstanceMetadataDTO.class); + } +} \ No newline at end of file diff --git a/header/src/main/java/org/zstack/header/vm/VmInstanceMetadataConstants.java b/header/src/main/java/org/zstack/header/vm/VmInstanceMetadataConstants.java new file mode 100644 index 00000000000..ec9b5a231ee --- /dev/null +++ b/header/src/main/java/org/zstack/header/vm/VmInstanceMetadataConstants.java @@ -0,0 +1,82 @@ +package org.zstack.header.vm; + +/** + * 虚拟机元数据相关常量。 + */ +public class VmInstanceMetadataConstants { + + private VmInstanceMetadataConstants() { + } + + /** + * 元数据 LV 后缀(sblk 场景)。 + * + *LV 命名规则:{vm_uuid}_vmmeta
+ */ + public static final String SBLK_LV_SUFFIX = "_vmmeta"; + + /** + * 元数据文件名(local/NFS 场景)。 + * + *文件位于与根盘同目录下。
+ */ + public static final String METADATA_FILE_NAME = "vm_metadata.json"; + + /** + * sblk 元数据 LV 默认初始大小(字节):4MB。 + */ + public static final long SBLK_LV_INITIAL_SIZE = 4L * 1024 * 1024; + + /** + * sblk 元数据 LV 最大大小(字节):64MB。 + */ + public static final long SBLK_LV_MAX_SIZE = 64L * 1024 * 1024; + + /** + * sblk 写入序列号最大值。溢出后回绕到 1。 + */ + public static final long MAX_WRITE_SEQUENCE = 0xFFFFFFFFFFFFFFFFL; + + /** + * 全局配置:是否启用虚拟机元数据记录。 + * + *默认关闭。开启后,API 操作成功时自动触发元数据更新。
+ */ + public static final String GLOBAL_CONFIG_METADATA_ENABLED = "vm.metadata.enabled"; + + /** + * GC 初始延迟秒数。 + * + *API 成功后延迟该秒数再触发元数据更新, + * 避免短时间内多次 API 操作产生过多无用更新。
+ */ + public static final int INITIAL_GC_DELAY_SECONDS = 5; + + /** + * 注册虚拟机 MN 标识 System Tag 前缀。 + * + *注册过程中在 VM 上打标记,记录执行注册的 MN UUID, + * 用于 MN 崩溃后的事务回滚判断。
+ */ + public static final String REGISTERING_MN_TAG_PREFIX = "vmMetadata::registeringMnUuid::"; + + /** + * VM 状态:注册中。 + * + *注册开始时 VM 进入此中间状态,注册完成后转为 Stopped。
+ */ + public static final String VM_STATE_REGISTERING = "Registering"; + + /** + * ChainTask 最大排队任务数。 + * + *同一 VM 的元数据更新 ChainTask 最多排队 1 个, + * 超出的通过 exceedMaxPendingCallback 立即 Done。
+ */ + public static final int MAX_PENDING_METADATA_TASKS = 1; + + /** + * ChainTask syncSignature 前缀。 + */ + public static final String CHAIN_TASK_SIGNATURE_PREFIX = "vm-metadata-update-"; +} \ No newline at end of file diff --git a/header/src/main/java/org/zstack/header/vm/VmInstanceMetadataDTO.java b/header/src/main/java/org/zstack/header/vm/VmInstanceMetadataDTO.java new file mode 100644 index 00000000000..d0a3ee2db8f --- /dev/null +++ b/header/src/main/java/org/zstack/header/vm/VmInstanceMetadataDTO.java @@ -0,0 +1,141 @@ +package org.zstack.header.vm; + +import com.google.gson.annotations.SerializedName; + +import java.util.List; + +/** + * 虚拟机元数据 DTO。 + * + *存储在主存储上的元数据文件内容就是该 DTO 的 JSON 字符串经 Base64 编码后的结果。
+ * + *DTO 内部所有字段均为明文 JSON。由存储写入层对整个 DTO 的 JSON 字符串做一次统一 + * Base64 编码后写入存储介质(sblk Slot Payload / local NFS 文件内容)。
+ * + *Checksum 不作为 DTO 字段,由存储层保证: + *
对于每种资源(VM、Volume、Nic),记录其 VO 全量 JSON 及关联的 SystemTag/ResourceConfig。
+ */ + public static class ResourceMetadata { + /** + * 资源 UUID。 + * + *冗余字段,反序列化时必须校验与 {@link #vo} 内部的 uuid 字段一致。
+ */ + @SerializedName("resourceUuid") + public String resourceUuid; + + /** + * VO 全量 JSON 明文。 + * + *序列化时由 Gson 自动处理嵌套 JSON 的转义;反序列化时需要二次反序列化为具体 VO 类。
+ */ + @SerializedName("vo") + public String vo; + + /** + * SystemTag 列表的 Base64 编码。 + * + *构建过程:SystemTagVO 列表 → 逐个 JSON 序列化 → 组成 JSON Array 字符串 → Base64 编码。 + * Base64 编码是为了保护可能包含的密码、密钥等敏感信息。
+ */ + @SerializedName("systemTags") + public String systemTags; + + /** + * ResourceConfig 列表的 Base64 编码。 + * + *构建过程与 systemTags 一致。
+ */ + @SerializedName("resourceConfigs") + public String resourceConfigs; + } + + /** + * 元数据 schema 版本,与 ZStack 数据库版本(zsv)一致,如 "5.0.0"。 + * + *序列化时自动填充当前平台版本。注册时若版本不匹配则拒绝注册。 + * 升级后通过全量更新 GC 将所有 VM 的元数据刷新到新版本。
+ */ + @SerializedName("schemaVersion") + public String schemaVersion; + + /** + * 虚拟机分类。 + * + *标识本元数据所属 VM 的分类(普通 / 模板 / 模板缓存), + * 注册恢复时按不同分类执行不同的恢复逻辑。
+ */ + @SerializedName("vmCategory") + public VmMetadataCategory vmCategory; + + /** + * 虚拟机自身的元数据。 + * + *{@link ResourceMetadata#vo} 为 VmInstanceVO 的 JSON。
+ */ + @SerializedName("vm") + public ResourceMetadata vm; + + /** + * 云盘元数据列表。 + * + *包含根盘与数据盘(挂载的 + 已卸载但 lastVmInstanceUuid 指向本 VM 的)。 + * 不包含共享盘(isShareable=true 的 Volume 被排除)。 + * {@link VolumeResourceMetadata#vo} 为 VolumeVO 的 JSON, + * 每个 Volume 的快照引用数据内嵌在 {@link VolumeResourceMetadata} 中。
+ */ + @SerializedName("volumes") + public List仅记录,注册时不恢复。{@link ResourceMetadata#vo} 为 VmNicVO 的 JSON。
+ */ + @SerializedName("nics") + public List所有 Volume 下的 VolumeSnapshotVO JSON 明文的扁平列表, + * 按 BFS 拓扑序排列(父快照在子快照之前)。
+ */ + @SerializedName("snapshots") + public List每个元素是 VolumeSnapshotGroupVO 的 JSON 明文。
+ */ + @SerializedName("snapshotGroups") + public List每个元素是 VolumeSnapshotGroupRefVO 的 JSON 明文。 + * 通过 {@code volumeSnapshotGroupUuid} 字段与 {@link #snapshotGroups} 关联。
+ */ + @SerializedName("snapshotGroupRefs") + public List封装从元数据注册虚拟机时需要的新环境上下文信息。
+ * + *字段处理矩阵中标记为"API 参数"或"替换"的字段,其新值来源于此对象。
+ */ +public class VmInstanceMetadataRegistrationSpec { + + /** + * 注册目标 Zone UUID(必填)。 + * + *替换 VmInstanceVO.zoneUuid。
+ */ + private String zoneUuid; + + /** + * 注册目标主存储 UUID(必填)。 + * + *替换 VolumeVO.primaryStorageUuid、VolumeSnapshotVO.primaryStorageUuid。
+ */ + private String primaryStorageUuid; + + /** + * 注册操作的账户 UUID。 + * + *替换所有 VO 的 accountUuid 字段。通常为 admin。
+ */ + private String accountUuid; + + /** + * 旧存储路径标识符。 + * + *在反序列化后、注册前执行校验,确保元数据完整性和一致性。
+ * + *校验项: + *
snapshotGroupRefs 中引用的 volumeSnapshotGroupUuid + * 必须存在于 snapshotGroups 中。
+ * + * @param dto 待校验的元数据 DTO + * @throws CloudRuntimeException 引用了不存在的 group 时抛出 + */ + @SuppressWarnings("unchecked") + public static void validateSnapshotGroupIntegrity(VmInstanceMetadataDTO dto) { + if (dto.snapshotGroupRefs == null || dto.snapshotGroupRefs.isEmpty()) { + return; + } + if (dto.snapshotGroups == null || dto.snapshotGroups.isEmpty()) { + throw new CloudRuntimeException( + "metadata has snapshotGroupRefs but no snapshotGroups"); + } + + Set通过 {@code EventFacade.fire()} 发布,供监控系统和巡检机制消费。
+ */ +public class VmMetadataCanonicalEvents { + + /** + * GC 放弃后的 stale 事件路径。 + * + *当 {@code UpdateVmInstanceMetadataGC} 超过最大重试次数后发布此事件, + * {@code MetadataHealthCheckJob} 监听此事件将 VM 加入优先刷新队列。
+ */ + public static final String VM_METADATA_STALE_PATH = "/vm/metadata/stale"; + + @NeedJsonSchema + public static class MetadataStaleData { + public String vmInstanceUuid; + + public MetadataStaleData() { + } + + public MetadataStaleData(String vmInstanceUuid) { + this.vmInstanceUuid = vmInstanceUuid; + } + } +} diff --git a/header/src/main/java/org/zstack/header/vm/VmMetadataCategory.java b/header/src/main/java/org/zstack/header/vm/VmMetadataCategory.java new file mode 100644 index 00000000000..8945760d3e5 --- /dev/null +++ b/header/src/main/java/org/zstack/header/vm/VmMetadataCategory.java @@ -0,0 +1,18 @@ +package org.zstack.header.vm; + +/** + * 虚拟机元数据分类。 + * + *用于区分元数据所属的 VM 类型,注册恢复时按不同分类执行不同的恢复逻辑。
+ * + *SharedBlock(sblk)使用固定大小的 LV 存储 VM 元数据,采用双 Slot 布局: + *
+ * [ LV Header (4096B) ][ Slot-A ][ Slot-B ] + * Slot 大小 = (lvSize - headerSize) / 2,向下对齐到 4096 + * Slot Header = 36B(Magic 4B + SeqNum 8B + SlotOffset 8B + SlotCapacity 8B + PayloadLen 8B) + * 可用 Payload = SlotCapacity - SlotHeaderSize + *+ * + * @see Part 02b §10.0 容量公式与常量 + */ +public final class VmMetadataConstants { + + private VmMetadataConstants() { + // utility class + } + + /** LV 头部大小(字节) */ + public static final long SBLK_HEADER_SIZE = 4096L; + + /** Slot 头部大小(字节):Magic(4) + SeqNum(8) + SlotOffset(8) + SlotCapacity(8) + PayloadLen(8) */ + public static final long SBLK_SLOT_HEADER_SIZE = 36L; + + /** SharedBlock 元数据 LV 最大大小(64MB) */ + public static final long SBLK_MAX_LV_SIZE = 64L * 1024 * 1024; + + /** + * 计算给定 LV 大小下单个 Slot 的容量(字节)。 + * + *
公式:((lvSize - headerSize) / 2 / 4096) * 4096(向下对齐到 4096)
+ * + * @param lvSize LV 总大小(字节) + * @return 单个 Slot 的容量(字节) + */ + public static long slotCapacity(long lvSize) { + return ((lvSize - SBLK_HEADER_SIZE) / 2 / 4096) * 4096; + } + + /** 64MB LV 下单个 Slot 的最大容量(约 33,550,336 字节) */ + public static final long SBLK_MAX_SLOT_CAPACITY = slotCapacity(SBLK_MAX_LV_SIZE); + + /** 64MB LV 下单个 Slot 的最大可用 Payload(约 33,550,300 字节) */ + public static final long SBLK_MAX_PAYLOAD_SIZE = SBLK_MAX_SLOT_CAPACITY - SBLK_SLOT_HEADER_SIZE; + + /** Payload 大小预警阈值(8MB):超过时输出 WARN 日志 */ + public static final long PAYLOAD_WARN_THRESHOLD = 8L * 1024 * 1024; + + /** Payload 大小拒绝阈值(30MB):超过时 ERROR + 拒绝写入 */ + public static final long PAYLOAD_REJECT_THRESHOLD = 30L * 1024 * 1024; +} diff --git a/header/src/main/java/org/zstack/header/vm/VmMetadataDirtyVO.java b/header/src/main/java/org/zstack/header/vm/VmMetadataDirtyVO.java new file mode 100644 index 00000000000..60bf5231263 --- /dev/null +++ b/header/src/main/java/org/zstack/header/vm/VmMetadataDirtyVO.java @@ -0,0 +1,143 @@ +package org.zstack.header.vm; + +import org.zstack.header.managementnode.ManagementNodeVO; +import org.zstack.header.vo.ForeignKey; +import org.zstack.header.vo.ForeignKey.ReferenceOption; + +import javax.persistence.*; +import java.sql.Timestamp; + +/** + * 记录 VM 元数据的"脏标记",表示该 VM 的元数据需要写入主存储。 + * + *用于非 VM 直接 API(如 Volume/Nic/快照 API)中提取关联的 VM UUID, + * 以便在 API 成功后触发对应 VM 的元数据更新。
+ * + *Resolver 应在 API 执行前 预解析 vmUuid 并缓存在上下文中, + * 因为 API 执行后相关资源可能已被删除(如 APIDeleteVolumeMsg 执行后 VolumeVO 不存在)。
+ * + * @see MetadataImpact + * @see UpdateVmInstanceMetadataMsg + */ +public interface VmUuidFromApiResolver { + + /** + * 判断此 Resolver 是否能处理指定的 API 消息类型。 + * + * @param msg API 消息 + * @return true 表示此 Resolver 可以从该消息中解析 vmUuid + */ + boolean supports(APIMessage msg); + + /** + * 从 API 消息中解析出关联的 vmInstanceUuid 列表。 + * + *可能返回空列表(如 volume 未挂载到任何 VM)。 + * 可能返回多个 UUID(如批量操作涉及多台 VM)。
+ * + *此方法应在 API 执行前调用。
+ * + * @param msg API 消息 + * @return 关联的 vmInstanceUuid 列表,不为 null + */ + List每个 Volume 的快照引用数据直接关联到对应的 VolumeResourceMetadata 中, + * 而非放在 DTO 顶层的 Map 结构里,便于按卷维度整体操作。
+ */ +public class VolumeResourceMetadata extends VmInstanceMetadataDTO.ResourceMetadata { + /** + * 该 Volume 关联的快照引用列表。 + * + *每个元素是 VolumeSnapshotReferenceVO 的 JSON 明文。 + * 通过 {@code referenceVolumeUuid} 查询关联到本 Volume。
+ */ + @SerializedName("snapshotReferences") + public List每个元素是 VolumeSnapshotReferenceTreeVO 的 JSON 明文。
+ */ + @SerializedName("snapshotReferenceTrees") + public List