Skip to content

Commit 76b3b68

Browse files
committed
Fix listview task channel handling when task is replaced
When Upsert is called with a new TaskDetails for an existing task (e.g., due to retry logic), the old channel was orphaned. The caller blocked on waitForResultOrTimeout would never receive a response. Changes: - Modified Upsert to return (TaskDetails, bool) so caller can detect when a task was replaced and send an error to the old channel - AddTask now sends error to old channel when replacing a task to unblock waiters - Added thread-safe locking to Count() method - Added comprehensive unit tests for TaskMap and listview functions The race condition between processTaskUpdate and reportErrorOnAllPendingTasks is handled correctly by existing non-blocking sends with buffer size 1.
1 parent c5c497b commit 76b3b68

File tree

4 files changed

+1004
-33
lines changed

4 files changed

+1004
-33
lines changed

pkg/common/cns-lib/volume/listview.go

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -209,11 +209,22 @@ func (l *ListViewImpl) AddTask(ctx context.Context, taskMoRef types.ManagedObjec
209209
return fmt.Errorf("%w. task: %v, err: listview not ready", ErrListViewTaskAddition, taskMoRef)
210210
}
211211

212-
l.taskMap.Upsert(taskMoRef, TaskDetails{
212+
oldDetails, existed := l.taskMap.Upsert(taskMoRef, TaskDetails{
213213
Reference: taskMoRef,
214214
MarkedForRemoval: false,
215215
ResultCh: ch,
216216
})
217+
// If task was replaced, send error to old channel to unblock waiters
218+
if existed && oldDetails.ResultCh != nil && oldDetails.ResultCh != ch {
219+
log.Warnf("replacing existing task %+v in map, sending error to old channel", taskMoRef)
220+
select {
221+
case oldDetails.ResultCh <- TaskResult{
222+
TaskInfo: nil,
223+
Err: fmt.Errorf("task %v was replaced by a new request", taskMoRef),
224+
}:
225+
default:
226+
}
227+
}
217228
log.Debugf("task %+v added to map", taskMoRef)
218229
log.Infof("client is valid. trying to add task to listview object")
219230

@@ -403,25 +414,20 @@ func (l *ListViewImpl) listenToTaskUpdates() {
403414
}
404415
}
405416

406-
// reportErrorOnAllPendingTasks returns failure to all pending tasks in the map in case of vc failure
417+
// reportErrorOnAllPendingTasks returns failure to all pending tasks in case of vc failure.
407418
func (l *ListViewImpl) reportErrorOnAllPendingTasks(err error) {
408419
log := logger.GetLogger(context.Background())
409420
for _, taskDetails := range l.taskMap.GetAll() {
410-
result := TaskResult{
411-
TaskInfo: nil,
412-
Err: err,
413-
}
414-
// Non-blocking send
415421
select {
416-
case taskDetails.ResultCh <- result:
422+
case taskDetails.ResultCh <- TaskResult{TaskInfo: nil, Err: err}:
417423
log.Infof("reported error for task %+v", taskDetails.Reference)
418424
default:
419425
log.Warnf("failed to report error for task %+v: channel blocked", taskDetails.Reference)
420426
}
421427
}
422428
}
423429

424-
// processTaskUpdate is processes each task update in a separate goroutine
430+
// processTaskUpdate processes each task update in a separate goroutine.
425431
func (l *ListViewImpl) processTaskUpdate(prop types.PropertyChange) {
426432
log := logger.GetLogger(l.ctx)
427433
log.Infof("processTaskUpdate for property change update: %+v", prop)
@@ -433,29 +439,26 @@ func (l *ListViewImpl) processTaskUpdate(prop types.PropertyChange) {
433439
if taskInfo.State == types.TaskInfoStateQueued || taskInfo.State == types.TaskInfoStateRunning {
434440
return
435441
}
436-
result := TaskResult{}
442+
437443
taskDetails, ok := l.taskMap.Get(taskInfo.Task)
438444
if !ok {
439-
// if vc sends a duplicate success event for a task,
440-
// and we've already processed an earlier success event for the same task
441-
// and removed it from our map, we will see this error
442-
log.Errorf("failed to retrieve receiver channel for task %+v", taskInfo.Task)
445+
log.Debugf("task %+v not found in map, skipping update", taskInfo.Task)
443446
return
444-
} else if taskInfo.State == types.TaskInfoStateError {
447+
}
448+
449+
result := TaskResult{}
450+
if taskInfo.State == types.TaskInfoStateError {
445451
result.TaskInfo = nil
446452
result.Err = errors.New(taskInfo.Error.LocalizedMessage)
447453
} else {
448454
result.TaskInfo = &taskInfo
449455
result.Err = nil
450456
}
451-
// Use a non-blocking send to prevent deadlocks when multiple goroutines
452-
// try to send to the same channel (e.g., due to duplicate task updates from vSphere)
457+
453458
select {
454459
case taskDetails.ResultCh <- result:
455460
log.Infof("Successfully sent task result for task %+v", taskInfo.Task)
456461
default:
457-
// Channel is full/blocked, which means another goroutine already sent the result
458-
// This can happen when vSphere sends duplicate task update events
459462
log.Warnf("result channel full for task %+v, ignoring duplicate update", taskInfo.Task)
460463
}
461464
}

0 commit comments

Comments
 (0)