diff --git a/api/mesh/v1alpha1/runtime_instance_helper.go b/api/mesh/v1alpha1/runtime_instance_helper.go index c69c002a6..3bffb4cf6 100644 --- a/api/mesh/v1alpha1/runtime_instance_helper.go +++ b/api/mesh/v1alpha1/runtime_instance_helper.go @@ -23,4 +23,8 @@ const ( StartupProbe = "startup" ) -const InstanceTerminating = "Terminating" +const ( + InstanceStarting = "Starting" + InstanceCrashing = "Crashing" + InstanceTerminating = "Terminating" +) diff --git a/pkg/console/model/application.go b/pkg/console/model/application.go index d11943f62..2200e5369 100644 --- a/pkg/console/model/application.go +++ b/pkg/console/model/application.go @@ -133,6 +133,7 @@ func NewApplicationTabInstanceInfoReq() *ApplicationTabInstanceInfoReq { type AppInstanceInfoResp struct { AppName string `json:"appName"` CreateTime string `json:"createTime"` + LifecycleState string `json:"lifecycleState"` DeployState string `json:"deployState"` DeployClusters string `json:"deployClusters"` IP string `json:"ip"` diff --git a/pkg/console/model/instance.go b/pkg/console/model/instance.go index e933d7790..b61c56ad0 100644 --- a/pkg/console/model/instance.go +++ b/pkg/console/model/instance.go @@ -20,6 +20,7 @@ package model import ( "github.com/duke-git/lancet/v2/strutil" + meshproto "github.com/apache/dubbo-admin/api/mesh/v1alpha1" "github.com/apache/dubbo-admin/pkg/config/app" meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1" coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model" @@ -58,6 +59,7 @@ type SearchInstanceResp struct { Name string `json:"name"` WorkloadName string `json:"workloadName"` AppName string `json:"appName"` + LifecycleState string `json:"lifecycleState"` DeployState string `json:"deployState"` DeployCluster string `json:"deployCluster"` RegisterState string `json:"registerState"` @@ -85,13 +87,10 @@ func (r *SearchInstanceResp) FromInstanceResource(instanceResource *meshresource if cfg.Engine != nil && cfg.Engine.ID == instance.SourceEngine { r.DeployCluster = cfg.Engine.Name } - if r.RegisterTime != "" { - r.RegisterState = "Registered" - } else { - r.RegisterState = "UnRegistered" - } + r.RegisterState = DeriveInstanceRegisterState(instance) r.Labels = instance.Tags - r.DeployState = instance.DeployState + r.DeployState = DeriveInstanceDeployState(instance) + r.LifecycleState = DeriveInstanceLifecycleState(instance, r.DeployState, r.RegisterState) r.WorkloadName = instance.WorkloadName r.AppName = instance.AppName return r @@ -115,6 +114,7 @@ type InstanceDetailResp struct { RegisterTime string `json:"registerTime"` RegisterClusters []string `json:"registerClusters"` DeployCluster string `json:"deployCluster"` + LifecycleState string `json:"lifecycleState"` DeployState string `json:"deployState"` RegisterState string `json:"registerState"` Node string `json:"node"` @@ -158,16 +158,9 @@ func FromInstanceResource(res *meshresource.InstanceResource, cfg app.AdminConfi if cfg.Engine.ID == res.Spec.SourceEngine { r.DeployCluster = cfg.Engine.Name } - if strutil.IsNotBlank(instance.DeployState) { - r.DeployState = instance.DeployState - } else { - r.DeployState = "Unknown" - } - if strutil.IsBlank(r.RegisterTime) { - r.RegisterState = "UnRegistered" - } else { - r.RegisterState = "Registered" - } + r.DeployState = DeriveInstanceDeployState(instance) + r.RegisterState = DeriveInstanceRegisterState(instance) + r.LifecycleState = DeriveInstanceLifecycleState(instance, r.DeployState, r.RegisterState) r.Node = instance.Node r.Image = instance.Image r.Probes = ProbeStruct{} @@ -196,3 +189,64 @@ func FromInstanceResource(res *meshresource.InstanceResource, cfg app.AdminConfi } return r } + +func DeriveInstanceDeployState(instance *meshproto.Instance) string { + if instance == nil || strutil.IsBlank(instance.DeployState) { + return "Unknown" + } + switch instance.DeployState { + case "Running": + if !isPodReady(instance) { + return "Starting" + } + return "Running" + default: + return instance.DeployState + } +} + +func DeriveInstanceRegisterState(instance *meshproto.Instance) string { + if instance == nil || strutil.IsBlank(instance.RegisterTime) { + return "UnRegistered" + } + return "Registered" +} + +func DeriveInstanceLifecycleState(instance *meshproto.Instance, deployState string, registerState string) string { + switch deployState { + case "Crashing", "Failed", "Unknown", "Succeeded": + return "Error" + case "Terminating": + return "Terminating" + } + + if registerState == "Registered" { + if deployState == "Running" { + return "Serving" + } + return "Error" + } + + if deployState == "Running" && strutil.IsNotBlank(instance.UnregisterTime) { + return "Draining" + } + + switch deployState { + case "Pending", "Starting", "Running": + return "Starting" + default: + return "Unknown" + } +} + +func isPodReady(instance *meshproto.Instance) bool { + for _, condition := range instance.Conditions { + if condition == nil { + continue + } + if condition.Type == "Ready" { + return condition.Status == "True" + } + } + return false +} diff --git a/pkg/console/service/application.go b/pkg/console/service/application.go index 80e7ee685..7fd0f22db 100644 --- a/pkg/console/service/application.go +++ b/pkg/console/service/application.go @@ -95,7 +95,8 @@ func buildAppInstanceInfoResp(instanceRes *meshresource.InstanceResource, cfg ap resp.Name = instance.Name resp.AppName = instance.AppName resp.CreateTime = instance.CreateTime - resp.DeployState = instance.DeployState + resp.DeployState = model.DeriveInstanceDeployState(instance) + resp.LifecycleState = model.DeriveInstanceLifecycleState(instance, resp.DeployState, model.DeriveInstanceRegisterState(instance)) if cfg.Engine.ID == instance.SourceEngine { resp.DeployClusters = cfg.Engine.Name } @@ -104,7 +105,7 @@ func buildAppInstanceInfoResp(instanceRes *meshresource.InstanceResource, cfg ap if d := cfg.FindDiscovery(instanceRes.Mesh); d != nil { resp.RegisterCluster = d.Name } - resp.RegisterState = "Registered" + resp.RegisterState = model.DeriveInstanceRegisterState(instance) resp.RegisterTime = instance.RegisterTime resp.WorkloadName = instance.WorkloadName return resp diff --git a/pkg/core/controller/informer.go b/pkg/core/controller/informer.go index 92cdb6938..cf0003e3b 100644 --- a/pkg/core/controller/informer.go +++ b/pkg/core/controller/informer.go @@ -229,17 +229,10 @@ func (s *informer) HandleDeltas(obj interface{}, _ bool) error { } // from oldest to newest for _, d := range deltas { - var resource model.Resource - var object interface{} - if o, ok := d.Object.(cache.DeletedFinalStateUnknown); ok { - object = o.Obj - } else { - object = d.Object - } - resource, ok := object.(model.Resource) - if !ok { - logger.Errorf("object from ListWatcher is not conformed to Resource, obj: %v", obj) - return bizerror.NewAssertionError("Resource", reflect.TypeOf(obj).Name()) + resource, err := s.toResource(d.Object) + if err != nil { + logger.Errorf("object from ListWatcher is not conformed to Resource, obj: %v, err: %v", obj, err) + return err } switch d.Type { case cache.Sync, cache.Replaced, cache.Added, cache.Updated: @@ -257,6 +250,8 @@ func (s *informer) HandleDeltas(obj interface{}, _ bool) error { s.EmitEvent(cache.Added, nil, resource) } case cache.Deleted: + logger.Infof("informer processing delete delta, resource kind: %s, key: %s", + resource.ResourceKind().ToString(), resource.ResourceKey()) if err := s.indexer.Delete(resource); err != nil { logger.Errorf("failed to delete resource from informer, cause %v, resource: %s,", err, resource.String()) return err @@ -267,6 +262,31 @@ func (s *informer) HandleDeltas(obj interface{}, _ bool) error { return nil } +func (s *informer) toResource(obj interface{}) (model.Resource, error) { + object := obj + if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { + logger.Debugf("informer resolved tombstone object during delete handling, key: %s", tombstone.Key) + object = tombstone.Obj + } + if resource, ok := object.(model.Resource); ok { + return resource, nil + } + if s.transform != nil { + transformed, err := s.transform(object) + if err != nil { + return nil, err + } + if resource, ok := transformed.(model.Resource); ok { + return resource, nil + } + object = transformed + } + if object == nil { + return nil, bizerror.NewAssertionError("Resource", "nil") + } + return nil, bizerror.NewAssertionError("Resource", reflect.TypeOf(object).Name()) +} + // EmitEvent emits an event to the event bus. func (s *informer) EmitEvent(typ cache.DeltaType, oldObj model.Resource, newObj model.Resource) { event := events.NewResourceChangedEvent(typ, oldObj, newObj) diff --git a/pkg/core/controller/listwatcher.go b/pkg/core/controller/listwatcher.go index e90580f0d..4978f0116 100644 --- a/pkg/core/controller/listwatcher.go +++ b/pkg/core/controller/listwatcher.go @@ -31,3 +31,9 @@ type ResourceListerWatcher interface { // return nil if there is no need to transform, see cache.SharedInformer for detail TransformFunc() cache.TransformFunc } + +// ResourceKeyProvider can be optionally implemented by a ResourceListerWatcher when +// raw watch objects need a key that is consistent with the transformed resource key. +type ResourceKeyProvider interface { + KeyFunc() cache.KeyFunc +} diff --git a/pkg/core/discovery/subscriber/rpc_instance.go b/pkg/core/discovery/subscriber/rpc_instance.go index e970239dd..6ea6c47ab 100644 --- a/pkg/core/discovery/subscriber/rpc_instance.go +++ b/pkg/core/discovery/subscriber/rpc_instance.go @@ -20,6 +20,7 @@ package subscriber import ( "reflect" + "github.com/duke-git/lancet/v2/slice" "github.com/duke-git/lancet/v2/strutil" "k8s.io/client-go/tools/cache" @@ -110,7 +111,12 @@ func (s *RPCInstanceEventSubscriber) processUpsert(rpcInstanceRes *meshresource. // We should merge the rpc info into it if instanceRes != nil { meshresource.MergeRPCInstanceIntoInstance(rpcInstanceRes, instanceRes) - return s.instanceStore.Update(instanceRes) + if err := s.instanceStore.Update(instanceRes); err != nil { + return err + } + logger.Infof("instance lifecycle merged rpc source, instance: %s, rpc: %s, registerState: registered, deployState: %s", + instanceRes.ResourceKey(), rpcInstanceRes.ResourceKey(), instanceRes.Spec.DeployState) + return nil } // Otherwise we can create a new instance resource by rpc instance instanceRes = meshresource.FromRPCInstance(rpcInstanceRes) @@ -119,6 +125,8 @@ func (s *RPCInstanceEventSubscriber) processUpsert(rpcInstanceRes *meshresource. logger.Errorf("add instance resource failed, instance: %s, err: %s", instanceRes.ResourceKey(), err.Error()) return err } + logger.Infof("instance lifecycle created rpc-only instance, instance: %s, rpc: %s, registerState: registered", + instanceRes.ResourceKey(), rpcInstanceRes.ResourceKey()) instanceAddEvent := events.NewResourceChangedEvent(cache.Added, nil, instanceRes) s.eventEmitter.Send(instanceAddEvent) @@ -135,10 +143,26 @@ func (s *RPCInstanceEventSubscriber) processDelete(rpcInstanceRes *meshresource. logger.Warnf("cannot find instance resource for rpc instance %s, skipped deleting instance", rpcInstanceRes.Name) return nil } + meshresource.ClearRPCInstanceFromInstance(instanceRes) + if meshresource.HasRuntimeInstanceSource(instanceRes) { + if err := s.instanceStore.Update(instanceRes); err != nil { + logger.Errorf("update instance resource failed after rpc delete, instance: %s, err: %s", + instanceRes.ResourceKey(), err.Error()) + return err + } + logger.Infof("instance lifecycle rpc source removed, keep instance by runtime source, instance: %s, rpc: %s, deployState: %s", + instanceRes.ResourceKey(), rpcInstanceRes.ResourceKey(), instanceRes.Spec.DeployState) + instanceUpdateEvent := events.NewResourceChangedEvent(cache.Updated, instanceRes, instanceRes) + s.eventEmitter.Send(instanceUpdateEvent) + logger.Debugf("rpc instance delete trigger instance update event, event: %s", instanceUpdateEvent.String()) + return nil + } if err := s.instanceStore.Delete(instanceRes); err != nil { logger.Errorf("delete instance resource failed, instance: %s, err: %s", instanceRes.ResourceKey(), err.Error()) return err } + logger.Infof("instance lifecycle rpc source removed and no runtime source remains, deleted instance: %s, rpc: %s", + instanceRes.ResourceKey(), rpcInstanceRes.ResourceKey()) instanceDeleteEvent := events.NewResourceChangedEvent(cache.Deleted, instanceRes, nil) s.eventEmitter.Send(instanceDeleteEvent) logger.Debugf("rpc instance delete trigger instance delete event, event: %s", instanceDeleteEvent.String()) @@ -173,7 +197,7 @@ func (s *RPCInstanceEventSubscriber) getRelatedInstanceRes( func (s *RPCInstanceEventSubscriber) findRelatedRuntimeInstanceAndMerge(instanceRes *meshresource.InstanceResource) { switch s.engineCfg.Type { case enginecfg.Kubernetes: - rtInstance := s.getRuntimeInstanceByIp(instanceRes.Spec.Ip) + rtInstance := s.getRuntimeInstanceForInstance(instanceRes) if rtInstance == nil { logger.Warnf("cannot find runtime instance for instace %s, skipping merging", instanceRes.ResourceKey()) return @@ -184,7 +208,8 @@ func (s *RPCInstanceEventSubscriber) findRelatedRuntimeInstanceAndMerge(instance } } -func (s *RPCInstanceEventSubscriber) getRuntimeInstanceByIp(ip string) *meshresource.RuntimeInstanceResource { +func (s *RPCInstanceEventSubscriber) getRuntimeInstanceForInstance(instanceRes *meshresource.InstanceResource) *meshresource.RuntimeInstanceResource { + ip := instanceRes.Spec.Ip resources, err := s.rtInstanceStore.ListByIndexes(map[string]string{ index.ByRuntimeInstanceIPIndex: ip, }) @@ -195,9 +220,29 @@ func (s *RPCInstanceEventSubscriber) getRuntimeInstanceByIp(ip string) *meshreso if len(resources) == 0 { return nil } - runtimeInstanceRes, ok := resources[0].(*meshresource.RuntimeInstanceResource) - if !ok { - return nil + candidates := make([]*meshresource.RuntimeInstanceResource, 0, len(resources)) + for _, item := range resources { + res, ok := item.(*meshresource.RuntimeInstanceResource) + if !ok { + continue + } + candidates = append(candidates, res) + } + // Prefer exact runtime identity match by app + rpcPort + mesh. + for _, candidate := range candidates { + if candidate.Spec == nil { + continue + } + if candidate.Mesh == instanceRes.Mesh && + candidate.Spec.AppName == instanceRes.Spec.AppName && + candidate.Spec.RpcPort == instanceRes.Spec.RpcPort { + return candidate + } } - return runtimeInstanceRes + keys := slice.Map(candidates, func(_ int, item *meshresource.RuntimeInstanceResource) string { + return item.ResourceKey() + }) + logger.Warnf("cannot find exact runtime instance match by identity, skip runtime merge for instance %s, ip: %s, runtime candidates: %v", + instanceRes.ResourceKey(), ip, keys) + return nil } diff --git a/pkg/core/engine/component.go b/pkg/core/engine/component.go index 992eec8ac..18623bd41 100644 --- a/pkg/core/engine/component.go +++ b/pkg/core/engine/component.go @@ -22,8 +22,6 @@ import ( "math" "reflect" - "k8s.io/client-go/tools/cache" - "github.com/apache/dubbo-admin/pkg/common/bizerror" enginecfg "github.com/apache/dubbo-admin/pkg/config/engine" "github.com/apache/dubbo-admin/pkg/core/controller" @@ -33,6 +31,7 @@ import ( meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1" "github.com/apache/dubbo-admin/pkg/core/runtime" "github.com/apache/dubbo-admin/pkg/core/store" + "k8s.io/client-go/tools/cache" ) func init() { @@ -122,7 +121,7 @@ func (e *engineComponent) initInformers(cfg *enginecfg.Config, emitter events.Em if err != nil { return fmt.Errorf("can not find store for resource kind %s, %w", rk, err) } - informer := controller.NewInformerWithOptions(lw, emitter, rs, cache.MetaNamespaceKeyFunc, controller.Options{ResyncPeriod: 0}) + informer := controller.NewInformerWithOptions(lw, emitter, rs, resolveInformerKeyFunc(lw), controller.Options{ResyncPeriod: 0}) if lw.TransformFunc() != nil { err = informer.SetTransform(lw.TransformFunc()) if err != nil { @@ -135,6 +134,13 @@ func (e *engineComponent) initInformers(cfg *enginecfg.Config, emitter events.Em return nil } +func resolveInformerKeyFunc(lw controller.ResourceListerWatcher) cache.KeyFunc { + if provider, ok := lw.(controller.ResourceKeyProvider); ok { + return provider.KeyFunc() + } + return cache.MetaNamespaceKeyFunc +} + func (e *engineComponent) initSubscribers(eventbus events.EventBus) error { rs, err := e.storeRouter.ResourceKindRoute(meshresource.InstanceKind) if err != nil { diff --git a/pkg/core/engine/subscriber/runtime_instance.go b/pkg/core/engine/subscriber/runtime_instance.go index 6038462c0..c32b8dc69 100644 --- a/pkg/core/engine/subscriber/runtime_instance.go +++ b/pkg/core/engine/subscriber/runtime_instance.go @@ -92,14 +92,7 @@ func (s *RuntimeInstanceEventSubscriber) ProcessEvent(event events.Event) error // processUpsert when runtime instance added or updated, we should add/update the corresponding instance resource func (s *RuntimeInstanceEventSubscriber) processUpsert(rtInstanceRes *meshresource.RuntimeInstanceResource) error { - var instanceResource *meshresource.InstanceResource - var err error - switch rtInstanceRes.Spec.SourceEngineType { - case string(enginecfg.Kubernetes): - instanceResource, err = s.getRelatedInstanceByIP(rtInstanceRes) - default: - instanceResource, err = s.getRelatedInstanceByName(rtInstanceRes) - } + instanceResource, err := s.getRelatedInstance(rtInstanceRes) if err != nil { return err } @@ -107,12 +100,19 @@ func (s *RuntimeInstanceEventSubscriber) processUpsert(rtInstanceRes *meshresour // so we should merge the runtime info into it if instanceResource != nil { meshresource.MergeRuntimeInstanceIntoInstance(rtInstanceRes, instanceResource) - return s.instanceStore.Update(instanceResource) + if err = s.instanceStore.Update(instanceResource); err != nil { + return err + } + logger.Infof("instance lifecycle merged runtime source, instance: %s, runtime: %s, deployState: %s, hasRPCSource: %t", + instanceResource.ResourceKey(), rtInstanceRes.ResourceKey(), instanceResource.Spec.DeployState, + meshresource.HasRPCInstanceSource(instanceResource)) + return nil } // if instance resource does not exist, that is to say the rpc instance does not exist in remote registry. // we need to check whether the runtime instance resource is enough to create a new instance resource if !checkAttributesEnough(rtInstanceRes) { - logger.Warnf("cannot identify runtime instance %s to a dubbo instance, skipped creating new instance", rtInstanceRes.ResourceKey()) + logger.Warnf("cannot identify runtime instance %s to a dubbo instance, skipped creating new instance, reason: %s", + rtInstanceRes.ResourceKey(), runtimeIdentityMissingReason(rtInstanceRes)) return nil } // if conditions met, we should create a new instance resource by runtime instance @@ -121,6 +121,8 @@ func (s *RuntimeInstanceEventSubscriber) processUpsert(rtInstanceRes *meshresour logger.Errorf("add instance resource failed, instance: %s, err: %s", instanceRes.ResourceKey(), err.Error()) return err } + logger.Infof("instance lifecycle created runtime-only instance, instance: %s, runtime: %s, deployState: %s", + instanceRes.ResourceKey(), rtInstanceRes.ResourceKey(), instanceRes.Spec.DeployState) instanceAddEvent := events.NewResourceChangedEvent(cache.Added, nil, instanceRes) s.eventEmitter.Send(instanceAddEvent) logger.Debugf("runtime instance upsert trigger instance add event, event: %s", instanceAddEvent.String()) @@ -129,14 +131,7 @@ func (s *RuntimeInstanceEventSubscriber) processUpsert(rtInstanceRes *meshresour // processDelete when runtime instance deleted, we should delete the corresponding instance resource func (s *RuntimeInstanceEventSubscriber) processDelete(rtInstanceRes *meshresource.RuntimeInstanceResource) error { - var instanceResource *meshresource.InstanceResource - var err error - switch rtInstanceRes.Spec.SourceEngineType { - case string(enginecfg.Kubernetes): - instanceResource, err = s.getRelatedInstanceByIP(rtInstanceRes) - default: - instanceResource, err = s.getRelatedInstanceByName(rtInstanceRes) - } + instanceResource, err := s.getRelatedInstance(rtInstanceRes) if err != nil { return err } @@ -144,16 +139,102 @@ func (s *RuntimeInstanceEventSubscriber) processDelete(rtInstanceRes *meshresour logger.Warnf("cannot find instance resource by runtime instance %s, skipped deleting instance", rtInstanceRes.ResourceKey()) return nil } + meshresource.ClearRuntimeInstanceFromInstance(instanceResource) + if meshresource.HasRPCInstanceSource(instanceResource) { + if err = s.instanceStore.Update(instanceResource); err != nil { + logger.Errorf("update instance resource failed after runtime delete, instance: %s, err: %s", + instanceResource.ResourceKey(), err.Error()) + return err + } + logger.Infof("instance lifecycle runtime source removed, keep instance by rpc source, instance: %s, runtime: %s, registerState: registered", + instanceResource.ResourceKey(), rtInstanceRes.ResourceKey()) + instanceUpdateEvent := events.NewResourceChangedEvent(cache.Updated, instanceResource, instanceResource) + s.eventEmitter.Send(instanceUpdateEvent) + logger.Debugf("runtime instance delete trigger instance update event, event: %s", instanceUpdateEvent.String()) + return nil + } if err = s.instanceStore.Delete(instanceResource); err != nil { logger.Errorf("delete instance resource failed, instance: %s, err: %s", instanceResource.ResourceKey(), err.Error()) return err } + logger.Infof("instance lifecycle runtime source removed and no rpc source remains, deleted instance: %s, runtime: %s", + instanceResource.ResourceKey(), rtInstanceRes.ResourceKey()) instanceDeleteEvent := events.NewResourceChangedEvent(cache.Deleted, instanceResource, nil) s.eventEmitter.Send(instanceDeleteEvent) logger.Debugf("runtime instance delete trigger instance delete event, event: %s", instanceDeleteEvent.String()) return nil } +func (s *RuntimeInstanceEventSubscriber) getRelatedInstance( + rtInstanceRes *meshresource.RuntimeInstanceResource) (*meshresource.InstanceResource, error) { + if rtInstanceRes == nil || rtInstanceRes.Spec == nil { + return nil, nil + } + switch rtInstanceRes.Spec.SourceEngineType { + case string(enginecfg.Kubernetes): + return s.getRelatedKubernetesInstance(rtInstanceRes) + default: + return s.getRelatedInstanceByName(rtInstanceRes) + } +} + +func (s *RuntimeInstanceEventSubscriber) getRelatedKubernetesInstance( + rtInstanceRes *meshresource.RuntimeInstanceResource) (*meshresource.InstanceResource, error) { + if rtInstanceRes == nil || rtInstanceRes.Spec == nil { + return nil, nil + } + if strutil.IsBlank(rtInstanceRes.Spec.AppName) || + strutil.IsBlank(rtInstanceRes.Spec.Ip) || + rtInstanceRes.Spec.RpcPort <= 0 { + return nil, nil + } + instanceResName := meshresource.BuildInstanceResName(rtInstanceRes.Spec.AppName, rtInstanceRes.Spec.Ip, rtInstanceRes.Spec.RpcPort) + + // 1) Mesh-aware exact match when mesh is explicit. + if strutil.IsNotBlank(rtInstanceRes.Mesh) && constants.DefaultMesh != rtInstanceRes.Mesh { + res, exists, err := s.instanceStore.GetByKey(coremodel.BuildResourceKey(rtInstanceRes.Mesh, instanceResName)) + if err != nil { + return nil, err + } + if !exists { + return nil, nil + } + instanceRes, ok := res.(*meshresource.InstanceResource) + if !ok { + return nil, bizerror.NewAssertionError("InstanceResource", reflect.TypeOf(res).Name()) + } + return instanceRes, nil + } + + // 2) Mesh is missing/default: allow unique lookup by identity name only. + // This keeps safety (no IP fallback) while tolerating missing registry labels. + resources, err := s.instanceStore.ListByIndexes(map[string]string{ + index.ByInstanceNameIndex: instanceResName, + }) + if err != nil { + return nil, err + } + if len(resources) == 0 { + return nil, nil + } + if len(resources) > 1 { + resKeys := make([]string, 0, len(resources)) + for _, item := range resources { + if res, ok := item.(*meshresource.InstanceResource); ok { + resKeys = append(resKeys, res.ResourceKey()) + } + } + logger.Warnf("cannot merge runtime instance by name because matched resources are ambiguous, runtime: %s, name: %s, candidates: %v", + rtInstanceRes.ResourceKey(), instanceResName, resKeys) + return nil, nil + } + instanceRes, ok := resources[0].(*meshresource.InstanceResource) + if !ok { + return nil, bizerror.NewAssertionError("InstanceResource", reflect.TypeOf(resources[0]).Name()) + } + return instanceRes, nil +} + func (s *RuntimeInstanceEventSubscriber) getRelatedInstanceByName( rtInstanceRes *meshresource.RuntimeInstanceResource) (*meshresource.InstanceResource, error) { if rtInstanceRes.Spec == nil || strutil.IsBlank(rtInstanceRes.Spec.AppName) || @@ -171,21 +252,28 @@ func (s *RuntimeInstanceEventSubscriber) getRelatedInstanceByName( return nil, nil } instanceResList := make([]*meshresource.InstanceResource, len(resources)) + filtered := make([]*meshresource.InstanceResource, 0, len(resources)) for i, item := range resources { res, ok := item.(*meshresource.InstanceResource) if !ok { return nil, bizerror.NewAssertionError("InstanceResource", reflect.TypeOf(item).Name()) } instanceResList[i] = res + if strutil.IsBlank(rtInstanceRes.Mesh) || res.Mesh == rtInstanceRes.Mesh { + filtered = append(filtered, res) + } + } + if len(filtered) == 0 { + return nil, nil } - if len(instanceResList) > 1 { - resKeys := slice.Map(instanceResList, func(index int, item *meshresource.InstanceResource) string { + if len(filtered) > 1 { + resKeys := slice.Map(filtered, func(index int, item *meshresource.InstanceResource) string { return item.ResourceKey() }) logger.Warnf("there are more than two instances which have the same name, instance keys: %s, name: %s", resKeys, instanceResName) return nil, nil } - return instanceResList[0], nil + return filtered[0], nil } func (s *RuntimeInstanceEventSubscriber) getRelatedInstanceByIP( @@ -200,21 +288,28 @@ func (s *RuntimeInstanceEventSubscriber) getRelatedInstanceByIP( return nil, nil } instanceResList := make([]*meshresource.InstanceResource, len(resources)) + filtered := make([]*meshresource.InstanceResource, 0, len(resources)) for i, item := range resources { res, ok := item.(*meshresource.InstanceResource) if !ok { return nil, bizerror.NewAssertionError("InstanceResource", reflect.TypeOf(item).Name()) } instanceResList[i] = res + if strutil.IsBlank(rtInstanceRes.Mesh) || res.Mesh == rtInstanceRes.Mesh { + filtered = append(filtered, res) + } + } + if len(filtered) == 0 { + return nil, nil } - if len(instanceResList) > 1 { - resKeys := slice.Map(instanceResList, func(index int, item *meshresource.InstanceResource) string { + if len(filtered) > 1 { + resKeys := slice.Map(filtered, func(index int, item *meshresource.InstanceResource) string { return item.ResourceKey() }) logger.Warnf("there are instances which have same ip, instance keys: %s, ip: %s", resKeys, rtInstanceRes.Spec.Ip) return nil, nil } - return instanceResList[0], nil + return filtered[0], nil } func checkAttributesEnough(rtInstanceRes *meshresource.RuntimeInstanceResource) bool { @@ -225,3 +320,28 @@ func checkAttributesEnough(rtInstanceRes *meshresource.RuntimeInstanceResource) } return true } + +func runtimeIdentityMissingReason(rtInstanceRes *meshresource.RuntimeInstanceResource) string { + if rtInstanceRes == nil { + return "runtime instance is nil" + } + if rtInstanceRes.Spec == nil { + return "runtime instance spec is nil" + } + if strutil.IsBlank(rtInstanceRes.Spec.AppName) { + return "appName is empty" + } + if strutil.IsBlank(rtInstanceRes.Spec.Ip) { + return "pod ip is empty" + } + if rtInstanceRes.Spec.RpcPort <= 0 { + return "rpcPort is not configured" + } + if strutil.IsBlank(rtInstanceRes.Mesh) { + return "mesh is empty" + } + if constants.DefaultMesh == rtInstanceRes.Mesh { + return "mesh is default, missing registry identifier" + } + return "unknown" +} diff --git a/pkg/core/resource/apis/mesh/v1alpha1/instance_helper.go b/pkg/core/resource/apis/mesh/v1alpha1/instance_helper.go index ffc72babe..f7b86c346 100644 --- a/pkg/core/resource/apis/mesh/v1alpha1/instance_helper.go +++ b/pkg/core/resource/apis/mesh/v1alpha1/instance_helper.go @@ -19,6 +19,9 @@ package v1alpha1 import ( "fmt" + "time" + + "github.com/apache/dubbo-admin/pkg/common/constants" ) func BuildInstanceResName(appName string, ip string, rpcPort int64) string { @@ -79,3 +82,55 @@ func MergeRuntimeInstanceIntoInstance( instanceRes.Spec.Conditions = rtInstanceRes.Spec.Conditions instanceRes.Spec.SourceEngine = rtInstanceRes.Spec.SourceEngine } + +func ClearRPCInstanceFromInstance(instanceRes *InstanceResource) { + if instanceRes == nil || instanceRes.Spec == nil { + return + } + instanceRes.Spec.ReleaseVersion = "" + instanceRes.Spec.RegisterTime = "" + instanceRes.Spec.UnregisterTime = time.Now().Format(constants.TimeFormatStr) + instanceRes.Spec.Protocol = "" + instanceRes.Spec.Serialization = "" + instanceRes.Spec.PreferSerialization = "" + instanceRes.Spec.Tags = nil +} + +func ClearRuntimeInstanceFromInstance(instanceRes *InstanceResource) { + if instanceRes == nil || instanceRes.Spec == nil { + return + } + instanceRes.Labels = nil + instanceRes.Spec.Image = "" + instanceRes.Spec.CreateTime = "" + instanceRes.Spec.StartTime = "" + instanceRes.Spec.ReadyTime = "" + instanceRes.Spec.DeployState = "" + instanceRes.Spec.WorkloadType = "" + instanceRes.Spec.WorkloadName = "" + instanceRes.Spec.Node = "" + instanceRes.Spec.Probes = nil + instanceRes.Spec.Conditions = nil + instanceRes.Spec.SourceEngine = "" +} + +func HasRuntimeInstanceSource(instanceRes *InstanceResource) bool { + if instanceRes == nil || instanceRes.Spec == nil { + return false + } + return instanceRes.Spec.SourceEngine != "" || + instanceRes.Spec.DeployState != "" || + instanceRes.Spec.WorkloadName != "" || + instanceRes.Spec.Node != "" || + instanceRes.Spec.Image != "" || + instanceRes.Spec.StartTime != "" || + instanceRes.Spec.ReadyTime != "" || + len(instanceRes.Spec.Conditions) > 0 +} + +func HasRPCInstanceSource(instanceRes *InstanceResource) bool { + if instanceRes == nil || instanceRes.Spec == nil { + return false + } + return instanceRes.Spec.RegisterTime != "" +} diff --git a/pkg/engine/kubernetes/listerwatcher/runtime_instance.go b/pkg/engine/kubernetes/listerwatcher/runtime_instance.go index 725b9218c..bc35f3fb9 100644 --- a/pkg/engine/kubernetes/listerwatcher/runtime_instance.go +++ b/pkg/engine/kubernetes/listerwatcher/runtime_instance.go @@ -48,6 +48,7 @@ type PodListerWatcher struct { } var _ controller.ResourceListerWatcher = &PodListerWatcher{} +var _ controller.ResourceKeyProvider = &PodListerWatcher{} func NewPodListWatcher(clientset *kubernetes.Clientset, cfg *enginecfg.Config) (*PodListerWatcher, error) { var selector fields.Selector @@ -105,10 +106,7 @@ func (p *PodListerWatcher) TransformFunc() cache.TransformFunc { readyTime = c.LastTransitionTime.Format(constants.TimeFormatStr) } }) - phase := string(pod.Status.Phase) - if pod.DeletionTimestamp != nil { - phase = meshproto.InstanceTerminating - } + phase := derivePodPhase(pod) var workloadName string var workloadType string if len(pod.GetOwnerReferences()) > 0 { @@ -173,6 +171,105 @@ func (p *PodListerWatcher) TransformFunc() cache.TransformFunc { } } +func (p *PodListerWatcher) KeyFunc() cache.KeyFunc { + return p.resourceKeyFromObject +} + +func (p *PodListerWatcher) resourceKeyFromObject(obj interface{}) (string, error) { + for { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + break + } + obj = tombstone.Obj + } + switch o := obj.(type) { + case *v1.Pod: + return p.resourceKeyFromPod(o), nil + case *meshresource.RuntimeInstanceResource: + return o.ResourceKey(), nil + default: + if obj == nil { + return "", bizerror.NewAssertionError("Pod", "nil") + } + return "", bizerror.NewAssertionError("Pod", reflect.TypeOf(obj).Name()) + } +} + +func (p *PodListerWatcher) resourceKeyFromPod(pod *v1.Pod) string { + return coremodel.BuildResourceKey(p.getDubboMesh(pod), pod.Name) +} + +func derivePodPhase(pod *v1.Pod) string { + if pod == nil { + return "Unknown" + } + if pod.DeletionTimestamp != nil { + return meshproto.InstanceTerminating + } + if hasCrashingContainerStatus(pod.Status.InitContainerStatuses) || hasCrashingContainerStatus(pod.Status.ContainerStatuses) { + return meshproto.InstanceCrashing + } + switch pod.Status.Phase { + case v1.PodPending: + if hasStartingContainerStatus(pod.Status.InitContainerStatuses) || hasStartingContainerStatus(pod.Status.ContainerStatuses) { + return meshproto.InstanceStarting + } + return string(v1.PodPending) + case v1.PodRunning: + if !isPodReady(pod.Status.Conditions) { + return meshproto.InstanceStarting + } + return string(v1.PodRunning) + case v1.PodFailed: + return string(v1.PodFailed) + case v1.PodSucceeded: + return string(v1.PodSucceeded) + case v1.PodUnknown: + return string(v1.PodUnknown) + default: + return string(pod.Status.Phase) + } +} + +func isPodReady(conditions []v1.PodCondition) bool { + for _, condition := range conditions { + if condition.Type == v1.PodReady { + return condition.Status == v1.ConditionTrue + } + } + return false +} + +func hasStartingContainerStatus(statuses []v1.ContainerStatus) bool { + for _, status := range statuses { + if status.State.Waiting == nil { + continue + } + switch status.State.Waiting.Reason { + case "ContainerCreating", "PodInitializing": + return true + } + } + return false +} + +func hasCrashingContainerStatus(statuses []v1.ContainerStatus) bool { + for _, status := range statuses { + if status.State.Waiting != nil { + switch status.State.Waiting.Reason { + case "CrashLoopBackOff", "ImagePullBackOff", "ErrImagePull", "RunContainerError", + "CreateContainerConfigError", "CreateContainerError", "StartError": + return true + } + } + if status.State.Terminated != nil && status.State.Terminated.ExitCode != 0 { + return true + } + } + return false +} + func (p *PodListerWatcher) getMainContainer(pod *v1.Pod) *v1.Container { containers := pod.Spec.Containers strategy := p.cfg.Properties.MainContainerChooseStrategy diff --git a/release/kubernetes/dubbo-samples-shop/dubbo-samples-shop-all.yaml b/release/kubernetes/dubbo-samples-shop/dubbo-samples-shop-all.yaml index 91a5961e8..1bcdfdc36 100644 --- a/release/kubernetes/dubbo-samples-shop/dubbo-samples-shop-all.yaml +++ b/release/kubernetes/dubbo-samples-shop/dubbo-samples-shop-all.yaml @@ -114,6 +114,8 @@ spec: app: shop-order orderVersion: v1 app-type: dubbo + org.apache.dubbo/dubbo-rpc-port: "20882" + org.apache.dubbo/registry: nacos2.5 spec: containers: - name: shop-order @@ -161,6 +163,8 @@ spec: app: shop-order orderVersion: v2 app-type: dubbo + org.apache.dubbo/dubbo-rpc-port: "20883" + org.apache.dubbo/registry: nacos2.5 spec: containers: - name: shop-order @@ -205,6 +209,8 @@ spec: labels: app: shop-user app-type: dubbo + org.apache.dubbo/dubbo-rpc-port: "20884" + org.apache.dubbo/registry: nacos2.5 spec: containers: - name: shop-user @@ -250,6 +256,8 @@ spec: app: shop-detail detailVersion: v1 app-type: dubbo + org.apache.dubbo/dubbo-rpc-port: "20885" + org.apache.dubbo/registry: nacos2.5 spec: containers: - name: shop-detail @@ -297,6 +305,8 @@ spec: app: shop-detail detailVersion: v2 app-type: dubbo + org.apache.dubbo/dubbo-rpc-port: "20886" + org.apache.dubbo/registry: nacos2.5 spec: containers: - name: shop-detail @@ -344,6 +354,8 @@ spec: app: shop-comment commentVersion: v1 app-type: dubbo + org.apache.dubbo/dubbo-rpc-port: "20887" + org.apache.dubbo/registry: nacos2.5 spec: containers: - name: shop-comment @@ -391,6 +403,8 @@ spec: app: shop-comment commentVersion: v2 app-type: dubbo + org.apache.dubbo/dubbo-rpc-port: "20888" + org.apache.dubbo/registry: nacos2.5 spec: containers: - name: shop-comment @@ -434,6 +448,8 @@ spec: labels: app: shop-user app-type: dubbo + org.apache.dubbo/dubbo-rpc-port: "20892" + org.apache.dubbo/registry: nacos2.5 spec: containers: - name: shop-user-gray @@ -480,6 +496,8 @@ spec: app: shop-order orderVersion: v1 app-type: dubbo + org.apache.dubbo/dubbo-rpc-port: "20891" + org.apache.dubbo/registry: nacos2.5 spec: containers: - name: shop-order @@ -525,6 +543,8 @@ spec: app: shop-detail detailVersion: v1 app-type: dubbo + org.apache.dubbo/dubbo-rpc-port: "20890" + org.apache.dubbo/registry: nacos2.5 spec: containers: - name: shop-detail @@ -570,6 +590,8 @@ spec: app: shop-comment commentVersion: v1 app-type: dubbo + org.apache.dubbo/dubbo-rpc-port: "20889" + org.apache.dubbo/registry: nacos2.5 spec: containers: - name: shop-comment diff --git a/release/kubernetes/dubbo-system/dubbo-admin.yaml b/release/kubernetes/dubbo-system/dubbo-admin.yaml index 18faf418d..54216c4f2 100644 --- a/release/kubernetes/dubbo-system/dubbo-admin.yaml +++ b/release/kubernetes/dubbo-system/dubbo-admin.yaml @@ -113,6 +113,20 @@ data: id: default name: default type: kubernetes + properties: + podWatchSelector: metadata.namespace=dubbo-samples-shop + dubboAppIdentifier: + type: ByLabel + labelKey: app + dubboRPCPortIdentifier: + type: ByLabel + labelKey: org.apache.dubbo/dubbo-rpc-port + dubboDiscoveryIdentifier: + type: ByLabel + labelKey: org.apache.dubbo/registry + mainContainerChooseStrategy: + type: ByIndex + index: 0 --- apiVersion: v1 kind: Service diff --git a/ui-vue3/src/base/constants.ts b/ui-vue3/src/base/constants.ts index 166772eff..e1c865fe9 100644 --- a/ui-vue3/src/base/constants.ts +++ b/ui-vue3/src/base/constants.ts @@ -57,8 +57,8 @@ export const PRIMARY_COLOR_T = (percent: string) => computed(() => PRIMARY_COLOR export const PRIMARY_COLOR_R = computed(() => getTextColorByBackground(PRIMARY_COLOR.value)) export const INSTANCE_REGISTER_COLOR: { [key: string]: string } = { - HEALTHY: 'green', - REGISTED: 'green' + REGISTERED: 'green', + UNREGISTERED: 'default' } export const TAB_HEADER_TITLE: Component = { @@ -83,7 +83,19 @@ export const TAB_HEADER_TITLE: Component = { */ export const INSTANCE_DEPLOY_COLOR: { [key: string]: string } = { RUNNING: 'green', + STARTING: 'gold', PENDING: 'yellow', TERMINATING: 'red', - CRASHING: 'darkRed' + FAILED: 'red', + UNKNOWN: 'default', + CRASHING: 'red' +} + +export const INSTANCE_LIFECYCLE_COLOR: { [key: string]: string } = { + STARTING: 'gold', + SERVING: 'green', + DRAINING: 'orange', + TERMINATING: 'red', + ERROR: 'red', + UNKNOWN: 'default' } diff --git a/ui-vue3/src/base/i18n/en.ts b/ui-vue3/src/base/i18n/en.ts index f81082e98..777c7dc94 100644 --- a/ui-vue3/src/base/i18n/en.ts +++ b/ui-vue3/src/base/i18n/en.ts @@ -172,6 +172,7 @@ const words: I18nType = { instanceName: 'InstanceName', ip: 'Ip', name: 'Name', + lifecycleState: 'Lifecycle State', deployState: 'Deploy State', deployCluster: 'Deploy Cluster', deployClusters: 'Deploy Clusters', @@ -564,6 +565,7 @@ const words: I18nType = { dependentService: 'Dependent Service', submit: 'Submit', reset: 'Reset', + refresh: 'Refresh', router: { resource: { app: { diff --git a/ui-vue3/src/base/i18n/zh.ts b/ui-vue3/src/base/i18n/zh.ts index 3d1c47652..c382c0e9e 100644 --- a/ui-vue3/src/base/i18n/zh.ts +++ b/ui-vue3/src/base/i18n/zh.ts @@ -194,6 +194,7 @@ const words: I18nType = { instanceIP: '实例IP', ip: 'IP', name: '实例名称', + lifecycleState: '生命周期状态', deployState: '部署状态', deployCluster: '部署集群', deployClusters: '部署集群', @@ -554,6 +555,7 @@ const words: I18nType = { idx: '序号', submit: '提交', reset: '重置', + refresh: '刷新', router: { resource: { app: { diff --git a/ui-vue3/src/components/SearchTable.vue b/ui-vue3/src/components/SearchTable.vue index 5324e6a47..e67424df5 100644 --- a/ui-vue3/src/components/SearchTable.vue +++ b/ui-vue3/src/components/SearchTable.vue @@ -68,7 +68,7 @@ - +
@@ -221,22 +221,23 @@ function hideColumn(item: any) { .common-tool { margin-top: 5px; - width: 100px; cursor: pointer; position: relative; + flex: none; .button { - vertical-align: center; + display: flex; + align-items: center; + justify-content: center; line-height: 24px; font-size: 24px; - float: right; &:hover { color: v-bind('PRIMARY_COLOR'); } svg { - margin-left: 10px; + margin-left: 0; } } diff --git a/ui-vue3/src/views/resources/applications/tabs/instance.vue b/ui-vue3/src/views/resources/applications/tabs/instance.vue index ed59e4e07..f6c4624c4 100644 --- a/ui-vue3/src/views/resources/applications/tabs/instance.vue +++ b/ui-vue3/src/views/resources/applications/tabs/instance.vue @@ -39,13 +39,24 @@ +