Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions api/v1alpha1/workerresourcetemplate_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,11 +313,62 @@ func validateWorkerResourceTemplateSpec(spec WorkerResourceTemplateSpec, allowed
// the controller generates the correct per-version values at render time.
// User labels (e.g. task_type: "Activity") are allowed alongside the controller-owned keys.
checkMetricSelectorLabelsNotSet(innerSpec, innerSpecPath, &allErrs)

// 8. triggers[*].metadata.workerDeploymentName / workerDeploymentBuildId
// (KEDA ScaledObject): the controller owns these for triggers of type "temporal" so
// each per-version ScaledObject queries the correct Temporal-server worker deployment.
// Allow empty-string opt-in ("") and reject any other value.
checkTemporalTriggerMetadataNotSet(innerSpec, innerSpecPath, &allErrs)
}

return warnings, allErrs
}

// checkTemporalTriggerMetadataNotSet validates that, for each KEDA trigger of type "temporal"
// in spec.triggers, the controller-owned metadata keys (workerDeploymentName,
// workerDeploymentBuildId, namespace) are either absent or set to the empty-string opt-in
// sentinel. A non-empty value is rejected — the controller fills these at render time from
// the WorkerDeployment's connection / per-version state, and a hardcoded value would point
// at the wrong worker deployment / build / Temporal namespace.
// Non-temporal triggers (prometheus, cron, etc.) are not validated.
func checkTemporalTriggerMetadataNotSet(spec map[string]interface{}, path *field.Path, allErrs *field.ErrorList) {
triggers, ok := spec["triggers"].([]interface{})
if !ok {
return
}
triggersPath := path.Child("triggers")
for i, t := range triggers {
trigger, ok := t.(map[string]interface{})
if !ok {
continue
}
triggerType, ok := trigger["type"].(string)
if !ok || triggerType != "temporal" {
continue
}
metadata, ok := trigger["metadata"].(map[string]interface{})
if !ok {
continue
}
triggerPath := triggersPath.Index(i).Child("metadata")
for _, key := range []string{"workerDeploymentName", "workerDeploymentBuildId", "namespace"} {
val, present := metadata[key]
if !present {
continue
}
s, isString := val.(string)
if !isString || s != "" {
*allErrs = append(*allErrs, field.Forbidden(
triggerPath.Child(key),
"if "+key+" is present on a temporal trigger, the controller owns it and "+
"will set it to the per-version or per-connection value; set it to \"\" to opt in "+
"to auto-injection, or remove it entirely if you do not need it",
))
}
}
}
}

// checkScaleTargetRefNotSet recursively traverses obj looking for any scaleTargetRef that is
// set to a non-empty value. If absent or empty ({}), the controller injects it to point
// at the versioned Deployment. If non-empty, reject — the controller owns this field when present,
Expand Down
117 changes: 116 additions & 1 deletion api/v1alpha1/workerresourcetemplate_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1"
)

// newWRT builds a WorkerResourceTemplate with an arbitrary embedded object spec.
Expand Down Expand Up @@ -423,6 +424,120 @@ func TestWorkerResourceTemplate_ValidateCreate(t *testing.T) {
}
}

// newValidatorNoAPIWithKEDA mirrors newValidatorNoAPI but additionally allows the
// KEDA ScaledObject kind, so tests of KEDA-specific validation can reach the relevant
// check without being rejected by the allow-list.
func newValidatorNoAPIWithKEDA() *temporaliov1alpha1.WorkerResourceTemplateValidator {
return &temporaliov1alpha1.WorkerResourceTemplateValidator{
Client: fake.NewClientBuilder().Build(),
RESTMapper: newFakeRESTMapper(
schema.GroupVersionKind{Group: "autoscaling", Version: "v2", Kind: "HorizontalPodAutoscaler"},
schema.GroupVersionKind{Group: "policy", Version: "v1", Kind: "PodDisruptionBudget"},
schema.GroupVersionKind{Group: "keda.sh", Version: "v1alpha1", Kind: "ScaledObject"},
),
AllowedKinds: []string{"HorizontalPodAutoscaler", "PodDisruptionBudget", "ScaledObject"},
}
}

// scaledObjectTemplateWithTemporalTrigger builds a minimal KEDA ScaledObject template
// where the user can set their own trigger metadata values; auto-injection placeholders
// for workerDeploymentName / workerDeploymentBuildId default to the empty-string sentinel
// unless overridden by the caller.
func scaledObjectTemplateWithTemporalTrigger(metadataOverrides map[string]interface{}) map[string]interface{} {
metadata := map[string]interface{}{
"endpoint": "us-east-1.aws.api.temporal.io:7233",
"namespace": "", // opt-in sentinel — controller injects from connection
"taskQueue": "my-tq",
"workerDeploymentName": "", // opt-in sentinel
"workerDeploymentBuildId": "", // opt-in sentinel
}
for k, v := range metadataOverrides {
metadata[k] = v
}
return map[string]interface{}{
"apiVersion": "keda.sh/v1alpha1",
"kind": "ScaledObject",
"spec": map[string]interface{}{
"scaleTargetRef": map[string]interface{}{}, // opt-in
"minReplicaCount": float64(1),
"maxReplicaCount": float64(10),
"triggers": []interface{}{
map[string]interface{}{
"type": "temporal",
"metadata": metadata,
},
},
},
}
}

