Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .gimps.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ sets:
- 'k8s.io/**'
- 'sigs.k8s.io/controller-runtime/**'
- 'sigs.k8s.io/controller-tools/**'
- 'sigs.k8s.io/multicluster-runtime/**'
- 'sigs.k8s.io/yaml/**'
- 'github.com/kcp-dev/client-go/**'
- 'github.com/kcp-dev/kubernetes/**'
- name: kcp
patterns:
- 'github.com/kcp-dev/kcp/**'
- 'github.com/kcp-dev/multicluster-provider/**'
- 'github.com/kcp-dev/sdk/**'
- 'github.com/kcp-dev/logicalcluster/**'
- 'github.com/kcp-dev/code-generator/**'
- 'sigs.k8s.io/multicluster-runtime/**'
- 'github.com/kcp-dev/multicluster-provider/**'
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ linters:
# Controller Runtime
- pkg: sigs.k8s.io/controller-runtime/pkg/client
alias: ctrlruntimeclient
# kcp APIs
- pkg: github.com/kcp-dev/sdk/apis/(\w+)/(v[\w\d]+)
alias: kcp$1$2
no-unaliased: true
exclusions:
generated: lax
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ install-yq:
@UNCOMPRESSED=true hack/uget.sh https://github.com/mikefarah/yq/releases/download/v{VERSION}/yq_{GOOS}_{GOARCH} yq $(YQ_VERSION) yq_*

.PHONY: install-kcp
install-kcp: UGET_CHECKSUMS=false # do not checksum because the version regularly gets overwritten in CI jobs
install-kcp: UGET_CHECKSUMS= # do not checksum because the version regularly gets overwritten in CI jobs
install-kcp:
@hack/uget.sh https://github.com/kcp-dev/kcp/releases/download/v{VERSION}/kcp_{VERSION}_{GOOS}_{GOARCH}.tar.gz kcp $(KCP_VERSION)

Expand Down
144 changes: 54 additions & 90 deletions cmd/api-syncagent/kcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@ import (
"fmt"
"regexp"

"github.com/kcp-dev/logicalcluster/v3"

"github.com/kcp-dev/api-syncagent/internal/kcp"

kcpdevv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1"
kcpdevcore "github.com/kcp-dev/kcp/sdk/apis/core"
kcpdevcorev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1"
"github.com/kcp-dev/logicalcluster/v3"
kcpapisv1alpha1 "github.com/kcp-dev/sdk/apis/apis/v1alpha1"
kcpcore "github.com/kcp-dev/sdk/apis/core"
kcpcorev1alpha1 "github.com/kcp-dev/sdk/apis/core/v1alpha1"

"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -39,44 +38,29 @@ import (
"sigs.k8s.io/controller-runtime/pkg/cluster"
)

// The agent has two potentially different kcp clusters:
//
// endpointCluster - this is where the source of the virtual workspace URLs
// live, i.e. where the APIExport/EndpointSlice.
// managedCluster - this is where the APIExport and APIResourceSchemas
// exist that are meant to be reconciled.
//
// The managedCluster always exists, the endpointCluster only if the workspace
// for the virtual workspace source is different from the managed cluster.

// setupEndpointKcpCluster sets up a plain, non-kcp-aware ctrl-runtime Cluster object
// that is solvely used to watch whichever object holds the virtual workspace URLs,
// either the APIExport or the APIExportEndpointSlice.
func setupEndpointKcpCluster(endpoint *syncEndpoint) (cluster.Cluster, error) {
// no need for a dedicated endpoint cluster
if endpoint.EndpointSlice == nil || endpoint.EndpointSlice.Cluster == endpoint.APIExport.Cluster {
return nil, nil
}

func setupEndpointKcpCluster(endpointSlice qualifiedAPIExportEndpointSlice) (cluster.Cluster, error) {
scheme := runtime.NewScheme()

if err := kcpdevv1alpha1.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpdevv1alpha1.SchemeGroupVersion, err)
if err := kcpapisv1alpha1.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpapisv1alpha1.SchemeGroupVersion, err)
}

if err := kcpdevcorev1alpha1.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpdevcorev1alpha1.SchemeGroupVersion, err)
if err := kcpcorev1alpha1.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpcorev1alpha1.SchemeGroupVersion, err)
}

