diff --git a/internal/controller/reconciler_events_test.go b/internal/controller/reconciler_events_test.go
index e9056697..33be6412 100644
--- a/internal/controller/reconciler_events_test.go
+++ b/internal/controller/reconciler_events_test.go
@@ -19,6 +19,7 @@ import (
 	"github.com/temporalio/temporal-worker-controller/internal/controller/clientpool"
 	"github.com/temporalio/temporal-worker-controller/internal/planner"
 	deploymentpb "go.temporal.io/api/deployment/v1"
+	enumspb "go.temporal.io/api/enums/v1"
 	"go.temporal.io/api/serviceerror"
 	"go.temporal.io/api/workflowservice/v1"
 	sdkclient "go.temporal.io/sdk/client"
@@ -263,6 +264,11 @@ func (s *stubTemporalClient) ExecuteWorkflow(_ context.Context, _ sdkclient.Star
 	return nil, s.execErr
 }
 
+// Close is a no-op that satisfies sdkclient.Client.Close, so the stub can be
+// evicted via ClientPool.EvictClient without panicking through the embedded
+// nil Client interface.
+func (s *stubTemporalClient) Close() {}
+
 // newStubTemporalClient returns a stub client whose WorkflowService().DescribeWorkerDeployment
 // returns a valid empty response, and whose ExecuteWorkflow returns execErr.
 func newStubTemporalClient(execErr error) *stubTemporalClient {
@@ -387,6 +393,56 @@ func TestSyncConditions(t *testing.T) {
 	})
 }
 
+func TestShouldEvictClient(t *testing.T) {
+	tests := []struct {
+		name string // subtest name
+		err  error  // error passed to shouldEvictClient
+		want bool   // expected return value
+	}{
+		{
+			name: "Nil",
+			err:  nil,
+			want: false,
+		},
+		{
+			name: "DeadlineExceeded",
+			err:  fmt.Errorf("wrapped: %w", context.DeadlineExceeded), // %w-wrapped: must be matched through the chain
+			want: true,
+		},
+		{
+			name: "Unavailable",
+			err:  fmt.Errorf("wrapped: %w", serviceerror.NewUnavailable("temporary transport failure")), // %w-wrapped typed error
+			want: true,
+		},
+		{
+			name: "PermissionDenied",
+			err:  fmt.Errorf("wrapped: %w", serviceerror.NewPermissionDenied("bad credentials", "")), // %w-wrapped typed error
+			want: true,
+		},
+		{
+			name: "Canceled",
+			err:  context.Canceled, // bare sentinel, not wrapped
+			want: false,
+		},
+		{
+			name: "ResourceExhausted",
+			err:  serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_RPS_LIMIT, "rate limited"),
+			want: false,
+		},
+		{
+			name: "NotFound",
+			err:  &serviceerror.NotFound{},
+			want: false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			assert.Equal(t, tt.want, shouldEvictClient(tt.err))
+		})
+	}
+}
+
 // ─── Reconcile tests ──────────────────────────────────────────────────────────
 
 func TestReconcile_TWDNotFound_NoEvent(t *testing.T) {
@@ -671,6 +727,39 @@ func TestReconcile_DescribeWorkerDeploymentNotFound(t *testing.T) {
 	assertNoEventEmitted(t, drainEvents(recorder), ReasonPlanGenerationFailed)
 }
 
+// TestReconcile_EvictsCachedClientOnTransportFailure verifies that transport-level
+// failures from the main Reconcile path evict the cached SDK client. Otherwise the
+// next reconcile would reuse the same client and could stay wedged until the
+// controller pod restarts and drops the in-memory pool.
+func TestReconcile_EvictsCachedClientOnTransportFailure(t *testing.T) {
+	k8sNamespace := "default"
+	hostPort := "localhost:7233"
+
+	conn := makeNoCredsConnection("my-conn", k8sNamespace, hostPort)
+	twd := makeWD("test-worker", k8sNamespace, conn.Name)
+
+	r, recorder := newTestReconciler([]client.Object{twd, conn})
+
+	poolKey := noCredsPoolKey(conn.Spec.HostPort, twd.Spec.WorkerOptions.TemporalNamespace)
+	poisoned := newStubTemporalClient(nil)
+	poisoned.describeDeploymentErr = context.DeadlineExceeded // presumably returned by the stub's DescribeWorkerDeployment; stub body not shown in this patch
+	r.TemporalClientPool.SetClientForTesting(poolKey, poisoned)
+
+	cached, ok := r.TemporalClientPool.GetSDKClient(poolKey) // precondition: the stub is cached under poolKey
+	require.True(t, ok, "poisoned client should be cached before Reconcile runs")
+	require.Same(t, poisoned, cached)
+
+	_, err := r.Reconcile(context.Background(), ctrl.Request{
+		NamespacedName: types.NamespacedName{Name: twd.Name, Namespace: twd.Namespace},
+	})
+	require.Error(t, err, "Reconcile must surface the Temporal Describe error")
+	require.ErrorIs(t, err, context.DeadlineExceeded, "the original transport error must propagate")
+	assertEventEmitted(t, drainEvents(recorder), temporaliov1alpha1.ReasonTemporalStateFetchFailed)
+
+	_, ok = r.TemporalClientPool.GetSDKClient(poolKey) // postcondition: the pool no longer holds a client for poolKey
+	require.False(t, ok, "poisoned client must be evicted so the next reconcile dials a fresh one")
+}
+
 // ─── executeK8sOperations tests ──────────────────────────────────────────────
 
 func TestExecuteK8sOperations_EmitsEventOnFailure(t *testing.T) {
@@ -818,3 +907,61 @@ func TestUpdateVersionConfig_EmitsEventOnFailure(t *testing.T) {
 		})
 	}
 }