func TestWorkerResourceTemplate_ValidateCreate_TemporalTriggerMetadata(t *testing.T) {
tests := map[string]struct {
obj runtime.Object
errorMsg string
}{
"empty workerDeploymentName + workerDeploymentBuildId opt-in is valid": {
obj: newWRT("keda-opt-in", "my-worker", scaledObjectTemplateWithTemporalTrigger(nil)),
},
"non-empty workerDeploymentName is rejected": {
obj: newWRT("keda-bad-name", "my-worker", scaledObjectTemplateWithTemporalTrigger(map[string]interface{}{
"workerDeploymentName": "some-other-name",
})),
errorMsg: "if workerDeploymentName is present on a temporal trigger, the controller owns it",
},
"non-empty workerDeploymentBuildId is rejected": {
obj: newWRT("keda-bad-build", "my-worker", scaledObjectTemplateWithTemporalTrigger(map[string]interface{}{
"workerDeploymentBuildId": "abc123",
})),
errorMsg: "if workerDeploymentBuildId is present on a temporal trigger, the controller owns it",
},
"non-empty namespace is rejected": {
obj: newWRT("keda-bad-namespace", "my-worker", scaledObjectTemplateWithTemporalTrigger(map[string]interface{}{
"namespace": "some-other-temporal-ns",
})),
errorMsg: "if namespace is present on a temporal trigger, the controller owns it",
},
"non-temporal trigger with same keys is allowed (validation only targets temporal triggers)": {
obj: newWRT("keda-prom-trigger", "my-worker", map[string]interface{}{
"apiVersion": "keda.sh/v1alpha1",
"kind": "ScaledObject",
"spec": map[string]interface{}{
"scaleTargetRef": map[string]interface{}{},
"minReplicaCount": float64(1),
"maxReplicaCount": float64(10),
"triggers": []interface{}{
map[string]interface{}{
"type": "prometheus",
"metadata": map[string]interface{}{
"serverAddress": "http://prom",
"workerDeploymentName": "anything-goes",
"workerDeploymentBuildId": "anything-goes",
},
},
},
},
}),
},
}

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
ctx := context.Background()
v := newValidatorNoAPIWithKEDA()

warnings, err := v.ValidateCreate(ctx, tc.obj)

if tc.errorMsg != "" {
require.Error(t, err, "expected an error containing %q", tc.errorMsg)
assert.Contains(t, err.Error(), tc.errorMsg)
} else {
require.NoError(t, err)
}
assert.Nil(t, warnings)
})
}
}

func TestWorkerResourceTemplate_ValidateUpdate_Immutability(t *testing.T) {
tests := map[string]struct {
oldWorkerDeploymentRef string
Expand Down
33 changes: 24 additions & 9 deletions internal/k8s/deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,35 @@ import (
"strings"

"github.com/distribution/reference"
temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1"
"github.com/temporalio/temporal-worker-controller/internal/controller/k8s.io/utils"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1"
"github.com/temporalio/temporal-worker-controller/internal/controller/k8s.io/utils"
)

const (
DeployOwnerKey = ".metadata.controller"
// BuildIDLabel is the label that identifies the build ID for a deployment
BuildIDLabel = "temporal.io/build-id"
// WorkerDeploymentNameLabel identifies Deployments managed for a TemporalWorkerDeployment.
WorkerDeploymentNameLabel = "temporal.io/deployment-name"
WorkerDeploymentNameLabel = "temporal.io/deployment-name"
// WorkerDeploymentNameSeparator joins the K8s namespace and the WorkerDeployment resource
// name to form the Temporal-server-side worker deployment name (namespace/wdName).
WorkerDeploymentNameSeparator = "/"
ResourceNameSeparator = "-"
MaxBuildIDLen = 63
MaxDeploymentNameLen = 47
ConnectionSpecHashAnnotation = "temporal.io/connection-spec-hash"
PodTemplateSpecHashAnnotation = "temporal.io/pod-template-spec-hash"
// WorkerDeploymentNameSeparatorK8sLabelCompliant is the substitute used when the
// Temporal-server worker deployment name needs to be stored in a Kubernetes label value
// (which disallows "/"). See cleanDeploymentNameForK8sLabelValue.
WorkerDeploymentNameSeparatorK8sLabelCompliant = "_"
ResourceNameSeparator = "-"
MaxBuildIDLen = 63
MaxDeploymentNameLen = 47
ConnectionSpecHashAnnotation = "temporal.io/connection-spec-hash"
PodTemplateSpecHashAnnotation = "temporal.io/pod-template-spec-hash"
)

// DeploymentState represents the Kubernetes state of all deployments for a temporal worker deployment
Expand Down Expand Up @@ -139,8 +146,12 @@ func ComputeBuildID(w *temporaliov1alpha1.WorkerDeployment) string {

// ComputeWorkerDeploymentName generates the base worker deployment name
func ComputeWorkerDeploymentName(w *temporaliov1alpha1.WorkerDeployment) string {
return computeWorkerDeploymentName(w.GetNamespace(), w.GetName())
}

func computeWorkerDeploymentName(k8sNamespace, workerDeploymentResourceName string) string {
// Use the name and namespace to form the worker deployment name
return w.GetNamespace() + WorkerDeploymentNameSeparator + w.GetName()
return k8sNamespace + WorkerDeploymentNameSeparator + workerDeploymentResourceName
}

// ComputeVersionedDeploymentName generates a name for a versioned deployment
Expand Down Expand Up @@ -192,6 +203,10 @@ func CleanStringForDNS(s string) string {
return strings.ToLower(re.ReplaceAllString(s, ResourceNameSeparator))
}

func cleanDeploymentNameForK8sLabelValue(s string) string {
return strings.ReplaceAll(s, WorkerDeploymentNameSeparator, WorkerDeploymentNameSeparatorK8sLabelCompliant)
}

// Build ID is used as a label in k8s, and as the build ID for
// the worker in Temporal. That means it needs to conform to both
// system's requirements.
Expand Down
Loading
Loading