Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion api/mesh/v1alpha1/runtime_instance_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,8 @@ const (
StartupProbe = "startup"
)

const InstanceTerminating = "Terminating"
const (
InstanceStarting = "Starting"
InstanceCrashing = "Crashing"
InstanceTerminating = "Terminating"
)
1 change: 1 addition & 0 deletions pkg/console/model/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func NewApplicationTabInstanceInfoReq() *ApplicationTabInstanceInfoReq {
type AppInstanceInfoResp struct {
AppName string `json:"appName"`
CreateTime string `json:"createTime"`
LifecycleState string `json:"lifecycleState"`
DeployState string `json:"deployState"`
DeployClusters string `json:"deployClusters"`
IP string `json:"ip"`
Expand Down
86 changes: 70 additions & 16 deletions pkg/console/model/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package model
import (
"github.com/duke-git/lancet/v2/strutil"

meshproto "github.com/apache/dubbo-admin/api/mesh/v1alpha1"
"github.com/apache/dubbo-admin/pkg/config/app"
meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1"
coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model"
Expand Down Expand Up @@ -58,6 +59,7 @@ type SearchInstanceResp struct {
Name string `json:"name"`
WorkloadName string `json:"workloadName"`
AppName string `json:"appName"`
LifecycleState string `json:"lifecycleState"`
DeployState string `json:"deployState"`
DeployCluster string `json:"deployCluster"`
RegisterState string `json:"registerState"`
Expand Down Expand Up @@ -85,13 +87,10 @@ func (r *SearchInstanceResp) FromInstanceResource(instanceResource *meshresource
if cfg.Engine != nil && cfg.Engine.ID == instance.SourceEngine {
r.DeployCluster = cfg.Engine.Name
}
if r.RegisterTime != "" {
r.RegisterState = "Registered"
} else {
r.RegisterState = "UnRegistered"
}
r.RegisterState = DeriveInstanceRegisterState(instance)
r.Labels = instance.Tags
r.DeployState = instance.DeployState
r.DeployState = DeriveInstanceDeployState(instance)
r.LifecycleState = DeriveInstanceLifecycleState(instance, r.DeployState, r.RegisterState)
r.WorkloadName = instance.WorkloadName
r.AppName = instance.AppName
return r
Expand All @@ -115,6 +114,7 @@ type InstanceDetailResp struct {
RegisterTime string `json:"registerTime"`
RegisterClusters []string `json:"registerClusters"`
DeployCluster string `json:"deployCluster"`
LifecycleState string `json:"lifecycleState"`
DeployState string `json:"deployState"`
RegisterState string `json:"registerState"`
Node string `json:"node"`
Expand Down Expand Up @@ -158,16 +158,9 @@ func FromInstanceResource(res *meshresource.InstanceResource, cfg app.AdminConfi
if cfg.Engine.ID == res.Spec.SourceEngine {
r.DeployCluster = cfg.Engine.Name
}
if strutil.IsNotBlank(instance.DeployState) {
r.DeployState = instance.DeployState
} else {
r.DeployState = "Unknown"
}
if strutil.IsBlank(r.RegisterTime) {
r.RegisterState = "UnRegistered"
} else {
r.RegisterState = "Registered"
}
r.DeployState = DeriveInstanceDeployState(instance)
r.RegisterState = DeriveInstanceRegisterState(instance)
r.LifecycleState = DeriveInstanceLifecycleState(instance, r.DeployState, r.RegisterState)
r.Node = instance.Node
r.Image = instance.Image
r.Probes = ProbeStruct{}
Expand Down Expand Up @@ -196,3 +189,64 @@ func FromInstanceResource(res *meshresource.InstanceResource, cfg app.AdminConfi
}
return r
}

func DeriveInstanceDeployState(instance *meshproto.Instance) string {
if instance == nil || strutil.IsBlank(instance.DeployState) {
return "Unknown"
}
switch instance.DeployState {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: 看能不能把DeployState,RegisterState,LifecycleState统统定义成枚举/自定义类型,定义在一个地方,再写上对应的注释,就能清楚每个state的具体含义

case "Running":
if !isPodReady(instance) {
return "Starting"
}
return "Running"
default:
return instance.DeployState
}
}

func DeriveInstanceRegisterState(instance *meshproto.Instance) string {
if instance == nil || strutil.IsBlank(instance.RegisterTime) {
return "UnRegistered"
}
return "Registered"
}

func DeriveInstanceLifecycleState(instance *meshproto.Instance, deployState string, registerState string) string {
switch deployState {
case "Crashing", "Failed", "Unknown", "Succeeded":
return "Error"
case "Terminating":
return "Terminating"
}

if registerState == "Registered" {
if deployState == "Running" {
return "Serving"
}
return "Error"
}

if deployState == "Running" && strutil.IsNotBlank(instance.UnregisterTime) {
return "Draining"
}

switch deployState {
case "Pending", "Starting", "Running":
return "Starting"
default:
return "Unknown"
}
}

func isPodReady(instance *meshproto.Instance) bool {
for _, condition := range instance.Conditions {
if condition == nil {
continue
}
if condition.Type == "Ready" {
return condition.Status == "True"
}
}
return false
}
5 changes: 3 additions & 2 deletions pkg/console/service/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ func buildAppInstanceInfoResp(instanceRes *meshresource.InstanceResource, cfg ap
resp.Name = instance.Name
resp.AppName = instance.AppName
resp.CreateTime = instance.CreateTime
resp.DeployState = instance.DeployState
resp.DeployState = model.DeriveInstanceDeployState(instance)
resp.LifecycleState = model.DeriveInstanceLifecycleState(instance, resp.DeployState, model.DeriveInstanceRegisterState(instance))
if cfg.Engine.ID == instance.SourceEngine {
resp.DeployClusters = cfg.Engine.Name
}
Expand All @@ -104,7 +105,7 @@ func buildAppInstanceInfoResp(instanceRes *meshresource.InstanceResource, cfg ap
if d := cfg.FindDiscovery(instanceRes.Mesh); d != nil {
resp.RegisterCluster = d.Name
}
resp.RegisterState = "Registered"
resp.RegisterState = model.DeriveInstanceRegisterState(instance)
resp.RegisterTime = instance.RegisterTime
resp.WorkloadName = instance.WorkloadName
return resp
Expand Down
42 changes: 31 additions & 11 deletions pkg/core/controller/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,17 +229,10 @@ func (s *informer) HandleDeltas(obj interface{}, _ bool) error {
}
// from oldest to newest
for _, d := range deltas {
var resource model.Resource
var object interface{}
if o, ok := d.Object.(cache.DeletedFinalStateUnknown); ok {
object = o.Obj
} else {
object = d.Object
}
resource, ok := object.(model.Resource)
if !ok {
logger.Errorf("object from ListWatcher is not conformed to Resource, obj: %v", obj)
return bizerror.NewAssertionError("Resource", reflect.TypeOf(obj).Name())
resource, err := s.toResource(d.Object)
if err != nil {
logger.Errorf("object from ListWatcher is not conformed to Resource, obj: %v, err: %v", obj, err)
return err
}
switch d.Type {
case cache.Sync, cache.Replaced, cache.Added, cache.Updated:
Expand All @@ -257,6 +250,8 @@ func (s *informer) HandleDeltas(obj interface{}, _ bool) error {
s.EmitEvent(cache.Added, nil, resource)
}
case cache.Deleted:
logger.Infof("informer processing delete delta, resource kind: %s, key: %s",
resource.ResourceKind().ToString(), resource.ResourceKey())
if err := s.indexer.Delete(resource); err != nil {
logger.Errorf("failed to delete resource from informer, cause %v, resource: %s,", err, resource.String())
return err
Expand All @@ -267,6 +262,31 @@ func (s *informer) HandleDeltas(obj interface{}, _ bool) error {
return nil
}

func (s *informer) toResource(obj interface{}) (model.Resource, error) {
object := obj
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
logger.Debugf("informer resolved tombstone object during delete handling, key: %s", tombstone.Key)
object = tombstone.Obj
}
if resource, ok := object.(model.Resource); ok {
return resource, nil
}
if s.transform != nil {
transformed, err := s.transform(object)
if err != nil {
return nil, err
}
if resource, ok := transformed.(model.Resource); ok {
return resource, nil
}
object = transformed
}
if object == nil {
return nil, bizerror.NewAssertionError("Resource", "nil")
}
return nil, bizerror.NewAssertionError("Resource", reflect.TypeOf(object).Name())
}

// EmitEvent emits an event to the event bus.
func (s *informer) EmitEvent(typ cache.DeltaType, oldObj model.Resource, newObj model.Resource) {
event := events.NewResourceChangedEvent(typ, oldObj, newObj)
Expand Down
6 changes: 6 additions & 0 deletions pkg/core/controller/listwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,9 @@ type ResourceListerWatcher interface {
// return nil if there is no need to transform, see cache.SharedInformer for detail
TransformFunc() cache.TransformFunc
}

// ResourceKeyProvider can be optionally implemented by a ResourceListerWatcher when
// raw watch objects need a key that is consistent with the transformed resource key.
type ResourceKeyProvider interface {
KeyFunc() cache.KeyFunc
}
59 changes: 52 additions & 7 deletions pkg/core/discovery/subscriber/rpc_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package subscriber
import (
"reflect"

"github.com/duke-git/lancet/v2/slice"
"github.com/duke-git/lancet/v2/strutil"
"k8s.io/client-go/tools/cache"

Expand Down Expand Up @@ -110,7 +111,12 @@ func (s *RPCInstanceEventSubscriber) processUpsert(rpcInstanceRes *meshresource.
// We should merge the rpc info into it
if instanceRes != nil {
meshresource.MergeRPCInstanceIntoInstance(rpcInstanceRes, instanceRes)
return s.instanceStore.Update(instanceRes)
if err := s.instanceStore.Update(instanceRes); err != nil {
return err
}
logger.Infof("instance lifecycle merged rpc source, instance: %s, rpc: %s, registerState: registered, deployState: %s",
instanceRes.ResourceKey(), rpcInstanceRes.ResourceKey(), instanceRes.Spec.DeployState)
return nil
}
// Otherwise we can create a new instance resource by rpc instance
instanceRes = meshresource.FromRPCInstance(rpcInstanceRes)
Expand All @@ -119,6 +125,8 @@ func (s *RPCInstanceEventSubscriber) processUpsert(rpcInstanceRes *meshresource.
logger.Errorf("add instance resource failed, instance: %s, err: %s", instanceRes.ResourceKey(), err.Error())
return err
}
logger.Infof("instance lifecycle created rpc-only instance, instance: %s, rpc: %s, registerState: registered",
instanceRes.ResourceKey(), rpcInstanceRes.ResourceKey())

instanceAddEvent := events.NewResourceChangedEvent(cache.Added, nil, instanceRes)
s.eventEmitter.Send(instanceAddEvent)
Expand All @@ -135,10 +143,26 @@ func (s *RPCInstanceEventSubscriber) processDelete(rpcInstanceRes *meshresource.
logger.Warnf("cannot find instance resource for rpc instance %s, skipped deleting instance", rpcInstanceRes.Name)
return nil
}
meshresource.ClearRPCInstanceFromInstance(instanceRes)
if meshresource.HasRuntimeInstanceSource(instanceRes) {
if err := s.instanceStore.Update(instanceRes); err != nil {
logger.Errorf("update instance resource failed after rpc delete, instance: %s, err: %s",
instanceRes.ResourceKey(), err.Error())
return err
}
logger.Infof("instance lifecycle rpc source removed, keep instance by runtime source, instance: %s, rpc: %s, deployState: %s",
instanceRes.ResourceKey(), rpcInstanceRes.ResourceKey(), instanceRes.Spec.DeployState)
instanceUpdateEvent := events.NewResourceChangedEvent(cache.Updated, instanceRes, instanceRes)
s.eventEmitter.Send(instanceUpdateEvent)
logger.Debugf("rpc instance delete trigger instance update event, event: %s", instanceUpdateEvent.String())
return nil
}
if err := s.instanceStore.Delete(instanceRes); err != nil {
logger.Errorf("delete instance resource failed, instance: %s, err: %s", instanceRes.ResourceKey(), err.Error())
return err
}
logger.Infof("instance lifecycle rpc source removed and no runtime source remains, deleted instance: %s, rpc: %s",
instanceRes.ResourceKey(), rpcInstanceRes.ResourceKey())
instanceDeleteEvent := events.NewResourceChangedEvent(cache.Deleted, instanceRes, nil)
s.eventEmitter.Send(instanceDeleteEvent)
logger.Debugf("rpc instance delete trigger instance delete event, event: %s", instanceDeleteEvent.String())
Expand Down Expand Up @@ -173,7 +197,7 @@ func (s *RPCInstanceEventSubscriber) getRelatedInstanceRes(
func (s *RPCInstanceEventSubscriber) findRelatedRuntimeInstanceAndMerge(instanceRes *meshresource.InstanceResource) {
switch s.engineCfg.Type {
case enginecfg.Kubernetes:
rtInstance := s.getRuntimeInstanceByIp(instanceRes.Spec.Ip)
rtInstance := s.getRuntimeInstanceForInstance(instanceRes)
if rtInstance == nil {
logger.Warnf("cannot find runtime instance for instace %s, skipping merging", instanceRes.ResourceKey())
return
Expand All @@ -184,7 +208,8 @@ func (s *RPCInstanceEventSubscriber) findRelatedRuntimeInstanceAndMerge(instance
}
}

func (s *RPCInstanceEventSubscriber) getRuntimeInstanceByIp(ip string) *meshresource.RuntimeInstanceResource {
func (s *RPCInstanceEventSubscriber) getRuntimeInstanceForInstance(instanceRes *meshresource.InstanceResource) *meshresource.RuntimeInstanceResource {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussion: 这里用ip查出来之后,再用mesh,appName,rpcPort来做一个过滤选择,但可能都会被过滤掉,因为mesh,appName,rpcPort这些属性是需要用户在k8s pod部署时手动打上的。在之前的代码逻辑里面,确定engine type是k8s后用ip来查询,这是因为k8s中ip是唯一的,ip能够将pod和注册中心上的rpc instance一一关联起来,是没有问题的。所以这里建议是加一个兜底返回,或者还是用之前的逻辑

ip := instanceRes.Spec.Ip
resources, err := s.rtInstanceStore.ListByIndexes(map[string]string{
index.ByRuntimeInstanceIPIndex: ip,
})
Expand All @@ -195,9 +220,29 @@ func (s *RPCInstanceEventSubscriber) getRuntimeInstanceByIp(ip string) *meshreso
if len(resources) == 0 {
return nil
}
runtimeInstanceRes, ok := resources[0].(*meshresource.RuntimeInstanceResource)
if !ok {
return nil
candidates := make([]*meshresource.RuntimeInstanceResource, 0, len(resources))
for _, item := range resources {
res, ok := item.(*meshresource.RuntimeInstanceResource)
if !ok {
continue
}
candidates = append(candidates, res)
}
// Prefer exact runtime identity match by app + rpcPort + mesh.
for _, candidate := range candidates {
if candidate.Spec == nil {
continue
}
if candidate.Mesh == instanceRes.Mesh &&
candidate.Spec.AppName == instanceRes.Spec.AppName &&
candidate.Spec.RpcPort == instanceRes.Spec.RpcPort {
return candidate
}
}
return runtimeInstanceRes
keys := slice.Map(candidates, func(_ int, item *meshresource.RuntimeInstanceResource) string {
return item.ResourceKey()
})
logger.Warnf("cannot find exact runtime instance match by identity, skip runtime merge for instance %s, ip: %s, runtime candidates: %v",
instanceRes.ResourceKey(), ip, keys)
return nil
}
12 changes: 9 additions & 3 deletions pkg/core/engine/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
"math"
"reflect"

"k8s.io/client-go/tools/cache"

"github.com/apache/dubbo-admin/pkg/common/bizerror"
enginecfg "github.com/apache/dubbo-admin/pkg/config/engine"
"github.com/apache/dubbo-admin/pkg/core/controller"
Expand All @@ -33,6 +31,7 @@ import (
meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1"
"github.com/apache/dubbo-admin/pkg/core/runtime"
"github.com/apache/dubbo-admin/pkg/core/store"
"k8s.io/client-go/tools/cache"
)

func init() {
Expand Down Expand Up @@ -122,7 +121,7 @@ func (e *engineComponent) initInformers(cfg *enginecfg.Config, emitter events.Em
if err != nil {
return fmt.Errorf("can not find store for resource kind %s, %w", rk, err)
}
informer := controller.NewInformerWithOptions(lw, emitter, rs, cache.MetaNamespaceKeyFunc, controller.Options{ResyncPeriod: 0})
informer := controller.NewInformerWithOptions(lw, emitter, rs, resolveInformerKeyFunc(lw), controller.Options{ResyncPeriod: 0})
if lw.TransformFunc() != nil {
err = informer.SetTransform(lw.TransformFunc())
if err != nil {
Expand All @@ -135,6 +134,13 @@ func (e *engineComponent) initInformers(cfg *enginecfg.Config, emitter events.Em
return nil
}

func resolveInformerKeyFunc(lw controller.ResourceListerWatcher) cache.KeyFunc {
if provider, ok := lw.(controller.ResourceKeyProvider); ok {
return provider.KeyFunc()
}
return cache.MetaNamespaceKeyFunc
}

func (e *engineComponent) initSubscribers(eventbus events.EventBus) error {
rs, err := e.storeRouter.ResourceKindRoute(meshresource.InstanceKind)
if err != nil {
Expand Down
Loading
Loading