Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 22 additions & 19 deletions pkg/common/cns-lib/volume/listview.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,22 @@ func (l *ListViewImpl) AddTask(ctx context.Context, taskMoRef types.ManagedObjec
return fmt.Errorf("%w. task: %v, err: listview not ready", ErrListViewTaskAddition, taskMoRef)
}

l.taskMap.Upsert(taskMoRef, TaskDetails{
oldDetails, existed := l.taskMap.Upsert(taskMoRef, TaskDetails{
Reference: taskMoRef,
MarkedForRemoval: false,
ResultCh: ch,
})
// If task was replaced, send error to old channel to unblock waiters
if existed && oldDetails.ResultCh != nil && oldDetails.ResultCh != ch {
log.Warnf("replacing existing task %+v in map, sending error to old channel", taskMoRef)
select {
case oldDetails.ResultCh <- TaskResult{
TaskInfo: nil,
Err: fmt.Errorf("task %v was replaced by a new request", taskMoRef),
}:
default:
}
}
log.Debugf("task %+v added to map", taskMoRef)
log.Infof("client is valid. trying to add task to listview object")

Expand Down Expand Up @@ -403,25 +414,20 @@ func (l *ListViewImpl) listenToTaskUpdates() {
}
}

// reportErrorOnAllPendingTasks returns failure to all pending tasks in the map in case of vc failure
// reportErrorOnAllPendingTasks returns failure to all pending tasks in case of vc failure.
func (l *ListViewImpl) reportErrorOnAllPendingTasks(err error) {
log := logger.GetLogger(context.Background())
for _, taskDetails := range l.taskMap.GetAll() {
result := TaskResult{
TaskInfo: nil,
Err: err,
}
// Non-blocking send
select {
case taskDetails.ResultCh <- result:
case taskDetails.ResultCh <- TaskResult{TaskInfo: nil, Err: err}:
log.Infof("reported error for task %+v", taskDetails.Reference)
default:
log.Warnf("failed to report error for task %+v: channel blocked", taskDetails.Reference)
}
}
}

// processTaskUpdate is processes each task update in a separate goroutine
// processTaskUpdate processes each task update in a separate goroutine.
func (l *ListViewImpl) processTaskUpdate(prop types.PropertyChange) {
log := logger.GetLogger(l.ctx)
log.Infof("processTaskUpdate for property change update: %+v", prop)
Expand All @@ -433,29 +439,26 @@ func (l *ListViewImpl) processTaskUpdate(prop types.PropertyChange) {
if taskInfo.State == types.TaskInfoStateQueued || taskInfo.State == types.TaskInfoStateRunning {
return
}
result := TaskResult{}

taskDetails, ok := l.taskMap.Get(taskInfo.Task)
if !ok {
// if vc sends a duplicate success event for a task,
// and we've already processed an earlier success event for the same task
// and removed it from our map, we will see this error
log.Errorf("failed to retrieve receiver channel for task %+v", taskInfo.Task)
log.Debugf("task %+v not found in map, skipping update", taskInfo.Task)
return
} else if taskInfo.State == types.TaskInfoStateError {
}

result := TaskResult{}
if taskInfo.State == types.TaskInfoStateError {
result.TaskInfo = nil
result.Err = errors.New(taskInfo.Error.LocalizedMessage)
} else {
result.TaskInfo = &taskInfo
result.Err = nil
}
// Use a non-blocking send to prevent deadlocks when multiple goroutines
// try to send to the same channel (e.g., due to duplicate task updates from vSphere)

select {
case taskDetails.ResultCh <- result:
log.Infof("Successfully sent task result for task %+v", taskInfo.Task)
default:
// Channel is full/blocked, which means another goroutine already sent the result
// This can happen when vSphere sends duplicate task update events
log.Warnf("result channel full for task %+v, ignoring duplicate update", taskInfo.Task)
}
}
Expand Down
Loading