Skip to content

Commit bc1a54e

Browse files
committed
Initial version
1 parent dbae0d4 commit bc1a54e

7 files changed

Lines changed: 291 additions & 0 deletions

File tree

.github/workflows/ci.yaml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
name: CI
2+
3+
on: [push]
4+
5+
jobs:
6+
build:
7+
runs-on: ubuntu-latest
8+
9+
steps:
10+
- uses: actions/checkout@v4
11+
- name: Setup Go
12+
uses: actions/setup-go@v5
13+
with:
14+
go-version: '1.24'
15+
- name: Install dependencies
16+
run: go get .
17+
- name: Build
18+
run: go build -v ./...
19+
- name: Test with the Go CLI
20+
run: go test
21+

README.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# demux
2+
3+
[![CI](https://github.com/floatdrop/demux/actions/workflows/ci.yaml/badge.svg)](https://github.com/floatdrop/demux/actions/workflows/ci.yaml)
4+
[![Go Report Card](https://goreportcard.com/badge/github.com/floatdrop/demux)](https://goreportcard.com/report/github.com/floatdrop/demux)
5+
[![Go Reference](https://pkg.go.dev/badge/github.com/floatdrop/demux.svg)](https://pkg.go.dev/github.com/floatdrop/demux)
6+
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
7+
8+
## Installation
9+
10+
```bash
11+
go get github.com/floatdrop/demux
12+
```
13+
14+
## Contributing
15+
16+
Contributions are welcome! Please feel free to submit a Pull Request.
17+
18+
## License
19+
20+
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

dynamic.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package demux
2+
3+
// Dynamic creates dynamic demultiplexer that routes items from 'in' based on keys returned by 'keyFunc'.
4+
// For each unique key, a new goroutine is spawned running 'consumeFunc'.
5+
// Each consumeFunc receives a channel that delivers values matching its key.
6+
func Dynamic[T any, K comparable](
7+
in <-chan T,
8+
keyFunc func(T) K,
9+
consumeFunc func(K, <-chan T),
10+
) {
11+
outChans := make(map[K]chan T)
12+
defer func() {
13+
for _, ch := range outChans {
14+
close(ch)
15+
}
16+
}()
17+
18+
for t := range in {
19+
key := keyFunc(t)
20+
ch, exists := outChans[key]
21+
if !exists {
22+
ch = make(chan T)
23+
outChans[key] = ch
24+
25+
go consumeFunc(key, ch)
26+
}
27+
ch <- t
28+
}
29+
}

dynamic_test.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package demux_test
2+
3+
import (
4+
"sync"
5+
"testing"
6+
"time"
7+
8+
"github.com/floatdrop/demux"
9+
)
10+
11+
func TestDynamic_BasicDemuxing(t *testing.T) {
12+
type input struct {
13+
id int
14+
data string
15+
}
16+
17+
in := make(chan input)
18+
var mu sync.Mutex
19+
result := make(map[int][]string)
20+
21+
go demux.Dynamic(in,
22+
func(i input) int { return i.id },
23+
func(k int, ch <-chan input) {
24+
var items []string
25+
for item := range ch {
26+
items = append(items, item.data)
27+
}
28+
mu.Lock()
29+
result[k] = items
30+
mu.Unlock()
31+
})
32+
33+
// Send data
34+
go func() {
35+
in <- input{id: 1, data: "a"}
36+
in <- input{id: 2, data: "b"}
37+
in <- input{id: 1, data: "c"}
38+
in <- input{id: 2, data: "d"}
39+
close(in)
40+
}()
41+
42+
time.Sleep(100 * time.Millisecond)
43+
44+
// Assertions
45+
if len(result) != 2 {
46+
t.Fatalf("expected 2 keys, got %d", len(result))
47+
}
48+
if len(result[1]) != 2 || result[1][0] != "a" || result[1][1] != "c" {
49+
t.Errorf("unexpected data for key 1: %v", result[1])
50+
}
51+
if len(result[2]) != 2 || result[2][0] != "b" || result[2][1] != "d" {
52+
t.Errorf("unexpected data for key 2: %v", result[2])
53+
}
54+
}
55+
56+
func TestDynamic_SingleKey(t *testing.T) {
57+
in := make(chan int)
58+
var collected []int
59+
done := make(chan struct{})
60+
61+
go demux.Dynamic(
62+
in,
63+
func(i int) string { return "only" },
64+
func(k string, ch <-chan int) {
65+
for i := range ch {
66+
collected = append(collected, i)
67+
}
68+
close(done)
69+
})
70+
71+
go func() {
72+
in <- 10
73+
in <- 20
74+
in <- 30
75+
close(in)
76+
}()
77+
78+
select {
79+
case <-done:
80+
case <-time.After(time.Second):
81+
t.Fatal("Timeout waiting for consumer")
82+
}
83+
84+
expected := []int{10, 20, 30}
85+
for i, val := range expected {
86+
if collected[i] != val {
87+
t.Errorf("expected %d at index %d, got %d", val, i, collected[i])
88+
}
89+
}
90+
}
91+
92+
func TestDynamic_NoInput(t *testing.T) {
93+
in := make(chan string)
94+
called := false
95+
96+
go demux.Dynamic(
97+
in,
98+
func(s string) int { return 0 },
99+
func(k int, ch <-chan string) {
100+
called = true
101+
},
102+
)
103+
104+
close(in)
105+
106+
time.Sleep(100 * time.Millisecond)
107+
if called {
108+
t.Error("consumeFunc should not be called for empty input")
109+
}
110+
}

go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
module github.com/floatdrop/demux
2+
3+
go 1.24.4

static.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package demux
2+
3+
// Static creates Static demultipler that routes each element to channel in channels map by key
4+
// computed by keyFunc.
5+
func Static[T any, K comparable](in <-chan T, keyFunc func(T) K, channels map[K]chan<- T) {
6+
defer func() {
7+
for _, ch := range channels {
8+
close(ch)
9+
}
10+
}()
11+
12+
for t := range in {
13+
k := keyFunc(t)
14+
if ch, ok := channels[k]; ok {
15+
ch <- t
16+
}
17+
}
18+
}

static_test.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package demux_test
2+
3+
import (
4+
"sync"
5+
"testing"
6+
"time"
7+
8+
"github.com/floatdrop/demux"
9+
)
10+
11+
func TestStatic_RoutesToCorrectChannels(t *testing.T) {
12+
in := make(chan int)
13+
outA := make(chan int, 2)
14+
outB := make(chan int, 2)
15+
16+
channels := map[string]chan<- int{
17+
"A": outA,
18+
"B": outB,
19+
}
20+
21+
keyFunc := func(i int) string {
22+
if i%2 == 0 {
23+
return "A"
24+
}
25+
return "B"
26+
}
27+
28+
var wg sync.WaitGroup
29+
wg.Add(1)
30+
31+
go func() {
32+
defer wg.Done()
33+
demux.Static(in, keyFunc, channels)
34+
}()
35+
36+
// Send data
37+
go func() {
38+
in <- 1
39+
in <- 2
40+
in <- 3
41+
in <- 4
42+
close(in)
43+
}()
44+
45+
wg.Wait()
46+
47+
// Collect output
48+
var aValues, bValues []int
49+
50+
done := make(chan struct{})
51+
go func() {
52+
for v := range outA {
53+
aValues = append(aValues, v)
54+
}
55+
for v := range outB {
56+
bValues = append(bValues, v)
57+
}
58+
close(done)
59+
}()
60+
61+
select {
62+
case <-done:
63+
case <-time.After(2 * time.Second):
64+
t.Fatal("Test timed out")
65+
}
66+
67+
// Validate
68+
expectedA := []int{2, 4}
69+
expectedB := []int{1, 3}
70+
71+
if !equal(aValues, expectedA) {
72+
t.Errorf("expected A channel values %v, got %v", expectedA, aValues)
73+
}
74+
if !equal(bValues, expectedB) {
75+
t.Errorf("expected B channel values %v, got %v", expectedB, bValues)
76+
}
77+
}
78+
79+
// Helper for slice comparison
80+
func equal[T comparable](a, b []T) bool {
81+
if len(a) != len(b) {
82+
return false
83+
}
84+
for i, v := range a {
85+
if v != b[i] {
86+
return false
87+
}
88+
}
89+
return true
90+
}

0 commit comments

Comments
 (0)