Skip to content

Commit b953b88

Browse files
authored
refactor(shared): improve replicateAsyncIterator (#923)
<!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - Bug Fixes - Replicated async streams now propagate errors to all consumers reliably and coordinate termination to avoid premature stops or redundant cancellations. - Performance - Per-consumer buffering improves handling of multiple concurrent consumers, reducing contention and preventing dropped items. - Chores - Internal lifecycle and coordination logic streamlined for more robust behavior without changing public APIs. - Notes - No user-facing API changes. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent dc23561 commit b953b88

1 file changed

Lines changed: 32 additions & 35 deletions

File tree

packages/shared/src/iterator.ts

Lines changed: 32 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -101,69 +101,66 @@ export function replicateAsyncIterator<T, TReturn, TNext>(
101101
source: AsyncIterator<T, TReturn, TNext>,
102102
count: number,
103103
): (AsyncIteratorClass<T, TReturn, TNext>)[] {
104-
const queue = new AsyncIdQueue<IteratorResult<T, TReturn>>()
104+
const queue = new AsyncIdQueue<
105+
{ next: IteratorResult<T, TReturn> } | { next?: never, error: unknown }
106+
>()
105107

106-
const replicated: AsyncIteratorClass<T, TReturn, TNext>[] = []
107-
108-
let error: undefined | { value: unknown }
108+
const ids = Array.from({ length: count }, (_, i) => i.toString())
109+
let isSourceFinished = false
109110

110111
const start = once(async () => {
111112
try {
112113
while (true) {
113114
const item = await source.next()
114115

115-
for (let i = 0; i < count; i++) {
116-
const id = i.toString()
117-
116+
ids.forEach((id) => {
118117
if (queue.isOpen(id)) {
119-
queue.push(id, item)
118+
queue.push(id, { next: item })
120119
}
121-
}
120+
})
122121

123122
if (item.done) {
124123
break
125124
}
126125
}
127126
}
128-
catch (reason) {
129-
error = { value: reason }
130-
131-
queue.waiterIds.forEach((id) => {
132-
queue.close({ id, reason })
127+
catch (error) {
128+
ids.forEach((id) => {
129+
if (queue.isOpen(id)) {
130+
queue.push(id, { error })
131+
}
133132
})
134133
}
134+
finally {
135+
isSourceFinished = true
136+
}
135137
})
136138

137-
for (let i = 0; i < count; i++) {
138-
const id = i.toString()
139-
139+
const replicated: AsyncIteratorClass<T, TReturn, TNext>[] = ids.map((id) => {
140140
queue.open(id)
141-
replicated.push(new AsyncIteratorClass(
142-
() => {
141+
142+
return new AsyncIteratorClass(
143+
async () => {
143144
start()
144145

145-
return new Promise((resolve, reject) => {
146-
if (!error || queue.hasBufferedItems(id)) {
147-
queue.pull(id)
148-
.then(resolve)
149-
.catch(reject)
150-
}
151-
else {
152-
reject(error.value)
153-
}
154-
})
146+
const item = await queue.pull(id)
147+
148+
if (item.next) {
149+
return item.next
150+
}
151+
152+
throw item.error
155153
},
156154
async (reason) => {
157155
queue.close({ id })
158156

159-
if (reason !== 'next') {
160-
if (!queue.length) {
161-
await source?.return?.()
162-
}
157+
if (reason !== 'next' && !queue.length && !isSourceFinished) {
158+
isSourceFinished = true
159+
await source?.return?.()
163160
}
164161
},
165-
))
166-
}
162+
)
163+
})
167164

168165
return replicated
169166
}

0 commit comments

Comments
 (0)