+
+// TestHandleDeletion_EvictsCachedClientOnTemporalFailure verifies that
+// transport-level failures inside handleDeletion evict the cached SDK client.
+// NotFound is treated as "already cleaned up" and must not evict.
+func TestHandleDeletion_EvictsCachedClientOnTemporalFailure(t *testing.T) {
+	t.Run("DescribeError_EvictsClient", func(t *testing.T) {
+		k8sNamespace := "default"
+		hostPort := "localhost:7233"
+
+		conn := makeNoCredsConnection("my-conn", k8sNamespace, hostPort)
+		twd := makeWD("del-worker", k8sNamespace, conn.Name)
+
+		r, _ := newTestReconciler([]client.Object{twd, conn})
+
+		poolKey := noCredsPoolKey(conn.Spec.HostPort, twd.Spec.WorkerOptions.TemporalNamespace)
+		poisoned := &stubTemporalClient{
+			wdClient: &stubWDClient{handle: &stubWDHandle{
+				describeErr: context.DeadlineExceeded, // presumably what the stub handle's Describe returns; stub body not shown in this patch
+			}},
+		}
+		r.TemporalClientPool.SetClientForTesting(poolKey, poisoned)
+
+		// Precondition: the poisoned client is the one cached under poolKey.
+		cached, ok := r.TemporalClientPool.GetSDKClient(poolKey)
+		require.True(t, ok, "poisoned client should be cached before handleDeletion runs")
+		require.Same(t, poisoned, cached)
+
+		err := r.handleDeletion(context.Background(), logr.Discard(), twd)
+		require.Error(t, err, "handleDeletion must surface the Temporal Describe error")
+		require.ErrorIs(t, err, context.DeadlineExceeded, "the original error must propagate so the reconciler requeues")
+
+		_, ok = r.TemporalClientPool.GetSDKClient(poolKey)
+		require.False(t, ok, "poisoned client must be evicted from the pool after a Temporal-server-side failure so the next reconcile dials a fresh one")
+	})
+
+	t.Run("DescribeNotFound_RetainsClient", func(t *testing.T) {
+		// NotFound on Describe is treated as success (nothing to clean up).
+		// The cached client is healthy and must not be evicted.
+		k8sNamespace := "default"
+		hostPort := "localhost:7233"
+
+		conn := makeNoCredsConnection("my-conn", k8sNamespace, hostPort)
+		twd := makeWD("del-worker", k8sNamespace, conn.Name)
+
+		r, _ := newTestReconciler([]client.Object{twd, conn})
+
+		poolKey := noCredsPoolKey(conn.Spec.HostPort, twd.Spec.WorkerOptions.TemporalNamespace)
+		healthy := newStubTemporalClient(nil) // stub default (constructor body not shown in this patch): describeErr=&serviceerror.NotFound{}
+		r.TemporalClientPool.SetClientForTesting(poolKey, healthy)
+
+		err := r.handleDeletion(context.Background(), logr.Discard(), twd)
+		require.NoError(t, err, "Describe returning NotFound must be treated as success")
+
+		cached, ok := r.TemporalClientPool.GetSDKClient(poolKey)
+		require.True(t, ok, "a healthy cached client must remain in the pool after a successful handleDeletion")
+		require.Same(t, healthy, cached)
+	})
+}
diff --git a/internal/controller/worker_controller.go b/internal/controller/worker_controller.go
index aca40543..f21bda5b 100644
--- a/internal/controller/worker_controller.go
+++ b/internal/controller/worker_controller.go
@@ -321,7 +321,7 @@ func (r *WorkerDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req
 		getControllerIdentity(),
 	)
 	if err != nil {
-		if isAccessDeniedErr(err) {
+		if shouldEvictClient(err) {
 			r.TemporalClientPool.EvictClient(clientPoolKey)
 		}
 		var rateLimitErr *serviceerror.ResourceExhausted
@@ -367,7 +367,7 @@ func (r *WorkerDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req
 
 	// Execute the plan, handling any errors
 	if err := r.executePlan(ctx, l, &workerDeploy, temporalClient, plan); err != nil {
-		if isAccessDeniedErr(err) {
+		if shouldEvictClient(err) {
 			r.TemporalClientPool.EvictClient(clientPoolKey)
 		}
 		r.recordWarningAndSetBlocked(ctx, &workerDeploy,
@@ -545,7 +545,7 @@ func (r *WorkerDeploymentReconciler) handleDeletion(
 	ctx context.Context,
 	l logr.Logger,
 	workerDeploy *temporaliov1alpha1.WorkerDeployment,
-) error {
+) (retErr error) {
 	// Resolve Connection.
// The Connection is guaranteed to exist because we hold a finalizer on it // that prevents deletion while any WD references it. @@ -562,12 +562,14 @@ func (r *WorkerDeploymentReconciler) handleDeletion( return fmt.Errorf("unable to resolve auth secret name: %w", err) } - temporalClient, ok := r.TemporalClientPool.GetSDKClient(clientpool.ClientPoolKey{ + clientPoolKey := clientpool.ClientPoolKey{ HostPort: temporalConnection.Spec.HostPort, Namespace: workerDeploy.Spec.WorkerOptions.TemporalNamespace, SecretName: secretName, AuthMode: authMode, - }) + } + + temporalClient, ok := r.TemporalClientPool.GetSDKClient(clientPoolKey) if !ok { clientOpts, key, clientAuth, err := r.TemporalClientPool.ParseClientSecret(ctx, secretName, authMode, clientpool.NewClientOptions{ K8sNamespace: workerDeploy.Namespace, @@ -585,6 +587,15 @@ func (r *WorkerDeploymentReconciler) handleDeletion( temporalClient = c } + // Evict cached SDK clients on failures that indicate the cached client may + // no longer be usable. Domain responses such as NotFound are handled as + // success below and should not churn otherwise healthy clients. + defer func() { + if shouldEvictClient(retErr) { + r.TemporalClientPool.EvictClient(clientPoolKey) + } + }() + workerDeploymentName := k8s.ComputeWorkerDeploymentName(workerDeploy) deploymentHandler := temporalClient.WorkerDeploymentClient().GetHandle(workerDeploymentName) @@ -927,3 +938,17 @@ func isAccessDeniedErr(err error) bool { } return grpcstatus.Code(err) == codes.Unauthenticated } + +func shouldEvictClient(err error) bool { + if err == nil { + return false + } + if isAccessDeniedErr(err) { + return true + } + if errors.Is(err, context.DeadlineExceeded) { + return true + } + var unavailable *serviceerror.Unavailable + return errors.As(err, &unavailable) +}