diff --git a/api/v1alpha1/workerresourcetemplate_webhook.go b/api/v1alpha1/workerresourcetemplate_webhook.go index 0adb9424..c41a18e0 100644 --- a/api/v1alpha1/workerresourcetemplate_webhook.go +++ b/api/v1alpha1/workerresourcetemplate_webhook.go @@ -313,11 +313,62 @@ func validateWorkerResourceTemplateSpec(spec WorkerResourceTemplateSpec, allowed // the controller generates the correct per-version values at render time. // User labels (e.g. task_type: "Activity") are allowed alongside the controller-owned keys. checkMetricSelectorLabelsNotSet(innerSpec, innerSpecPath, &allErrs) + + // 8. triggers[*].metadata.workerDeploymentName / workerDeploymentBuildId + // (KEDA ScaledObject): the controller owns these for triggers of type "temporal" so + // each per-version ScaledObject queries the correct Temporal-server worker deployment. + // Allow empty-string opt-in ("") and reject any other value. + checkTemporalTriggerMetadataNotSet(innerSpec, innerSpecPath, &allErrs) } return warnings, allErrs } +// checkTemporalTriggerMetadataNotSet validates that, for each KEDA trigger of type "temporal" +// in spec.triggers, the controller-owned metadata keys (workerDeploymentName, +// workerDeploymentBuildId, namespace) are either absent or set to the empty-string opt-in +// sentinel. A non-empty value is rejected — the controller fills these at render time from +// the WorkerDeployment's connection / per-version state, and a hardcoded value would point +// at the wrong worker deployment / build / Temporal namespace. +// Non-temporal triggers (prometheus, cron, etc.) are not validated. +func checkTemporalTriggerMetadataNotSet(spec map[string]interface{}, path *field.Path, allErrs *field.ErrorList) { + triggers, ok := spec["triggers"].([]interface{}) + if !ok { + return + } + triggersPath := path.Child("triggers") + for i, t := range triggers { + trigger, ok := t.(map[string]interface{}) + if !ok { + continue + } + triggerType, ok := trigger["type"].(string) + if !ok || triggerType != "temporal" { + continue + } + metadata, ok := trigger["metadata"].(map[string]interface{}) + if !ok { + continue + } + triggerPath := triggersPath.Index(i).Child("metadata") + for _, key := range []string{"workerDeploymentName", "workerDeploymentBuildId", "namespace"} { + val, present := metadata[key] + if !present { + continue + } + s, isString := val.(string) + if !isString || s != "" { + *allErrs = append(*allErrs, field.Forbidden( + triggerPath.Child(key), + "if "+key+" is present on a temporal trigger, the controller owns it and "+ + "will set it to the per-version or per-connection value; set it to \"\" to opt in "+ + "to auto-injection, or remove it entirely if you do not need it", + )) + } + } + } +} + // checkScaleTargetRefNotSet recursively traverses obj looking for any scaleTargetRef that is // set to a non-empty value. If absent or empty ({}), the controller injects it to point // at the versioned Deployment. If non-empty, reject — the controller owns this field when present, diff --git a/api/v1alpha1/workerresourcetemplate_webhook_test.go b/api/v1alpha1/workerresourcetemplate_webhook_test.go index 595e5e3f..bc7780b3 100644 --- a/api/v1alpha1/workerresourcetemplate_webhook_test.go +++ b/api/v1alpha1/workerresourcetemplate_webhook_test.go @@ -7,13 +7,14 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "sigs.k8s.io/controller-runtime/pkg/client/fake" + + temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" ) // newWRT builds a WorkerResourceTemplate with an arbitrary embedded object spec. @@ -423,6 +424,120 @@ func TestWorkerResourceTemplate_ValidateCreate(t *testing.T) { } } +// newValidatorNoAPIWithKEDA mirrors newValidatorNoAPI but additionally allows the +// KEDA ScaledObject kind, so tests of KEDA-specific validation can reach the relevant +// check without being rejected by the allow-list. +func newValidatorNoAPIWithKEDA() *temporaliov1alpha1.WorkerResourceTemplateValidator { + return &temporaliov1alpha1.WorkerResourceTemplateValidator{ + Client: fake.NewClientBuilder().Build(), + RESTMapper: newFakeRESTMapper( + schema.GroupVersionKind{Group: "autoscaling", Version: "v2", Kind: "HorizontalPodAutoscaler"}, + schema.GroupVersionKind{Group: "policy", Version: "v1", Kind: "PodDisruptionBudget"}, + schema.GroupVersionKind{Group: "keda.sh", Version: "v1alpha1", Kind: "ScaledObject"}, + ), + AllowedKinds: []string{"HorizontalPodAutoscaler", "PodDisruptionBudget", "ScaledObject"}, + } +} + +// scaledObjectTemplateWithTemporalTrigger builds a minimal KEDA ScaledObject template +// where the user can set their own trigger metadata values; auto-injection placeholders +// for workerDeploymentName / workerDeploymentBuildId default to the empty-string sentinel +// unless overridden by the caller. +func scaledObjectTemplateWithTemporalTrigger(metadataOverrides map[string]interface{}) map[string]interface{} { + metadata := map[string]interface{}{ + "endpoint": "us-east-1.aws.api.temporal.io:7233", + "namespace": "", // opt-in sentinel — controller injects from connection + "taskQueue": "my-tq", + "workerDeploymentName": "", // opt-in sentinel + "workerDeploymentBuildId": "", // opt-in sentinel + } + for k, v := range metadataOverrides { + metadata[k] = v + } + return map[string]interface{}{ + "apiVersion": "keda.sh/v1alpha1", + "kind": "ScaledObject", + "spec": map[string]interface{}{ + "scaleTargetRef": map[string]interface{}{}, // opt-in + "minReplicaCount": float64(1), + "maxReplicaCount": float64(10), + "triggers": []interface{}{ + map[string]interface{}{ + "type": "temporal", + "metadata": metadata, + }, + }, + }, + } +} + +func TestWorkerResourceTemplate_ValidateCreate_TemporalTriggerMetadata(t *testing.T) { + tests := map[string]struct { + obj runtime.Object + errorMsg string + }{ + "empty workerDeploymentName + workerDeploymentBuildId opt-in is valid": { + obj: newWRT("keda-opt-in", "my-worker", scaledObjectTemplateWithTemporalTrigger(nil)), + }, + "non-empty workerDeploymentName is rejected": { + obj: newWRT("keda-bad-name", "my-worker", scaledObjectTemplateWithTemporalTrigger(map[string]interface{}{ + "workerDeploymentName": "some-other-name", + })), + errorMsg: "if workerDeploymentName is present on a temporal trigger, the controller owns it", + }, + "non-empty workerDeploymentBuildId is rejected": { + obj: newWRT("keda-bad-build", "my-worker", scaledObjectTemplateWithTemporalTrigger(map[string]interface{}{ + "workerDeploymentBuildId": "abc123", + })), + errorMsg: "if workerDeploymentBuildId is present on a temporal trigger, the controller owns it", + }, + "non-empty namespace is rejected": { + obj: newWRT("keda-bad-namespace", "my-worker", scaledObjectTemplateWithTemporalTrigger(map[string]interface{}{ + "namespace": "some-other-temporal-ns", + })), + errorMsg: "if namespace is present on a temporal trigger, the controller owns it", + }, + "non-temporal trigger with same keys is allowed (validation only targets temporal triggers)": { + obj: newWRT("keda-prom-trigger", "my-worker", map[string]interface{}{ + "apiVersion": "keda.sh/v1alpha1", + "kind": "ScaledObject", + "spec": map[string]interface{}{ + "scaleTargetRef": map[string]interface{}{}, + "minReplicaCount": float64(1), + "maxReplicaCount": float64(10), + "triggers": []interface{}{ + map[string]interface{}{ + "type": "prometheus", + "metadata": map[string]interface{}{ + "serverAddress": "http://prom", + "workerDeploymentName": "anything-goes", + "workerDeploymentBuildId": "anything-goes", + }, + }, + }, + }, + }), + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + ctx := context.Background() + v := newValidatorNoAPIWithKEDA() + + warnings, err := v.ValidateCreate(ctx, tc.obj) + + if tc.errorMsg != "" { + require.Error(t, err, "expected an error containing %q", tc.errorMsg) + assert.Contains(t, err.Error(), tc.errorMsg) + } else { + require.NoError(t, err) + } + assert.Nil(t, warnings) + }) + } +} + func TestWorkerResourceTemplate_ValidateUpdate_Immutability(t *testing.T) { tests := map[string]struct { oldWorkerDeploymentRef string diff --git a/internal/k8s/deployments.go b/internal/k8s/deployments.go index 2e5c89cc..4a8071ce 100644 --- a/internal/k8s/deployments.go +++ b/internal/k8s/deployments.go @@ -15,14 +15,15 @@ import ( "strings" "github.com/distribution/reference" - temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" - "github.com/temporalio/temporal-worker-controller/internal/controller/k8s.io/utils" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + + temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" + "github.com/temporalio/temporal-worker-controller/internal/controller/k8s.io/utils" ) const ( @@ -30,13 +31,19 @@ const ( // BuildIDLabel is the label that identifies the build ID for a deployment BuildIDLabel = "temporal.io/build-id" // WorkerDeploymentNameLabel identifies Deployments managed for a TemporalWorkerDeployment. - WorkerDeploymentNameLabel = "temporal.io/deployment-name" + WorkerDeploymentNameLabel = "temporal.io/deployment-name" + // WorkerDeploymentNameSeparator joins the K8s namespace and the WorkerDeployment resource + // name to form the Temporal-server-side worker deployment name (namespace/wdName). WorkerDeploymentNameSeparator = "/" - ResourceNameSeparator = "-" - MaxBuildIDLen = 63 - MaxDeploymentNameLen = 47 - ConnectionSpecHashAnnotation = "temporal.io/connection-spec-hash" - PodTemplateSpecHashAnnotation = "temporal.io/pod-template-spec-hash" + // WorkerDeploymentNameSeparatorK8sLabelCompliant is the substitute used when the + // Temporal-server worker deployment name needs to be stored in a Kubernetes label value + // (which disallows "/"). See cleanDeploymentNameForK8sLabelValue. + WorkerDeploymentNameSeparatorK8sLabelCompliant = "_" + ResourceNameSeparator = "-" + MaxBuildIDLen = 63 + MaxDeploymentNameLen = 47 + ConnectionSpecHashAnnotation = "temporal.io/connection-spec-hash" + PodTemplateSpecHashAnnotation = "temporal.io/pod-template-spec-hash" ) // DeploymentState represents the Kubernetes state of all deployments for a temporal worker deployment @@ -139,8 +146,12 @@ func ComputeBuildID(w *temporaliov1alpha1.WorkerDeployment) string { // ComputeWorkerDeploymentName generates the base worker deployment name func ComputeWorkerDeploymentName(w *temporaliov1alpha1.WorkerDeployment) string { + return computeWorkerDeploymentName(w.GetNamespace(), w.GetName()) +} + +func computeWorkerDeploymentName(k8sNamespace, workerDeploymentResourceName string) string { // Use the name and namespace to form the worker deployment name - return w.GetNamespace() + WorkerDeploymentNameSeparator + w.GetName() + return k8sNamespace + WorkerDeploymentNameSeparator + workerDeploymentResourceName } // ComputeVersionedDeploymentName generates a name for a versioned deployment @@ -192,6 +203,10 @@ func CleanStringForDNS(s string) string { return strings.ToLower(re.ReplaceAllString(s, ResourceNameSeparator)) } +func cleanDeploymentNameForK8sLabelValue(s string) string { + return strings.ReplaceAll(s, WorkerDeploymentNameSeparator, WorkerDeploymentNameSeparatorK8sLabelCompliant) +} + // Build ID is used as a label in k8s, and as the build ID for // the worker in Temporal. That means it needs to conform to both // system's requirements. diff --git a/internal/k8s/workerresourcetemplates.go b/internal/k8s/workerresourcetemplates.go index 8fb31ae7..413024de 100644 --- a/internal/k8s/workerresourcetemplates.go +++ b/internal/k8s/workerresourcetemplates.go @@ -7,10 +7,11 @@ import ( "fmt" "strings" - temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" appsv1 "k8s.io/api/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + + temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" ) // RenderedHashLen is the number of hex characters used for rendered-object hashes stored in @@ -40,7 +41,7 @@ const ( ) // ComputeWorkerResourceTemplateName generates a deterministic, DNS-safe name for the worker resource template -// instance corresponding to a given (twdName, wrtName, buildID) triple. +// instance corresponding to a given (wdName, wrtName, buildID) triple. // // The name has the form: // @@ -50,16 +51,16 @@ const ( // capping occurs. This guarantees that two different triples — including triples that // differ only in the buildID — always produce different names, even if the human-readable // prefix is truncated. The buildID is therefore always uniquely represented via the hash, -// regardless of how long twdName or wrtName are. -func ComputeWorkerResourceTemplateName(twdName, wrtName, buildID string) string { +// regardless of how long wdName or wrtName are. +func ComputeWorkerResourceTemplateName(wdName, wrtName, buildID string) string { // Hash the full triple first, before any truncation. - h := sha256.Sum256([]byte(twdName + wrtName + buildID)) + h := sha256.Sum256([]byte(wdName + wrtName + buildID)) hashSuffix := hex.EncodeToString(h[:workerResourceTemplateHashLen/2]) // 4 bytes → 8 hex chars // Build the human-readable prefix and truncate so the total fits in maxLen. // suffixLen = len("-") + workerResourceTemplateHashLen const suffixLen = 1 + workerResourceTemplateHashLen - raw := CleanStringForDNS(twdName + ResourceNameSeparator + wrtName + ResourceNameSeparator + buildID) + raw := CleanStringForDNS(wdName + ResourceNameSeparator + wrtName + ResourceNameSeparator + buildID) prefix := TruncateString(raw, workerResourceTemplateMaxNameLen-suffixLen) // Trim any trailing separator that results from truncating mid-segment. prefix = strings.TrimRight(prefix, ResourceNameSeparator) @@ -86,24 +87,30 @@ func RenderWorkerResourceTemplate( return nil, fmt.Errorf("failed to unmarshal spec.template: %w", err) } - twdName := wrt.Spec.EffectiveWorkerDeploymentName() - selectorLabels := ComputeSelectorLabels(twdName, buildID) + // The name of the Worker Deployment Kubernetes resource + wdName := wrt.Spec.EffectiveWorkerDeploymentName() + + // The name of the Worker Deployment in Temporal Server, prefixed by the Kubernetes namespace of the resource + twdName := computeWorkerDeploymentName(wrt.Namespace, wdName) + + selectorLabels := ComputeSelectorLabels(wdName, buildID) // Labels the controller appends to every metrics[*].external.metric.selector.matchLabels // that is present in the template. These identify the exact per-version Prometheus series. // Ordering is not a concern: matchLabels is a map; encoding/json serialises map keys in // sorted order, so ComputeRenderedObjectHash is deterministic regardless of insertion order. metricSelectorLabels := map[string]string{ - "temporal_worker_deployment_name": wrt.Namespace + "_" + twdName, + "temporal_worker_deployment_name": cleanDeploymentNameForK8sLabelValue(twdName), "temporal_worker_build_id": buildID, "temporal_namespace": temporalNamespace, } - // Step 2: auto-inject scaleTargetRef, selector.matchLabels, and metric selector labels. - // NestedFieldNoCopy returns a live reference so mutations are reflected in obj.Object directly. + // Step 2: auto-inject scaleTargetRef, selector.matchLabels, metric selector labels, + // and KEDA Temporal trigger metadata. NestedFieldNoCopy returns a live reference so + // mutations are reflected in obj.Object directly. if specRaw, ok, _ := unstructured.NestedFieldNoCopy(obj.Object, "spec"); ok { if spec, ok := specRaw.(map[string]interface{}); ok { - autoInjectFields(spec, deployment.Name, selectorLabels, metricSelectorLabels) + autoInjectFields(spec, deployment.Name, twdName, buildID, temporalNamespace, selectorLabels, metricSelectorLabels) } } @@ -152,9 +159,17 @@ func RenderWorkerResourceTemplate( // matchLabels is present (including {}). User labels like task_type coexist. // If matchLabels is absent, no injection occurs for that metric entry. // +// - spec.triggers[*].metadata (KEDA ScaledObject): for triggers of type "temporal", +// workerDeploymentName, workerDeploymentBuildId, and namespace are set whenever +// the key is present with the empty-string opt-in sentinel. The webhook rejects +// non-empty values for these keys. Required by KEDA's Temporal scaler +// (kedacore/keda#7672) for per-version backlog scaling against Worker Deployment +// Versioning. twdName must be the fully-qualified Temporal-server name +// (namespace/wdName, as returned by ComputeWorkerDeploymentName). +// // - scaleTargetRef: injected anywhere in the spec tree when {} (empty), via // injectScaleTargetRefRecursive. Unambiguous across all supported resource types. -func autoInjectFields(spec map[string]interface{}, deploymentName string, podSelectorLabels map[string]string, metricSelectorLabels map[string]string) { +func autoInjectFields(spec map[string]interface{}, deploymentName, twdName, buildID, temporalNamespace string, podSelectorLabels map[string]string, metricSelectorLabels map[string]string) { // spec.selector.matchLabels: {} opt-in sentinel. if sel, ok := spec["selector"].(map[string]interface{}); ok { if isEmptyMap(sel["matchLabels"]) { @@ -167,6 +182,9 @@ func autoInjectFields(spec map[string]interface{}, deploymentName string, podSel appendMetricsMatchLabelSelector(spec, metricSelectorLabels) } + // triggers[*].metadata: inject KEDA Temporal scaler version identifiers when present. + appendTemporalTriggerMetadata(spec, twdName, buildID, temporalNamespace) + // scaleTargetRef: inject anywhere in the spec tree. injectScaleTargetRefRecursive(spec, deploymentName) } @@ -208,6 +226,60 @@ func appendMetricsMatchLabelSelector(spec map[string]interface{}, metricSelector } } +// appendTemporalTriggerMetadata walks spec.triggers[*] and, for any KEDA trigger +// of type "temporal", sets workerDeploymentName, workerDeploymentBuildId, and namespace +// in its metadata map. Injection is opt-in via the empty-string sentinel, mirroring +// the scaleTargetRef {} sentinel pattern: absent = no injection; present-and-empty +// = inject; present-and-non-empty is rejected by the WorkerResourceTemplate validating +// webhook (defense in depth — if a non-empty value reaches the runtime, it is left +// untouched rather than silently overwritten). +// +// twdName must be the fully-qualified Temporal-server Worker Deployment name +// (computeWorkerDeploymentName(k8sNamespace, wdName)) — that's what workers register +// with at Temporal Cloud and what KEDA's scaler needs to query backlog for a specific +// version. temporalNamespace is the Temporal Cloud namespace from the connection. +// +// Required by KEDA's Temporal scaler (kedacore/keda#7672) for per-version backlog +// scaling against Temporal's Worker Deployment Versioning model. Non-temporal +// triggers in the same ScaledObject are untouched. +func appendTemporalTriggerMetadata(spec map[string]interface{}, twdName, buildID, temporalNamespace string) { + triggers, ok := spec["triggers"].([]interface{}) + if !ok { + return + } + for _, t := range triggers { + trigger, ok := t.(map[string]interface{}) + if !ok { + continue + } + triggerType, ok := trigger["type"].(string) + if !ok || triggerType != "temporal" { + continue + } + metadata, ok := trigger["metadata"].(map[string]interface{}) + if !ok { + continue + } + if isEmptyString(metadata["workerDeploymentName"]) { + metadata["workerDeploymentName"] = twdName + } + if isEmptyString(metadata["workerDeploymentBuildId"]) { + metadata["workerDeploymentBuildId"] = buildID + } + if isEmptyString(metadata["namespace"]) { + metadata["namespace"] = temporalNamespace + } + } +} + +// isEmptyString returns true if v is a string with no characters. The trigger-metadata +// injection mirrors scaleTargetRef's isEmptyMap sentinel pattern but for string fields, +// since KEDA's trigger metadata is a map of strings. +func isEmptyString(v interface{}) bool { + s, ok := v.(string) + return ok && s == "" +} + // injectScaleTargetRefRecursive recursively traverses obj and injects scaleTargetRef // wherever the key is present with an empty-object value (user opted in with {}). // scaleTargetRef is unambiguous across all supported resource types, so recursive diff --git a/internal/k8s/workerresourcetemplates_test.go b/internal/k8s/workerresourcetemplates_test.go index 6b6b2dc9..e4253c8f 100644 --- a/internal/k8s/workerresourcetemplates_test.go +++ b/internal/k8s/workerresourcetemplates_test.go @@ -9,11 +9,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" appsv1 "k8s.io/api/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + + temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" ) // expectedWorkerResourceTemplateName replicates the naming logic for use in tests. @@ -90,7 +91,7 @@ func TestAutoInjectFields_ScaleTargetRef(t *testing.T) { "minReplicas": 1, "maxReplicas": 5, } - autoInjectFields(spec, "my-worker-abc123", selectorLabels, nil) + autoInjectFields(spec, "my-worker-abc123", "my-worker", "abc123", "my-temporal-ns", selectorLabels, nil) _, hasKey := spec["scaleTargetRef"] assert.False(t, hasKey, "scaleTargetRef should not be injected when absent (user must opt in with {})") }) @@ -99,7 +100,7 @@ func TestAutoInjectFields_ScaleTargetRef(t *testing.T) { spec := map[string]interface{}{ "scaleTargetRef": map[string]interface{}{}, } - autoInjectFields(spec, "my-worker-abc123", selectorLabels, nil) + autoInjectFields(spec, "my-worker-abc123", "my-worker", "abc123", "my-temporal-ns", selectorLabels, nil) ref, ok := spec["scaleTargetRef"].(map[string]interface{}) require.True(t, ok) assert.Equal(t, "my-worker-abc123", ref["name"]) @@ -114,7 +115,7 @@ func TestAutoInjectFields_ScaleTargetRef(t *testing.T) { "kind": "Deployment", }, } - autoInjectFields(spec, "my-worker-abc123", selectorLabels, nil) + autoInjectFields(spec, "my-worker-abc123", "my-worker", "abc123", "my-temporal-ns", selectorLabels, nil) ref := spec["scaleTargetRef"].(map[string]interface{}) assert.Equal(t, "custom-deployment", ref["name"], "should not overwrite user-provided ref") }) @@ -130,7 +131,7 @@ func TestAutoInjectFields_MatchLabels(t *testing.T) { spec := map[string]interface{}{ "selector": map[string]interface{}{}, } - autoInjectFields(spec, "my-worker-abc123", selectorLabels, nil) + autoInjectFields(spec, "my-worker-abc123", "my-worker", "abc123", "my-temporal-ns", selectorLabels, nil) selector := spec["selector"].(map[string]interface{}) _, hasKey := selector["matchLabels"] assert.False(t, hasKey, "matchLabels should not be injected when absent (user must opt in with {})") @@ -142,7 +143,7 @@ func TestAutoInjectFields_MatchLabels(t *testing.T) { "matchLabels": map[string]interface{}{}, }, } - autoInjectFields(spec, "my-worker-abc123", selectorLabels, nil) + autoInjectFields(spec, "my-worker-abc123", "my-worker", "abc123", "my-temporal-ns", selectorLabels, nil) selector := spec["selector"].(map[string]interface{}) labels, ok := selector["matchLabels"].(map[string]interface{}) require.True(t, ok) @@ -158,7 +159,7 @@ func TestAutoInjectFields_MatchLabels(t *testing.T) { }, }, } - autoInjectFields(spec, "my-worker-abc123", selectorLabels, nil) + autoInjectFields(spec, "my-worker-abc123", "my-worker", "abc123", "my-temporal-ns", selectorLabels, nil) selector := spec["selector"].(map[string]interface{}) labels := selector["matchLabels"].(map[string]interface{}) assert.Equal(t, "label", labels["custom"], "should not overwrite user-provided labels") @@ -189,7 +190,7 @@ func TestAutoInjectFields_MatchLabels(t *testing.T) { }, }, } - autoInjectFields(spec, "my-worker-abc123", selectorLabels, metricLabels) + autoInjectFields(spec, "my-worker-abc123", "my-worker", "abc123", "my-temporal-ns", selectorLabels, metricLabels) // spec.selector.matchLabels gets pod selector labels only topSelector := spec["selector"].(map[string]interface{}) @@ -236,7 +237,7 @@ func TestAutoInjectFields_MetricSelector(t *testing.T) { t.Run("injects temporal labels when matchLabels is empty ({})", func(t *testing.T) { spec := metricSpec(map[string]interface{}{}) - autoInjectFields(spec, "my-worker-abc123", podLabels, metricLabels) + autoInjectFields(spec, "my-worker-abc123", "my-worker", "abc123", "my-temporal-ns", podLabels, metricLabels) ml := spec["metrics"].([]interface{})[0].(map[string]interface{})["external"].(map[string]interface{})["metric"].(map[string]interface{})["selector"].(map[string]interface{})["matchLabels"].(map[string]interface{}) assert.Equal(t, "default_my-worker", ml["temporal_worker_deployment_name"]) assert.Equal(t, "abc123", ml["temporal_worker_build_id"]) @@ -245,7 +246,7 @@ func TestAutoInjectFields_MetricSelector(t *testing.T) { t.Run("merges temporal labels alongside user labels", func(t *testing.T) { spec := metricSpec(map[string]interface{}{"task_type": "Activity"}) - autoInjectFields(spec, "my-worker-abc123", podLabels, metricLabels) + autoInjectFields(spec, "my-worker-abc123", "my-worker", "abc123", "my-temporal-ns", podLabels, metricLabels) ml := spec["metrics"].([]interface{})[0].(map[string]interface{})["external"].(map[string]interface{})["metric"].(map[string]interface{})["selector"].(map[string]interface{})["matchLabels"].(map[string]interface{}) assert.Equal(t, "Activity", ml["task_type"], "user label must be preserved") assert.Equal(t, "default_my-worker", ml["temporal_worker_deployment_name"]) @@ -266,7 +267,7 @@ func TestAutoInjectFields_MetricSelector(t *testing.T) { }, }, } - autoInjectFields(spec, "my-worker-abc123", podLabels, metricLabels) + autoInjectFields(spec, "my-worker-abc123", "my-worker", "abc123", "my-temporal-ns", podLabels, metricLabels) sel := spec["metrics"].([]interface{})[0].(map[string]interface{})["external"].(map[string]interface{})["metric"].(map[string]interface{})["selector"].(map[string]interface{}) _, hasMatchLabels := sel["matchLabels"] assert.False(t, hasMatchLabels, "matchLabels must not be created when absent") @@ -274,12 +275,143 @@ func TestAutoInjectFields_MetricSelector(t *testing.T) { t.Run("no-op when metricSelectorLabels is nil", func(t *testing.T) { spec := metricSpec(map[string]interface{}{}) - autoInjectFields(spec, "my-worker-abc123", podLabels, nil) + autoInjectFields(spec, "my-worker-abc123", "my-worker", "abc123", "my-temporal-ns", podLabels, nil) ml := spec["metrics"].([]interface{})[0].(map[string]interface{})["external"].(map[string]interface{})["metric"].(map[string]interface{})["selector"].(map[string]interface{})["matchLabels"].(map[string]interface{}) assert.Empty(t, ml, "no metric labels should be injected when metricSelectorLabels is nil") }) } +// scaledObjectSpec builds a minimal KEDA ScaledObject spec with one temporal trigger +// whose metadata starts with the given base entries. +func scaledObjectSpec(temporalMetadata map[string]interface{}) map[string]interface{} { + return map[string]interface{}{ + "scaleTargetRef": map[string]interface{}{}, // opt in to auto-injection + "triggers": []interface{}{ + map[string]interface{}{ + "type": "temporal", + "metadata": temporalMetadata, + }, + }, + } +} + +func TestAutoInjectFields_TemporalTriggerMetadata(t *testing.T) { + const twdName = "my-worker" + const buildID = "abc123" + + t.Run("injects workerDeploymentName, workerDeploymentBuildId, namespace when keys are present (empty string)", func(t *testing.T) { + spec := scaledObjectSpec(map[string]interface{}{ + "endpoint": "us-east-1.aws.api.temporal.io:7233", + "namespace": "", // opt-in sentinel + "taskQueue": "my-tq", + "workerDeploymentName": "", + "workerDeploymentBuildId": "", + }) + autoInjectFields(spec, "my-worker-abc123", twdName, buildID, "my-temporal-ns", nil, nil) + md := spec["triggers"].([]interface{})[0].(map[string]interface{})["metadata"].(map[string]interface{}) + assert.Equal(t, twdName, md["workerDeploymentName"]) + assert.Equal(t, buildID, md["workerDeploymentBuildId"]) + assert.Equal(t, "my-temporal-ns", md["namespace"], "namespace must be auto-injected from the Temporal connection") + // User-set fields are preserved. + assert.Equal(t, "us-east-1.aws.api.temporal.io:7233", md["endpoint"]) + assert.Equal(t, "my-tq", md["taskQueue"]) + }) + + t.Run("does not inject namespace when key is absent", func(t *testing.T) { + spec := scaledObjectSpec(map[string]interface{}{ + "endpoint": "us-east-1.aws.api.temporal.io:7233", + "taskQueue": "my-tq", + "workerDeploymentName": "", + "workerDeploymentBuildId": "", + // no namespace key at all + }) + autoInjectFields(spec, "my-worker-abc123", twdName, buildID, "my-temporal-ns", nil, nil) + md := spec["triggers"].([]interface{})[0].(map[string]interface{})["metadata"].(map[string]interface{}) + _, hasNamespace := md["namespace"] + assert.False(t, hasNamespace, "namespace must not be added when absent (opt-in)") + }) + + t.Run("does not overwrite non-empty user values (webhook rejects, runtime is defensive)", func(t *testing.T) { + // The validating webhook is the primary line of defence: a WorkerResourceTemplate + // whose template contains a non-empty workerDeploymentName or workerDeploymentBuildId + // is rejected at admission. The runtime injection is defensive: if a non-empty value + // somehow reaches it, the user-provided value is preserved (consistent with the + // scaleTargetRef pattern where only {} opts in). + spec := scaledObjectSpec(map[string]interface{}{ + "workerDeploymentName": "user-set-name", + "workerDeploymentBuildId": "user-set-build", + }) + autoInjectFields(spec, "my-worker-abc123", twdName, buildID, "my-temporal-ns", nil, nil) + md := spec["triggers"].([]interface{})[0].(map[string]interface{})["metadata"].(map[string]interface{}) + assert.Equal(t, "user-set-name", md["workerDeploymentName"], "runtime must not silently overwrite non-empty user value") + assert.Equal(t, "user-set-build", md["workerDeploymentBuildId"], "runtime must not silently overwrite non-empty user value") + }) + + t.Run("does not inject when keys are absent", func(t *testing.T) { + spec := scaledObjectSpec(map[string]interface{}{ + "endpoint": "us-east-1.aws.api.temporal.io:7233", + "namespace": "default", + "taskQueue": "my-tq", + }) + autoInjectFields(spec, "my-worker-abc123", twdName, buildID, "my-temporal-ns", nil, nil) + md := spec["triggers"].([]interface{})[0].(map[string]interface{})["metadata"].(map[string]interface{}) + _, hasName := md["workerDeploymentName"] + _, hasBuild := md["workerDeploymentBuildId"] + assert.False(t, hasName, "workerDeploymentName must not be added when absent (opt-in)") + assert.False(t, hasBuild, "workerDeploymentBuildId must not be added when absent (opt-in)") + }) + + t.Run("does not touch non-temporal triggers", func(t *testing.T) { + spec := map[string]interface{}{ + "triggers": []interface{}{ + map[string]interface{}{ + "type": "prometheus", + "metadata": map[string]interface{}{ + "serverAddress": "http://prom", + "workerDeploymentName": "should-stay-untouched", + "workerDeploymentBuildId": "should-stay-untouched", + }, + }, + map[string]interface{}{ + "type": "temporal", + "metadata": map[string]interface{}{ + "workerDeploymentName": "", + "workerDeploymentBuildId": "", + }, + }, + }, + } + autoInjectFields(spec, "my-worker-abc123", twdName, buildID, "my-temporal-ns", nil, nil) + triggers := spec["triggers"].([]interface{}) + promMd := triggers[0].(map[string]interface{})["metadata"].(map[string]interface{}) + assert.Equal(t, "should-stay-untouched", promMd["workerDeploymentName"], "prometheus trigger must not be modified") + assert.Equal(t, "should-stay-untouched", promMd["workerDeploymentBuildId"], "prometheus trigger must not be modified") + tempMd := triggers[1].(map[string]interface{})["metadata"].(map[string]interface{}) + assert.Equal(t, twdName, tempMd["workerDeploymentName"]) + assert.Equal(t, buildID, tempMd["workerDeploymentBuildId"]) + }) + + t.Run("scaleTargetRef auto-injection still works for ScaledObject", func(t *testing.T) { + spec := scaledObjectSpec(map[string]interface{}{ + "workerDeploymentName": "", + "workerDeploymentBuildId": "", + }) + autoInjectFields(spec, "my-worker-abc123", twdName, buildID, "my-temporal-ns", nil, nil) + ref := spec["scaleTargetRef"].(map[string]interface{}) + assert.Equal(t, "my-worker-abc123", ref["name"]) + assert.Equal(t, "Deployment", ref["kind"]) + }) + + t.Run("no-op when triggers is absent", func(t *testing.T) { + spec := map[string]interface{}{ + "minReplicas": 1, + } + autoInjectFields(spec, "my-worker-abc123", twdName, buildID, "my-temporal-ns", nil, nil) + _, hasTriggers := spec["triggers"] + assert.False(t, hasTriggers) + }) +} + func TestRenderWorkerResourceTemplate(t *testing.T) { hpaSpec := map[string]interface{}{ "apiVersion": "autoscaling/v2",