Skip to content

Zip + Future Deadlock #193

@Enrico2

Description

@Enrico2

Bug Report: Zip + Future Deadlock

Summary

The ro library has a critical bug where combining Zip (or Zip2-Zip6) with Future observables causes a deadlock. The Zip observable never emits a value and the program hangs indefinitely.

Root Cause

In operator_combining.go line 1137, the zipInnerSubscription function has a bug in its OnComplete handler:

func(ctx context.Context) {
    mu.Lock()
    *completed = true
    if len(*values) == 0 {
        mu.Unlock()
        destination.CompleteWithContext(ctx)
    } else {
        mu.Unlock()
    }
    subscriptions.Unsubscribe()  // ⚠️ BUG: Unsubscribes ALL observables!
},

When any single observable completes, it calls subscriptions.Unsubscribe() which unsubscribes ALL the observables in the Zip, not just the one that completed.

Since Future observables emit one value and then immediately complete, the first Future to complete will cancel all the other Futures in the Zip before they can emit their values. This prevents the Zip from ever collecting all values and emitting the combined tuple.

Minimal Reproduction

The program will hang indefinitely. You'll see all 6 Futures emit their values, but the Zip never fires OnNext, OnComplete, or OnError. The program must be killed after timing out.

Here's a standalone version:

package main

import (
	"fmt"
	"time"

	"github.com/samber/lo"
	"github.com/samber/ro"
)

func main() {
	fmt.Println("Creating 6 Future observables that will be combined with Zip6...")

	// Create 6 simple Future observables - each emits one value and completes
	obs1 := ro.Future(func() (int, error) {
		fmt.Println("  Future 1: Emitting value 1")
		return 1, nil
	})

	obs2 := ro.Future(func() (int, error) {
		fmt.Println("  Future 2: Emitting value 2")
		return 2, nil
	})

	obs3 := ro.Future(func() (int, error) {
		fmt.Println("  Future 3: Emitting value 3")
		return 3, nil
	})

	obs4 := ro.Future(func() (int, error) {
		fmt.Println("  Future 4: Emitting value 4")
		return 4, nil
	})

	obs5 := ro.Future(func() (int, error) {
		fmt.Println("  Future 5: Emitting value 5")
		return 5, nil
	})

	obs6 := ro.Future(func() (int, error) {
		fmt.Println("  Future 6: Emitting value 6")
		return 6, nil
	})

	// Combine with Zip6
	zipped := ro.Zip6(obs1, obs2, obs3, obs4, obs5, obs6)

	resultChan := make(chan string, 1)
	timeout := time.After(2 * time.Second)

	// Subscribe
	sub := zipped.Subscribe(
		ro.NewObserver(
			func(tuple lo.Tuple6[int, int, int, int, int, int]) {
				fmt.Println("✅ SUCCESS: OnNext fired with values:", tuple)
				resultChan <- fmt.Sprintf("%v", tuple)
			},
			func(err error) {
				fmt.Println("❌ ERROR:", err)
			},
			func() {
				fmt.Println("⚠️  OnComplete fired (without emitting a value)")
			},
		),
	)
	defer sub.Unsubscribe()

	// Wait for result
	select {
	case result := <-resultChan:
		fmt.Println("✅ Test PASSED:", result)
	case <-timeout:
		fmt.Println("❌ Test FAILED: Timeout - Zip never emitted despite all Futures completing")
	}
}

Expected behavior:

✅ SUCCESS: OnNext fired with values: {1 2 3 4 5 6}
✅ Test PASSED: {1 2 3 4 5 6}

Actual behavior:
All Futures emit their values, but the Zip never fires OnNext. The program hangs until timeout:

Creating 6 Future observables that will be combined with Zip6...
Combining with Zip6 and subscribing...

  Future 1: Starting...
  Future 2: Starting...
  Future 3: Starting...
  Future 4: Starting...
  Future 5: Starting...
  Future 6: Starting...
  Future 1: Emitting value 1
  Future 3: Emitting value 3
  Future 2: Emitting value 2
  Future 4: Emitting value 4
  Future 5: Emitting value 5
  Future 6: Emitting value 6

❌ Test FAILED: Timeout after 2 seconds
   This is the classic symptom of the Zip6 + Future deadlock.

Impact

This bug affects any code using:

  • ro.Zip, ro.Zip2, ro.Zip3, ro.Zip4, ro.Zip5, ro.Zip6 with ro.Future observables
  • Potentially affects other combining operators that share the same subscription management pattern

Suggested Fix

The OnComplete handler should NOT unsubscribe all observables. It should only mark that specific observable as completed. The Zip's teardown function should handle cleanup when all observables have been processed or when an error occurs.

Possible fix in operator_combining.go:

func(ctx context.Context) {
    mu.Lock()
    *completed = true

    // Check if we should complete the destination
    if len(*values) == 0 {
        // No more values from this observable and it's completed
        // Check if ALL observables are completed
        allCompleted := true
        // ... check logic for all observables ...

        if allCompleted {
            mu.Unlock()
            destination.CompleteWithContext(ctx)
            subscriptions.Unsubscribe()  // Only unsubscribe when ALL are done
            return
        }
    }

    mu.Unlock()
    // Don't unsubscribe here - other observables may still be emitting
},

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions