Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 130 additions & 0 deletions doc/api/diagnostics_channel.md
Original file line number Diff line number Diff line change
Expand Up @@ -978,6 +978,136 @@ channels.asyncStart.bindStore(myStore, (data) => {
});
```

#### `tracingChannel.traceIterator(fn[, context[, thisArg[, ...args]]])`

<!-- YAML
added: REPLACEME
-->

* `fn` {Function} Iterator or async iterator returning function to wrap a trace
around
* `context` {Object} Shared object to correlate trace events through
* `thisArg` {any} The receiver to be used for the function call
* `...args` {any} Optional arguments to pass to the function
* Returns: {Iterator|AsyncIterator|Promise} The iterator returned by the given
function, or a {Promise} resolving to it if the function is async

Trace an iterator-returning function call. This will always produce a
[`start` event][] and [`end` event][] around the synchronous portion of the
function execution. If the given function returns a promise (i.e. is an async
function), it will additionally produce an [`asyncStart` event][] and
[`asyncEnd` event][] when the promise resolves to the iterator.

Each call to `next()`, `return()`, or `throw()` on the returned iterator is
also traced via a sub-channel derived from the tracing channel name by appending
`:next`. For example, if the tracing channel is named `my-channel`, the
sub-channel will be `my-channel:next`. These calls follow the same event
pattern as the outer function call: [`start` event][] and [`end` event][] for
synchronous results, plus [`asyncStart` event][] and [`asyncEnd` event][] if
the method returns a promise (e.g. when iterating an async iterator). An
[`error` event][] is produced if `next()` throws or the iterator method rejects.

To ensure only correct trace graphs are formed, events will only be published
if subscribers are present prior to starting the trace. Subscriptions which are
added after the trace begins will not receive future events from that trace,
only future traces will be seen.

```mjs
import diagnostics_channel from 'node:diagnostics_channel';

const channels = diagnostics_channel.tracingChannel('my-channel');

// Sync function returning a sync iterator.
// Fires start/end on 'my-channel'; fires start/end on 'my-channel:next'
// for each next() call.
for (const value of channels.traceIterator(function*() {
yield 1;
yield 2;
}, { some: 'thing' })) {
// consume values
}

// Sync call to an async generator function, returning an AsyncIterator.
// Fires start/end on 'my-channel'; fires start/end/asyncStart/asyncEnd on
// 'my-channel:next' for each next() call because next() returns a Promise.
for await (const value of channels.traceIterator(async function*() {
yield 1;
yield 2;
}, { some: 'thing' })) {
// consume values
}

// Async function returning a sync iterator.
// Fires start/end/asyncStart/asyncEnd on 'my-channel' when the Promise
// resolves; fires start/end on 'my-channel:next' for each next() call.
const iter = await channels.traceIterator(async function() {
return [1, 2].values();
}, { some: 'thing' });
for (const value of iter) {
// consume values
}

// Async function returning an async iterator.
// Fires start/end/asyncStart/asyncEnd on 'my-channel' when the Promise
// resolves; fires start/end/asyncStart/asyncEnd on 'my-channel:next' for each
// next() call.
const asyncIter = await channels.traceIterator(async function() {
return (async function*() { yield 1; yield 2; })();
}, { some: 'thing' });
for await (const value of asyncIter) {
// consume values
}
```

```cjs
const diagnostics_channel = require('node:diagnostics_channel');

const channels = diagnostics_channel.tracingChannel('my-channel');

// Sync function returning a sync iterator.
// Fires start/end on 'my-channel'; fires start/end on 'my-channel:next'
// for each next() call.
for (const value of channels.traceIterator(function*() {
yield 1;
yield 2;
}, { some: 'thing' })) {
// consume values
}

(async () => {
// Sync call to an async generator function, returning an AsyncIterator.
// Fires start/end on 'my-channel'; fires start/end/asyncStart/asyncEnd on
// 'my-channel:next' for each next() call because next() returns a Promise.
for await (const value of channels.traceIterator(async function*() {
yield 1;
yield 2;
}, { some: 'thing' })) {
// consume values
}

// Async function returning a sync iterator.
// Fires start/end/asyncStart/asyncEnd on 'my-channel' when the Promise
// resolves; fires start/end on 'my-channel:next' for each next() call.
const iter = await channels.traceIterator(async function() {
return [1, 2].values();
}, { some: 'thing' });
for (const value of iter) {
// consume values
}

// Async function returning an async iterator.
// Fires start/end/asyncStart/asyncEnd on 'my-channel' when the Promise
// resolves; fires start/end/asyncStart/asyncEnd on 'my-channel:next' for
// each next() call.
const asyncIter = await channels.traceIterator(async function() {
return (async function*() { yield 1; yield 2; })();
}, { some: 'thing' });
for await (const value of asyncIter) {
// consume values
}
})();
```

#### `tracingChannel.hasSubscribers`

<!-- YAML
Expand Down
80 changes: 80 additions & 0 deletions lib/diagnostics_channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,8 @@ function tracingChannelFrom(nameOrChannels, name) {
}

class TracingChannel {
#nextChannel;

constructor(nameOrChannels) {
for (let i = 0; i < traceEvents.length; ++i) {
const eventName = traceEvents[i];
Expand Down Expand Up @@ -428,6 +430,84 @@ class TracingChannel {
}
});
}

traceIterator(fn, context = {}, thisArg, ...args) {
if (!this.hasSubscribers) {
return ReflectApply(fn, thisArg, args);
}

const { start, end, asyncStart, asyncEnd, error } = this;

const nextChannel = this.#nextChannel ||= tracingChannel({
start: channel(start.name.slice(0, -6) + ':next:start'),
end: channel(end.name.slice(0, -4) + ':next:end'),
asyncStart: channel(asyncStart.name.slice(0, -11) + ':next:asyncStart'),
asyncEnd: channel(asyncEnd.name.slice(0, -9) + ':next:asyncEnd'),
error: channel(error.name.slice(0, -6) + ':next:error'),
});
Comment on lines +441 to +447
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a huge fan of magically deriving these from the main channels. Could we perhaps implement this traceIterator functionality as an entirely other class which we could simply construct with two separate TracingChannel instances for the overall execution and the per-yield execution?

I've been thinking we might want to split up TracingChannel into specializations anyway, something like: SyncTracingChannel, CallbackTracingChannel, PromiseTracingChannel, and a new IteratorTracingChannel?

Copy link
Contributor Author

@rochdev rochdev Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a huge fan of magically deriving these from the main channels. Could we perhaps implement this traceIterator functionality as an entirely other class which we could simply construct with two separate TracingChannel instances for the overall execution and the per-yield execution?

I agree and I'm also not a fan, but it felt like the simplest approach.

Are you saying to do instead something like this?

const ctx = {}
const iter = traceSync(fn, ctx)
return traceIterator(iter, ctx)

I thought about that, but I wasn't sure since it would be the first time that TracingChannel instruments anything that is not a function (I mean it still patches the functions on the iterator, but that's more magic than usual). It also makes it impossible to reimplement differently later, although in practice I'm not sure that would happen. And it would also potentially be inconsistent across implementors. One could use :yield, another could use :next, etc, and thenTracingChannel would lose the consistency it was built for, and subscribers could not be built generically.

Worth noting: I'm adding the same functionality to Orchestrion-JS right now and it's unclear whether the FunctionType should be Iterator or not. If this is split, then I guess it would probably be a separate IteratorType?

I've been thinking we might want to split up TracingChannel into specializations anyway, something like: SyncTracingChannel, CallbackTracingChannel, PromiseTracingChannel, and a new IteratorTracingChannel?

I prefer the one class because we often have to mix functionalities or use the underlying channels directly for example when functions accept a callback but can also return a promise or a sync value.


const wrapIter = (iter) => {
const { next: iterNext, return: iterReturn, throw: iterThrow } = iter;

iter.next = (...args) =>
nextChannel.#traceMaybePromise(iterNext, context, iter, ...args);
iter.return = (...args) =>
nextChannel.#traceMaybePromise(iterReturn, context, iter, ...args);
iter.throw = (...args) =>
nextChannel.#traceMaybePromise(iterThrow, context, iter, ...args);

return iter;
};

const result = this.#traceMaybePromise(fn, context, thisArg, ...args);

return result instanceof Promise ?
PromisePrototypeThen(result, wrapIter) :
wrapIter(result);
}

#traceMaybePromise(fn, context = {}, thisArg, ...args) {
if (!this.hasSubscribers) {
return ReflectApply(fn, thisArg, args);
}

const { start, end, asyncStart, asyncEnd, error } = this;

function reject(err) {
context.error = err;
error.publish(context);
asyncStart.publish(context);
// TODO: Is there a way to have asyncEnd _after_ the continuation?
asyncEnd.publish(context);
return PromiseReject(err);
}

function resolve(result) {
context.result = result;
asyncStart.publish(context);
// TODO: Is there a way to have asyncEnd _after_ the continuation?
asyncEnd.publish(context);
return result;
}

return start.runStores(context, () => {
try {
const result = ReflectApply(fn, thisArg, args);
// TODO: Should tracePromise just always do this?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, because thenables. That's why it does the PromiseResolve(...) currently, though that isn't actually quite right either. What we should be doing is using promise.then(resolve, reject) after rather than PromisePrototypeThen(result, resolve, reject) as that assumes it's a native promise, thus the PromiseResolve(...) to convert it into one if it is not one already.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about functions that can return both though? Sometimes the caller handles that, and changing the return type may be unexpected. In some cases we end up having to manually do a traceSync and handle the promise. Relying on duck-typing might be more flexible no? It just feels weird to have this TracingChannel in Node when in at least half of cases we need to do everything manually because it's too rigid. I guess an alternative would be to keep the individual specialized ones and add a traceAny or similar that handles all cases automatically in a less strict way. It could even have an optional position parameter so that it can also do callbacks (so many cases where functions can be all 3 at the same time).

if (!(result instanceof Promise)) {
context.result = result;
return result;
}
return PromisePrototypeThen(result, resolve, reject);
} catch (err) {
context.error = err;
error.publish(context);
throw err;
} finally {
end.publish(context);
}
});
}
}

function tracingChannel(nameOrChannels) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
'use strict';

const common = require('../common');
const dc = require('diagnostics_channel');
const assert = require('assert');

const channel = dc.tracingChannel('test');
const nextChannel = dc.tracingChannel('test:next');

const expectedResult = { foo: 'bar' };
const input = { foo: 'bar' };
const thisArg = { baz: 'buz' };

function check(found) {
assert.deepStrictEqual(found, input);
}

function checkNextAsync(found) {
check(found);
assert.strictEqual(found.error, undefined);
assert.deepStrictEqual(found.result, { value: expectedResult, done: false });
}

// Async function* returns an AsyncGenerator synchronously, so no asyncStart/asyncEnd
// for the fn call itself
const handlers = {
start: common.mustCall(check),
end: common.mustCall(check),
asyncStart: common.mustNotCall(),
asyncEnd: common.mustNotCall(),
error: common.mustNotCall(),
};

// next() on an AsyncGenerator returns a Promise
const nextHandlers = {
start: common.mustCall(check),
end: common.mustCall(check),
asyncStart: common.mustCall(checkNextAsync),
asyncEnd: common.mustCall(checkNextAsync),
error: common.mustNotCall(),
};

channel.subscribe(handlers);
nextChannel.subscribe(nextHandlers);

const iter = channel.traceIterator(common.mustCall(async function*(value) {
assert.deepStrictEqual(this, thisArg);
yield value;
}), input, thisArg, expectedResult);

// next() returns a Promise since iter is an AsyncGenerator
iter.next().then(common.mustCall((result) => {
assert.deepStrictEqual(result, { value: expectedResult, done: false });
}));
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
'use strict';

const common = require('../common');
const dc = require('diagnostics_channel');

const channel = dc.tracingChannel('test');
const nextChannel = dc.tracingChannel('test:next');

const handlers = {
start: common.mustNotCall(),
end: common.mustNotCall(),
asyncStart: common.mustNotCall(),
asyncEnd: common.mustNotCall(),
error: common.mustNotCall(),
};

const nextHandlers = {
start: common.mustNotCall(),
end: common.mustNotCall(),
asyncStart: common.mustNotCall(),
asyncEnd: common.mustNotCall(),
error: common.mustNotCall(),
};

// Subscribe after traceIterator call - no events should fire for the iterator
// or for subsequent next() calls since the iterator was not wrapped
const iter = channel.traceIterator(function*() {
yield 1;
}, {});

channel.subscribe(handlers);
nextChannel.subscribe(nextHandlers);

iter.next();
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
'use strict';

const common = require('../common');
const dc = require('diagnostics_channel');
const assert = require('assert');

const channel = dc.tracingChannel('test');
const nextChannel = dc.tracingChannel('test:next');

const expectedError = new Error('test');
const input = { foo: 'bar' };
const thisArg = { baz: 'buz' };

function check(found) {
assert.deepStrictEqual(found, input);
}

function checkError(found) {
check(found);
assert.deepStrictEqual(found.error, expectedError);
}

// Two traceIterator calls: one for next() error, one for throw() error
const handlers = {
start: common.mustCall(check, 2),
end: common.mustCall(check, 2),
asyncStart: common.mustNotCall(),
asyncEnd: common.mustNotCall(),
error: common.mustNotCall(),
};

// iter1: next() success + next() throws = start×2, end×2, error×1
// iter2: throw() throws = start×1, end×1, error×1
const nextHandlers = {
start: common.mustCall(check, 3),
end: common.mustCall(check, 3),
asyncStart: common.mustNotCall(),
asyncEnd: common.mustNotCall(),
error: common.mustCall(checkError, 2),
};

channel.subscribe(handlers);
nextChannel.subscribe(nextHandlers);

// Test next(): generator throws after the first yield
const iter1 = channel.traceIterator(common.mustCall(function*() {
assert.deepStrictEqual(this, thisArg);
yield 1;
throw expectedError;
}), input, thisArg);

assert.deepStrictEqual(iter1.next(), { value: 1, done: false });
assert.throws(() => iter1.next(), expectedError);

// Test throw(): propagates error through the iterator
const iter2 = channel.traceIterator(common.mustCall(function*() {
yield 1;
}), input, thisArg);

assert.throws(() => iter2.throw(expectedError), expectedError);
Loading
Loading