// RBAC in kcp might be very tight and might not allow to list/watch all objects;
// restrict the cache's selectors accordingly so we can still make use of caching.
byObject := map[ctrlruntimeclient.Object]cache.ByObject{
&kcpdevv1alpha1.APIExportEndpointSlice{}: {
Field: fields.SelectorFromSet(fields.Set{"metadata.name": endpoint.EndpointSlice.Name}),
&kcpapisv1alpha1.APIExportEndpointSlice{}: {
Field: fields.SelectorFromSet(fields.Set{"metadata.name": endpointSlice.Name}),
},
}

return cluster.New(endpoint.EndpointSlice.Config, func(o *cluster.Options) {
return cluster.New(endpointSlice.Config, func(o *cluster.Options) {
o.Scheme = scheme
o.Cache = cache.Options{
Scheme: scheme,
Expand All @@ -87,26 +71,26 @@ func setupEndpointKcpCluster(endpoint *syncEndpoint) (cluster.Cluster, error) {

// setupManagedKcpCluster sets up a plain, non-kcp-aware ctrl-runtime Cluster object
// that is solvely used to manage the APIExport and APIResourceSchemas.
func setupManagedKcpCluster(endpoint *syncEndpoint) (cluster.Cluster, error) {
func setupManagedKcpCluster(apiExport qualifiedAPIExport) (cluster.Cluster, error) {
scheme := runtime.NewScheme()

if err := kcpdevv1alpha1.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpdevv1alpha1.SchemeGroupVersion, err)
if err := kcpapisv1alpha1.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpapisv1alpha1.SchemeGroupVersion, err)
}

if err := kcpdevcorev1alpha1.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpdevcorev1alpha1.SchemeGroupVersion, err)
if err := kcpcorev1alpha1.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpcorev1alpha1.SchemeGroupVersion, err)
}

// RBAC in kcp might be very tight and might not allow to list/watch all objects;
// restrict the cache's selectors accordingly so we can still make use of caching.
byObject := map[ctrlruntimeclient.Object]cache.ByObject{
&kcpdevv1alpha1.APIExport{}: {
Field: fields.SelectorFromSet(fields.Set{"metadata.name": endpoint.APIExport.Name}),
&kcpapisv1alpha1.APIExport{}: {
Field: fields.SelectorFromSet(fields.Set{"metadata.name": apiExport.Name}),
},
}

return cluster.New(endpoint.APIExport.Config, func(o *cluster.Options) {
return cluster.New(apiExport.Config, func(o *cluster.Options) {
o.Scheme = scheme
o.Cache = cache.Options{
Scheme: scheme,
Expand All @@ -122,18 +106,18 @@ type qualifiedCluster struct {
}

type qualifiedAPIExport struct {
*kcpdevv1alpha1.APIExport
*kcpapisv1alpha1.APIExport
qualifiedCluster
}

type qualifiedAPIExportEndpointSlice struct {
*kcpdevv1alpha1.APIExportEndpointSlice
*kcpapisv1alpha1.APIExportEndpointSlice
qualifiedCluster
}

type syncEndpoint struct {
APIExport qualifiedAPIExport
EndpointSlice *qualifiedAPIExportEndpointSlice
EndpointSlice qualifiedAPIExportEndpointSlice
}

// resolveSyncEndpoint takes the user provided (usually via CLI flags) APIExportEndpointSliceRef and
Expand All @@ -142,14 +126,14 @@ type syncEndpoint struct {
// must point to the cluster where the APIExport lives, and vice versa for the endpoint slice;
// however the endpoint slice references an APIExport in potentially another cluster, and for this
// case the initialRestConfig will be rewritten accordingly).
func resolveSyncEndpoint(ctx context.Context, initialRestConfig *rest.Config, endpointSliceRef string, apiExportRef string) (*syncEndpoint, error) {
func resolveSyncEndpoint(ctx context.Context, initialRestConfig *rest.Config, endpointSliceRef string) (*syncEndpoint, error) {
// construct temporary, uncached client
scheme := runtime.NewScheme()
if err := kcpdevcorev1alpha1.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpdevcorev1alpha1.SchemeGroupVersion, err)
if err := kcpcorev1alpha1.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpcorev1alpha1.SchemeGroupVersion, err)
}
if err := kcpdevv1alpha1.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpdevv1alpha1.SchemeGroupVersion, err)
if err := kcpapisv1alpha1.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpapisv1alpha1.SchemeGroupVersion, err)
}

clientOpts := ctrlruntimeclient.Options{Scheme: scheme}
Expand All @@ -160,58 +144,38 @@ func resolveSyncEndpoint(ctx context.Context, initialRestConfig *rest.Config, en

se := &syncEndpoint{}

// When an endpoint ref is given, both the APIExportEndpointSlice and the APIExport must exist.
if endpointSliceRef != "" {
endpointSlice, err := resolveAPIExportEndpointSlice(ctx, client, endpointSliceRef)
if err != nil {
return nil, fmt.Errorf("failed to resolve APIExportEndpointSlice: %w", err)
}
endpointSlice.Config = initialRestConfig

// find the APIExport referenced not by the user (can't: both ref parameters to this function
// are mutually exclusive), but in the APIExportEndpointSlice.
restConfig, err := retargetRestConfig(initialRestConfig, endpointSlice.Spec.APIExport.Path)
if err != nil {
return nil, fmt.Errorf("failed to re-target the given kubeconfig to cluster %q: %w", endpointSlice.Spec.APIExport.Path, err)
}

client, err := ctrlruntimeclient.New(restConfig, clientOpts)
if err != nil {
return nil, fmt.Errorf("failed to create service reader: %w", err)
}
// First we find the APIExportEndpointSlice.
endpointSlice, err := resolveAPIExportEndpointSlice(ctx, client, endpointSliceRef)
if err != nil {
return nil, fmt.Errorf("failed to resolve APIExportEndpointSlice: %w", err)
}
endpointSlice.Config = initialRestConfig

apiExport, err := resolveAPIExport(ctx, client, endpointSlice.Spec.APIExport.Name)
if err != nil {
return nil, fmt.Errorf("failed to resolve APIExport: %w", err)
}
apiExport.Config = restConfig

se.APIExport = apiExport
se.EndpointSlice = &endpointSlice
} else { // if an export ref is given, the endpoint slice is optional (for compat with kcp <0.28)
apiExport, err := resolveAPIExport(ctx, client, apiExportRef)
if err != nil {
return nil, fmt.Errorf("failed to resolve APIExport: %w", err)
}
apiExport.Config = initialRestConfig
// Now we find the APIExport referenced in the APIExportEndpointSlice.
restConfig, err := retargetRestConfig(initialRestConfig, endpointSlice.Spec.APIExport.Path)
if err != nil {
return nil, fmt.Errorf("failed to re-target the given kubeconfig to cluster %q: %w", endpointSlice.Spec.APIExport.Path, err)
}

se.APIExport = apiExport
client, err = ctrlruntimeclient.New(restConfig, clientOpts)
if err != nil {
return nil, fmt.Errorf("failed to create service reader: %w", err)
}

// try to find an endpoint slice in the same workspace with the same name as the APIExport
endpointSlice, err := resolveAPIExportEndpointSlice(ctx, client, apiExportRef)
if ctrlruntimeclient.IgnoreNotFound(err) != nil {
return nil, fmt.Errorf("failed to resolve APIExportEndpointSlice: %w", err)
} else if err == nil {
apiExport.Config = initialRestConfig
se.EndpointSlice = &endpointSlice
}
apiExport, err := resolveAPIExport(ctx, client, endpointSlice.Spec.APIExport.Name)
if err != nil {
return nil, fmt.Errorf("failed to resolve APIExport: %w", err)
}
apiExport.Config = restConfig

se.APIExport = apiExport
se.EndpointSlice = endpointSlice

return se, nil
}

func resolveAPIExportEndpointSlice(ctx context.Context, client ctrlruntimeclient.Client, ref string) (qualifiedAPIExportEndpointSlice, error) {
endpointSlice := &kcpdevv1alpha1.APIExportEndpointSlice{}
endpointSlice := &kcpapisv1alpha1.APIExportEndpointSlice{}
key := types.NamespacedName{Name: ref}
if err := client.Get(ctx, key, endpointSlice); err != nil {
return qualifiedAPIExportEndpointSlice{}, fmt.Errorf("failed to get APIExportEndpointSlice %q: %w", ref, err)
Expand All @@ -232,7 +196,7 @@ func resolveAPIExportEndpointSlice(ctx context.Context, client ctrlruntimeclient
}

func resolveAPIExport(ctx context.Context, client ctrlruntimeclient.Client, ref string) (qualifiedAPIExport, error) {
apiExport := &kcpdevv1alpha1.APIExport{}
apiExport := &kcpapisv1alpha1.APIExport{}
key := types.NamespacedName{Name: ref}
if err := client.Get(ctx, key, apiExport); err != nil {
return qualifiedAPIExport{}, fmt.Errorf("failed to get APIExport %q: %w", ref, err)
Expand All @@ -253,13 +217,13 @@ func resolveAPIExport(ctx context.Context, client ctrlruntimeclient.Client, ref
}

func resolveCurrentCluster(ctx context.Context, client ctrlruntimeclient.Client) (logicalcluster.Name, logicalcluster.Path, error) {
lc := &kcpdevcorev1alpha1.LogicalCluster{}
lc := &kcpcorev1alpha1.LogicalCluster{}
if err := client.Get(ctx, types.NamespacedName{Name: kcp.IdentityClusterName}, lc); err != nil {
return "", logicalcluster.None, fmt.Errorf("failed to resolve current workspace: %w", err)
}

lcName := logicalcluster.From(lc)
lcPath := logicalcluster.NewPath(lc.Annotations[kcpdevcore.LogicalClusterPathAnnotationKey])
lcPath := logicalcluster.NewPath(lc.Annotations[kcpcore.LogicalClusterPathAnnotationKey])

return lcName, lcPath, nil
}
Expand Down
Loading