Bug Report: Zip + Future Deadlock
Summary
The ro library has a critical bug where combining Zip (or Zip2-Zip6) with Future observables causes a deadlock. The Zip observable never emits a value and the program hangs indefinitely.
Root Cause
In operator_combining.go line 1137, the zipInnerSubscription function has a bug in its OnComplete handler:
func(ctx context.Context) {
	mu.Lock()
	*completed = true
	if len(*values) == 0 {
		mu.Unlock()
		destination.CompleteWithContext(ctx)
	} else {
		mu.Unlock()
	}
	subscriptions.Unsubscribe() // ⚠️ BUG: Unsubscribes ALL observables!
},
When any single observable completes, the handler calls subscriptions.Unsubscribe(), which unsubscribes ALL the observables in the Zip, not just the one that completed.
Since Future observables emit one value and then immediately complete, the first Future to complete cancels the subscriptions to all the other Futures before their values reach the Zip. The other Futures may still run their callbacks, but the Zip never collects a full set of values and so never emits the combined tuple.
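To make the failure mode concrete, here is a minimal sketch that models it with the standard library only. It does not use ro: the source type and the runZip and newSources functions are hypothetical stand-ins for Future observables and the Zip bookkeeping, and delivery is modelled synchronously for simplicity.

```go
package main

import "fmt"

// source models a Future-like observable: one value, then completion.
// Illustrative only; this type is not part of the ro API.
type source struct {
	value     int
	cancelled bool
}

// newSources builds n sources carrying the values 1..n.
func newSources(n int) []*source {
	sources := make([]*source, n)
	for i := range sources {
		sources[i] = &source{value: i + 1}
	}
	return sources
}

// runZip delivers each source's value in order and then handles its
// completion. When cancelAllOnComplete is true, the first completion cancels
// every source, mirroring the unconditional subscriptions.Unsubscribe() call.
// It returns the zipped tuple and whether a tuple was emitted at all.
func runZip(sources []*source, cancelAllOnComplete bool) ([]int, bool) {
	collected := make([]int, 0, len(sources))
	for _, s := range sources {
		if s.cancelled {
			continue // the value is never delivered to the zip
		}
		collected = append(collected, s.value) // models OnNext
		if cancelAllOnComplete {               // models OnComplete
			for _, other := range sources {
				other.cancelled = true
			}
		}
	}
	if len(collected) == len(sources) {
		return collected, true
	}
	return nil, false
}

func main() {
	_, ok := runZip(newSources(6), true)
	fmt.Println("cancel all on first completion, emitted:", ok)

	tuple, ok := runZip(newSources(6), false)
	fmt.Println("no cancellation on completion, emitted:", ok, tuple)
}
```

With cancellation on the first completion, only one value is ever collected, so no tuple is produced. Without it, all six values arrive and the tuple is emitted.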
Minimal Reproduction
Without a timeout, the program hangs indefinitely and must be killed. You'll see all 6 Futures emit their values, but the Zip never fires OnNext, OnComplete, or OnError.
Here's a standalone version that uses a 2-second timeout so that it exits on its own:
package main

import (
	"fmt"
	"time"

	"github.com/samber/lo"
	"github.com/samber/ro"
)

func main() {
	fmt.Println("Creating 6 Future observables that will be combined with Zip6...")
	// Create 6 simple Future observables - each emits one value and completes
	obs1 := ro.Future(func() (int, error) {
		fmt.Println(" Future 1: Emitting value 1")
		return 1, nil
	})
	obs2 := ro.Future(func() (int, error) {
		fmt.Println(" Future 2: Emitting value 2")
		return 2, nil
	})
	obs3 := ro.Future(func() (int, error) {
		fmt.Println(" Future 3: Emitting value 3")
		return 3, nil
	})
	obs4 := ro.Future(func() (int, error) {
		fmt.Println(" Future 4: Emitting value 4")
		return 4, nil
	})
	obs5 := ro.Future(func() (int, error) {
		fmt.Println(" Future 5: Emitting value 5")
		return 5, nil
	})
	obs6 := ro.Future(func() (int, error) {
		fmt.Println(" Future 6: Emitting value 6")
		return 6, nil
	})
	// Combine with Zip6
	zipped := ro.Zip6(obs1, obs2, obs3, obs4, obs5, obs6)
	resultChan := make(chan string, 1)
	timeout := time.After(2 * time.Second)
	// Subscribe
	sub := zipped.Subscribe(
		ro.NewObserver(
			func(tuple lo.Tuple6[int, int, int, int, int, int]) {
				fmt.Println("✅ SUCCESS: OnNext fired with values:", tuple)
				resultChan <- fmt.Sprintf("%v", tuple)
			},
			func(err error) {
				fmt.Println("❌ ERROR:", err)
			},
			func() {
				fmt.Println("⚠️ OnComplete fired (without emitting a value)")
			},
		),
	)
	defer sub.Unsubscribe()
	// Wait for result
	select {
	case result := <-resultChan:
		fmt.Println("✅ Test PASSED:", result)
	case <-timeout:
		fmt.Println("❌ Test FAILED: Timeout - Zip never emitted despite all Futures completing")
	}
}
Expected behavior:
✅ SUCCESS: OnNext fired with values: {1 2 3 4 5 6}
✅ Test PASSED: {1 2 3 4 5 6}
Actual behavior:
All Futures emit their values, but the Zip never fires OnNext. The program hangs until timeout:
Creating 6 Future observables that will be combined with Zip6...
Combining with Zip6 and subscribing...
Future 1: Starting...
Future 2: Starting...
Future 3: Starting...
Future 4: Starting...
Future 5: Starting...
Future 6: Starting...
Future 1: Emitting value 1
Future 3: Emitting value 3
Future 2: Emitting value 2
Future 4: Emitting value 4
Future 5: Emitting value 5
Future 6: Emitting value 6
❌ Test FAILED: Timeout after 2 seconds
This is the classic symptom of the Zip6 + Future deadlock.
Impact
This bug affects any code using:
- ro.Zip, ro.Zip2, ro.Zip3, ro.Zip4, ro.Zip5, ro.Zip6 with ro.Future observables
- Potentially affects other combining operators that share the same subscription management pattern
Suggested Fix
The OnComplete handler should NOT unsubscribe all observables. It should only mark that specific observable as completed. The Zip's teardown function should handle cleanup when all observables have been processed or when an error occurs.
Possible fix in operator_combining.go:
func(ctx context.Context) {
	mu.Lock()
	*completed = true
	// Check if we should complete the destination
	if len(*values) == 0 {
		// No more values from this observable and it's completed
		// Check if ALL observables are completed
		allCompleted := true
		// ... check logic for all observables ...
		if allCompleted {
			mu.Unlock()
			destination.CompleteWithContext(ctx)
			subscriptions.Unsubscribe() // Only unsubscribe when ALL are done
			return
		}
	}
	mu.Unlock()
	// Don't unsubscribe here - other observables may still be emitting
},
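The elided "check logic for all observables" step could be implemented with a shared completion tracker. The sketch below is one way to do it under stated assumptions: completionTracker, newCompletionTracker, and markCompleted are hypothetical names, nothing here comes from the ro source, and it only shows the bookkeeping, not the integration into zipInnerSubscription.

```go
package main

import (
	"fmt"
	"sync"
)

// completionTracker records which sources of a zip have completed.
// Hypothetical helper for illustration; not part of ro.
type completionTracker struct {
	mu        sync.Mutex
	completed []bool
	remaining int
}

// newCompletionTracker creates a tracker for n sources.
func newCompletionTracker(n int) *completionTracker {
	return &completionTracker{completed: make([]bool, n), remaining: n}
}

// markCompleted records that source i has completed and reports whether
// every source has now completed. Repeated calls for the same index are
// counted only once.
func (t *completionTracker) markCompleted(i int) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if !t.completed[i] {
		t.completed[i] = true
		t.remaining--
	}
	return t.remaining == 0
}

func main() {
	tracker := newCompletionTracker(3)
	fmt.Println(tracker.markCompleted(0)) // first source done, others pending
	fmt.Println(tracker.markCompleted(1)) // second source done
	fmt.Println(tracker.markCompleted(1)) // duplicate completion is ignored
	fmt.Println(tracker.markCompleted(2)) // last source done
}
```

In the handler above, allCompleted would then be the return value of markCompleted for the completing source, and subscriptions.Unsubscribe() would run only when it is true. Whether Zip should instead complete as soon as one completed source has an empty buffer is a separate design question for the maintainers; this sketch only follows the all-completed rule proposed in this report.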