Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 147 additions & 0 deletions internal/controller/reconciler_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/temporalio/temporal-worker-controller/internal/controller/clientpool"
"github.com/temporalio/temporal-worker-controller/internal/planner"
deploymentpb "go.temporal.io/api/deployment/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflowservice/v1"
sdkclient "go.temporal.io/sdk/client"
Expand Down Expand Up @@ -263,6 +264,11 @@ func (s *stubTemporalClient) ExecuteWorkflow(_ context.Context, _ sdkclient.Star
return nil, s.execErr
}

// Close satisfies sdkclient.Client.Close so the stub can be evicted via
// ClientPool.EvictClient without panicking through the embedded nil Client
// interface.
func (s *stubTemporalClient) Close() {}

// newStubTemporalClient returns a stub client whose WorkflowService().DescribeWorkerDeployment
// returns a valid empty response, and whose ExecuteWorkflow returns execErr.
func newStubTemporalClient(execErr error) *stubTemporalClient {
Expand Down Expand Up @@ -387,6 +393,56 @@ func TestSyncConditions(t *testing.T) {
})
}

func TestShouldEvictClient(t *testing.T) {
tests := []struct {
name string
err error
want bool
}{
{
name: "Nil",
err: nil,
want: false,
},
{
name: "DeadlineExceeded",
err: fmt.Errorf("wrapped: %w", context.DeadlineExceeded),
want: true,
},
{
name: "Unavailable",
err: fmt.Errorf("wrapped: %w", serviceerror.NewUnavailable("temporary transport failure")),
want: true,
},
{
name: "PermissionDenied",
err: fmt.Errorf("wrapped: %w", serviceerror.NewPermissionDenied("bad credentials", "")),
want: true,
},
{
name: "Canceled",
err: context.Canceled,
want: false,
},
{
name: "ResourceExhausted",
err: serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_RPS_LIMIT, "rate limited"),
want: false,
},
{
name: "NotFound",
err: &serviceerror.NotFound{},
want: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, tt.want, shouldEvictClient(tt.err))
})
}
}

// ─── Reconcile tests ──────────────────────────────────────────────────────────

func TestReconcile_TWDNotFound_NoEvent(t *testing.T) {
Expand Down Expand Up @@ -671,6 +727,39 @@ func TestReconcile_DescribeWorkerDeploymentNotFound(t *testing.T) {
assertNoEventEmitted(t, drainEvents(recorder), ReasonPlanGenerationFailed)
}

// TestReconcile_EvictsCachedClientOnTransportFailure verifies that transport-level
// failures from the main Reconcile path evict the cached SDK client. Otherwise the
// next reconcile would reuse the same client and can remain wedged until the
// controller pod restarts and drops the in-memory pool.
func TestReconcile_EvictsCachedClientOnTransportFailure(t *testing.T) {
k8sNamespace := "default"
hostPort := "localhost:7233"

conn := makeNoCredsConnection("my-conn", k8sNamespace, hostPort)
twd := makeWD("test-worker", k8sNamespace, conn.Name)

r, recorder := newTestReconciler([]client.Object{twd, conn})

poolKey := noCredsPoolKey(conn.Spec.HostPort, twd.Spec.WorkerOptions.TemporalNamespace)
poisoned := newStubTemporalClient(nil)
poisoned.describeDeploymentErr = context.DeadlineExceeded
r.TemporalClientPool.SetClientForTesting(poolKey, poisoned)

cached, ok := r.TemporalClientPool.GetSDKClient(poolKey)
require.True(t, ok, "poisoned client should be cached before Reconcile runs")
require.Same(t, poisoned, cached)

_, err := r.Reconcile(context.Background(), ctrl.Request{
NamespacedName: types.NamespacedName{Name: twd.Name, Namespace: twd.Namespace},
})
require.Error(t, err, "Reconcile must surface the Temporal Describe error")
require.ErrorIs(t, err, context.DeadlineExceeded, "the original transport error must propagate")
assertEventEmitted(t, drainEvents(recorder), temporaliov1alpha1.ReasonTemporalStateFetchFailed)

_, ok = r.TemporalClientPool.GetSDKClient(poolKey)
require.False(t, ok, "poisoned client must be evicted so the next reconcile dials a fresh one")
}

// ─── executeK8sOperations tests ──────────────────────────────────────────────

func TestExecuteK8sOperations_EmitsEventOnFailure(t *testing.T) {
Expand Down Expand Up @@ -818,3 +907,61 @@ func TestUpdateVersionConfig_EmitsEventOnFailure(t *testing.T) {
})
}
}

// TestHandleDeletion_EvictsCachedClientOnTemporalFailure verifies that
// transport-level failures inside handleDeletion evict the cached SDK client.
// NotFound is treated as "already cleaned up" and must not evict.
func TestHandleDeletion_EvictsCachedClientOnTemporalFailure(t *testing.T) {
t.Run("DescribeError_EvictsClient", func(t *testing.T) {
k8sNamespace := "default"
hostPort := "localhost:7233"

conn := makeNoCredsConnection("my-conn", k8sNamespace, hostPort)
twd := makeWD("del-worker", k8sNamespace, conn.Name)

r, _ := newTestReconciler([]client.Object{twd, conn})

poolKey := noCredsPoolKey(conn.Spec.HostPort, twd.Spec.WorkerOptions.TemporalNamespace)
poisoned := &stubTemporalClient{
wdClient: &stubWDClient{handle: &stubWDHandle{
describeErr: context.DeadlineExceeded,
}},
}
r.TemporalClientPool.SetClientForTesting(poolKey, poisoned)

// Sanity: the poisoned client is what handleDeletion will pick up.
cached, ok := r.TemporalClientPool.GetSDKClient(poolKey)
require.True(t, ok, "poisoned client should be cached before handleDeletion runs")
require.Same(t, poisoned, cached)

err := r.handleDeletion(context.Background(), logr.Discard(), twd)
require.Error(t, err, "handleDeletion must surface the Temporal Describe error")
require.ErrorIs(t, err, context.DeadlineExceeded, "the original error must propagate so the reconciler requeues")

_, ok = r.TemporalClientPool.GetSDKClient(poolKey)
require.False(t, ok, "poisoned client must be evicted from the pool after a Temporal-server-side failure so the next reconcile dials a fresh one")
})

t.Run("DescribeNotFound_RetainsClient", func(t *testing.T) {
// NotFound on Describe is treated as success (nothing to clean up).
// The cached client is healthy and must not be evicted.
k8sNamespace := "default"
hostPort := "localhost:7233"

conn := makeNoCredsConnection("my-conn", k8sNamespace, hostPort)
twd := makeWD("del-worker", k8sNamespace, conn.Name)

r, _ := newTestReconciler([]client.Object{twd, conn})

poolKey := noCredsPoolKey(conn.Spec.HostPort, twd.Spec.WorkerOptions.TemporalNamespace)
healthy := newStubTemporalClient(nil) // describeErr=&serviceerror.NotFound{}
r.TemporalClientPool.SetClientForTesting(poolKey, healthy)

err := r.handleDeletion(context.Background(), logr.Discard(), twd)
require.NoError(t, err, "Describe returning NotFound must be treated as success")

cached, ok := r.TemporalClientPool.GetSDKClient(poolKey)
require.True(t, ok, "a healthy cached client must remain in the pool after a successful handleDeletion")
require.Same(t, healthy, cached)
})
}
35 changes: 30 additions & 5 deletions internal/controller/worker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func (r *WorkerDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req
getControllerIdentity(),
)
if err != nil {
if isAccessDeniedErr(err) {
if shouldEvictClient(err) {
r.TemporalClientPool.EvictClient(clientPoolKey)
}
var rateLimitErr *serviceerror.ResourceExhausted
Expand Down Expand Up @@ -367,7 +367,7 @@ func (r *WorkerDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req

// Execute the plan, handling any errors
if err := r.executePlan(ctx, l, &workerDeploy, temporalClient, plan); err != nil {
if isAccessDeniedErr(err) {
if shouldEvictClient(err) {
r.TemporalClientPool.EvictClient(clientPoolKey)
}
r.recordWarningAndSetBlocked(ctx, &workerDeploy,
Expand Down Expand Up @@ -545,7 +545,7 @@ func (r *WorkerDeploymentReconciler) handleDeletion(
ctx context.Context,
l logr.Logger,
workerDeploy *temporaliov1alpha1.WorkerDeployment,
) error {
) (retErr error) {
// Resolve Connection.
// The Connection is guaranteed to exist because we hold a finalizer on it
// that prevents deletion while any WD references it.
Expand All @@ -562,12 +562,14 @@ func (r *WorkerDeploymentReconciler) handleDeletion(
return fmt.Errorf("unable to resolve auth secret name: %w", err)
}

temporalClient, ok := r.TemporalClientPool.GetSDKClient(clientpool.ClientPoolKey{
clientPoolKey := clientpool.ClientPoolKey{
HostPort: temporalConnection.Spec.HostPort,
Namespace: workerDeploy.Spec.WorkerOptions.TemporalNamespace,
SecretName: secretName,
AuthMode: authMode,
})
}

temporalClient, ok := r.TemporalClientPool.GetSDKClient(clientPoolKey)
if !ok {
clientOpts, key, clientAuth, err := r.TemporalClientPool.ParseClientSecret(ctx, secretName, authMode, clientpool.NewClientOptions{
K8sNamespace: workerDeploy.Namespace,
Expand All @@ -585,6 +587,15 @@ func (r *WorkerDeploymentReconciler) handleDeletion(
temporalClient = c
}

// Evict cached SDK clients on failures that indicate the cached client may
// no longer be usable. Domain responses such as NotFound are handled as
// success below and should not churn otherwise healthy clients.
defer func() {
if shouldEvictClient(retErr) {
r.TemporalClientPool.EvictClient(clientPoolKey)
}
}()

workerDeploymentName := k8s.ComputeWorkerDeploymentName(workerDeploy)
deploymentHandler := temporalClient.WorkerDeploymentClient().GetHandle(workerDeploymentName)

Expand Down Expand Up @@ -927,3 +938,17 @@ func isAccessDeniedErr(err error) bool {
}
return grpcstatus.Code(err) == codes.Unauthenticated
}

func shouldEvictClient(err error) bool {
if err == nil {
return false
}
if isAccessDeniedErr(err) {
return true
}
if errors.Is(err, context.DeadlineExceeded) {
return true
}
var unavailable *serviceerror.Unavailable
return errors.As(err, &unavailable)
}