diff --git a/apps/server/src/diagnostics/ProcessDiagnostics.test.ts b/apps/server/src/diagnostics/ProcessDiagnostics.test.ts index 18a54326de..f8fbf79d32 100644 --- a/apps/server/src/diagnostics/ProcessDiagnostics.test.ts +++ b/apps/server/src/diagnostics/ProcessDiagnostics.test.ts @@ -1,8 +1,9 @@ -import { describe, expect, it } from "@effect/vitest"; +import { assert, describe, it } from "@effect/vitest"; import * as DateTime from "effect/DateTime"; import * as Effect from "effect/Effect"; import * as Layer from "effect/Layer"; import * as Option from "effect/Option"; +import * as Schema from "effect/Schema"; import * as Sink from "effect/Sink"; import * as Stream from "effect/Stream"; import { ChildProcessSpawner } from "effect/unstable/process"; @@ -10,6 +11,7 @@ import { ChildProcessSpawner } from "effect/unstable/process"; import * as ProcessDiagnostics from "./ProcessDiagnostics.ts"; const encoder = new TextEncoder(); +const encodeJsonString = Schema.encodeUnknownSync(Schema.UnknownFromJsonString); function mockHandle(result: { readonly stdout?: string; @@ -41,7 +43,7 @@ describe("ProcessDiagnostics", () => { ].join("\n"), ); - expect(rows).toEqual([ + assert.deepEqual(rows, [ { pid: 10, ppid: 1, @@ -125,15 +127,21 @@ describe("ProcessDiagnostics", () => { ], }); - expect(diagnostics.serverPid).toBe(100); - expect(DateTime.formatIso(diagnostics.readAt)).toBe("2026-05-05T10:00:00.000Z"); - expect(diagnostics.processCount).toBe(2); - expect(diagnostics.totalRssBytes).toBe(6_000); - expect(diagnostics.totalCpuPercent).toBe(4.75); - expect(diagnostics.processes.map((process) => process.pid)).toEqual([101, 102]); - expect(diagnostics.processes.map((process) => process.depth)).toEqual([0, 1]); - expect(Option.getOrNull(diagnostics.processes[0]!.pgid)).toBe(100); - expect(diagnostics.processes[0]?.childPids).toEqual([102]); + assert.equal(diagnostics.serverPid, 100); + assert.equal(DateTime.formatIso(diagnostics.readAt), "2026-05-05T10:00:00.000Z"); + assert.equal(diagnostics.processCount, 2); + assert.equal(diagnostics.totalRssBytes, 6_000); + assert.equal(diagnostics.totalCpuPercent, 4.75); + assert.deepEqual( + diagnostics.processes.map((process) => process.pid), + [101, 102], + ); + assert.deepEqual( + diagnostics.processes.map((process) => process.depth), + [0, 1], + ); + assert.equal(Option.getOrNull(diagnostics.processes[0]!.pgid), 100); + assert.deepEqual(diagnostics.processes[0]?.childPids, [102]); }), ); @@ -176,7 +184,10 @@ describe("ProcessDiagnostics", () => { ], }); - expect(diagnostics.processes.map((process) => process.pid)).toEqual([101, 102, 103]); + assert.deepEqual( + diagnostics.processes.map((process) => process.pid), + [101, 102, 103], + ); }), ); @@ -184,9 +195,8 @@ describe("ProcessDiagnostics", () => { Effect.gen(function* () { const commands: Array<{ readonly command: string; readonly args: ReadonlyArray }> = []; - const spawnerLayer = Layer.succeed( - ChildProcessSpawner.ChildProcessSpawner, - ChildProcessSpawner.make((command) => { + const spawnerLayer = Layer.mock(ChildProcessSpawner.ChildProcessSpawner, { + spawn: (command) => { const childProcess = command as unknown as { readonly command: string; readonly args: ReadonlyArray; @@ -200,8 +210,8 @@ describe("ProcessDiagnostics", () => { ].join("\n"), }), ); - }), - ); + }, + }); const layer = ProcessDiagnostics.layer.pipe(Layer.provide(spawnerLayer)); const diagnostics = yield* Effect.service(ProcessDiagnostics.ProcessDiagnostics).pipe( @@ -209,8 +219,11 @@ describe("ProcessDiagnostics", () => { Effect.provide(layer), ); - expect(diagnostics.processes.map((process) => process.pid)).toEqual([4242]); - expect(commands).toEqual([ + assert.deepEqual( + diagnostics.processes.map((process) => process.pid), + [4242], + ); + assert.deepEqual(commands, [ { command: "ps", args: ["-axo", "pid=,ppid=,pgid=,stat=,pcpu=,rss=,etime=,command="], @@ -219,11 +232,64 @@ describe("ProcessDiagnostics", () => { }), ); + it.effect("decodes Windows process rows through Schema JSON parsing", () => + Effect.gen(function* () { + const commands: Array<{ readonly command: string; readonly args: ReadonlyArray }> = + []; + const spawnerLayer = Layer.mock(ChildProcessSpawner.ChildProcessSpawner, { + spawn: (command) => { + const childProcess = command as unknown as { + readonly command: string; + readonly args: ReadonlyArray; + }; + commands.push({ command: childProcess.command, args: childProcess.args }); + return Effect.succeed( + mockHandle({ + stdout: encodeJsonString([ + { + ProcessId: 4242, + ParentProcessId: 100, + Name: "node.exe", + CommandLine: "node server.js", + Status: "Running", + WorkingSetSize: 2048, + PercentProcessorTime: 12.5, + }, + { + ProcessId: 0, + ParentProcessId: 100, + Name: "invalid.exe", + }, + ]), + }), + ); + }, + }); + + const rows = yield* ProcessDiagnostics.readProcessRows("win32").pipe( + Effect.provide(spawnerLayer), + ); + + assert.deepEqual(rows, [ + { + pid: 4242, + ppid: 100, + pgid: null, + status: "Running", + cpuPercent: 12.5, + rssBytes: 2048, + elapsed: "", + command: "node server.js", + }, + ]); + assert.equal(commands[0]?.command, "powershell.exe"); + }), + ); + it.effect("does not allow signaling the diagnostics query process", () => Effect.gen(function* () { - const spawnerLayer = Layer.succeed( - ChildProcessSpawner.ChildProcessSpawner, - ChildProcessSpawner.make(() => + const spawnerLayer = Layer.mock(ChildProcessSpawner.ChildProcessSpawner, { + spawn: () => Effect.succeed( mockHandle({ stdout: [ @@ -232,8 +298,7 @@ describe("ProcessDiagnostics", () => { ].join("\n"), }), ), - ), - ); + }); const layer = ProcessDiagnostics.layer.pipe(Layer.provide(spawnerLayer)); const result = yield* Effect.service(ProcessDiagnostics.ProcessDiagnostics).pipe( @@ -241,7 +306,7 @@ describe("ProcessDiagnostics", () => { Effect.provide(layer), ); - expect(result).toEqual({ + assert.deepEqual(result, { pid: 4242, signal: "SIGINT", signaled: false, diff --git a/apps/server/src/diagnostics/ProcessDiagnostics.ts b/apps/server/src/diagnostics/ProcessDiagnostics.ts index f56bf21651..399645fb15 100644 --- a/apps/server/src/diagnostics/ProcessDiagnostics.ts +++ b/apps/server/src/diagnostics/ProcessDiagnostics.ts @@ -26,7 +26,7 @@ export interface ProcessRow { readonly command: string; } -const PROCESS_QUERY_TIMEOUT_MS = 1_000; +const PROCESS_QUERY_TIMEOUT = Duration.seconds(1); const POSIX_PROCESS_QUERY_COMMAND = "pid=,ppid=,pgid=,stat=,pcpu=,rss=,etime=,command="; const PROCESS_QUERY_MAX_OUTPUT_BYTES = 2 * 1024 * 1024; @@ -43,17 +43,73 @@ export class ProcessDiagnostics extends Context.Service< ProcessDiagnosticsShape >()("t3/diagnostics/ProcessDiagnostics") {} -class ProcessDiagnosticsError extends Schema.TaggedErrorClass()( - "ProcessDiagnosticsError", +class ProcessQueryError extends Schema.TaggedErrorClass()("ProcessQueryError", { + message: Schema.String, + cause: Schema.optional(Schema.Defect), +}) {} + +class ProcessQueryTimeoutError extends Schema.TaggedErrorClass()( + "ProcessQueryTimeoutError", + { + message: Schema.String, + timeoutMillis: Schema.Number, + }, +) {} + +class ProcessCommandExitError extends Schema.TaggedErrorClass()( + "ProcessCommandExitError", { message: Schema.String, + command: Schema.String, + exitCode: Schema.Number, + stderr: Schema.String, + }, +) {} + +class ProcessSignalRejectedError extends Schema.TaggedErrorClass()( + "ProcessSignalRejectedError", + { + message: Schema.String, + pid: Schema.Number, + reason: Schema.String, + }, +) {} + +class ProcessSignalFailedError extends Schema.TaggedErrorClass()( + "ProcessSignalFailedError", + { + message: Schema.String, + pid: Schema.Number, + signal: Schema.Literals(["SIGINT", "SIGKILL"]), cause: Schema.optional(Schema.Defect), }, ) {} -const isProcessDiagnosticsError = Schema.is(ProcessDiagnosticsError); -function toProcessDiagnosticsError(message: string, cause?: unknown): ProcessDiagnosticsError { - return new ProcessDiagnosticsError({ +type ProcessDiagnosticsError = + | ProcessQueryError + | ProcessQueryTimeoutError + | ProcessCommandExitError + | ProcessSignalRejectedError + | ProcessSignalFailedError; + +const isProcessQueryError = Schema.is(ProcessQueryError); +const isProcessQueryTimeoutError = Schema.is(ProcessQueryTimeoutError); +const isProcessCommandExitError = Schema.is(ProcessCommandExitError); +const isProcessSignalRejectedError = Schema.is(ProcessSignalRejectedError); +const isProcessSignalFailedError = Schema.is(ProcessSignalFailedError); + +function isProcessDiagnosticsError(error: unknown): error is ProcessDiagnosticsError { + return ( + isProcessQueryError(error) || + isProcessQueryTimeoutError(error) || + isProcessCommandExitError(error) || + isProcessSignalRejectedError(error) || + isProcessSignalFailedError(error) + ); +} + +function makeProcessQueryError(message: string, cause?: unknown): ProcessQueryError { + return new ProcessQueryError({ message, ...(cause === undefined ? {} : { cause }), }); @@ -74,6 +130,22 @@ function parseNumber(value: string): number | null { return Number.isFinite(parsed) ? parsed : null; } +const WindowsProcessRowSchema = Schema.Struct({ + ProcessId: Schema.optional(Schema.Number), + ParentProcessId: Schema.optional(Schema.Number), + CommandLine: Schema.optional(Schema.NullOr(Schema.String)), + Name: Schema.optional(Schema.NullOr(Schema.String)), + Status: Schema.optional(Schema.NullOr(Schema.String)), + WorkingSetSize: Schema.optional(Schema.Number), + PercentProcessorTime: Schema.optional(Schema.Number), +}); +type WindowsProcessRow = typeof WindowsProcessRowSchema.Type; + +const WindowsProcessRowsJsonSchema = Schema.fromJsonString( + Schema.Union([Schema.Array(WindowsProcessRowSchema), WindowsProcessRowSchema]), +); +const decodeWindowsProcessRowsJsonOption = Schema.decodeUnknownOption(WindowsProcessRowsJsonSchema); + export function parsePosixProcessRows(output: string): ReadonlyArray { const rows: ProcessRow[] = []; const rowPattern = @@ -139,9 +211,7 @@ export function parsePosixProcessRows(output: string): ReadonlyArray return rows; } -function normalizeWindowsProcessRow(value: unknown): ProcessRow | null { - if (typeof value !== "object" || value === null) return null; - const record = value as Record; +function normalizeWindowsProcessRow(record: WindowsProcessRow): ProcessRow | null { const pid = typeof record.ProcessId === "number" ? record.ProcessId : null; const ppid = typeof record.ParentProcessId === "number" ? record.ParentProcessId : null; const commandLine = @@ -174,16 +244,15 @@ function normalizeWindowsProcessRow(value: unknown): ProcessRow | null { function parseWindowsProcessRows(output: string): ReadonlyArray { if (output.trim().length === 0) return []; - try { - const parsed = JSON.parse(output) as unknown; - const records = Array.isArray(parsed) ? parsed : [parsed]; - return records.flatMap((record) => { - const row = normalizeWindowsProcessRow(record); - return row ? [row] : []; - }); - } catch { - return []; - } + + const decoded = decodeWindowsProcessRowsJsonOption(output); + if (Option.isNone(decoded)) return []; + + const records = Array.isArray(decoded.value) ? decoded.value : [decoded.value]; + return records.flatMap((record) => { + const row = normalizeWindowsProcessRow(record); + return row ? [row] : []; + }); } export function buildDescendantEntries( @@ -309,17 +378,21 @@ const runProcess = Effect.fn("runProcess")( (effect, input) => effect.pipe( Effect.scoped, - Effect.timeoutOption(Duration.millis(PROCESS_QUERY_TIMEOUT_MS)), + Effect.timeoutOption(PROCESS_QUERY_TIMEOUT), Effect.flatMap((result) => Option.match(result, { - onNone: () => Effect.fail(toProcessDiagnosticsError(`${input.errorMessage} timed out.`)), + onNone: () => + Effect.fail( + new ProcessQueryTimeoutError({ + message: `${input.errorMessage} timed out.`, + timeoutMillis: Duration.toMillis(PROCESS_QUERY_TIMEOUT), + }), + ), onSome: Effect.succeed, }), ), Effect.mapError((cause) => - isProcessDiagnosticsError(cause) - ? cause - : toProcessDiagnosticsError(input.errorMessage, cause), + isProcessDiagnosticsError(cause) ? cause : makeProcessQueryError(input.errorMessage, cause), ), ), ); @@ -336,7 +409,14 @@ function readPosixProcessRows(): Effect.Effect< }).pipe( Effect.flatMap((result) => result.exitCode !== 0 - ? Effect.fail(toProcessDiagnosticsError(result.stderr.trim() || "ps failed.")) + ? Effect.fail( + new ProcessCommandExitError({ + message: result.stderr.trim() || "ps failed.", + command: "ps", + exitCode: result.exitCode, + stderr: result.stderr, + }), + ) : Effect.succeed(parsePosixProcessRows(result.stdout)), ), ); @@ -363,7 +443,12 @@ function readWindowsProcessRows(): Effect.Effect< Effect.flatMap((result) => result.exitCode !== 0 ? Effect.fail( - toProcessDiagnosticsError(result.stderr.trim() || "PowerShell process query failed."), + new ProcessCommandExitError({ + message: result.stderr.trim() || "PowerShell process query failed.", + command: "powershell.exe", + exitCode: result.exitCode, + stderr: result.stderr, + }), ) : Effect.succeed(parseWindowsProcessRows(result.stdout)), ), @@ -385,7 +470,13 @@ function assertDescendantPid( pid: number, ): Effect.Effect { if (pid === process.pid) { - return Effect.fail(toProcessDiagnosticsError("Refusing to signal the T3 server process.")); + return Effect.fail( + new ProcessSignalRejectedError({ + message: "Refusing to signal the T3 server process.", + pid, + reason: "self", + }), + ); } return readProcessRows().pipe( @@ -397,7 +488,11 @@ function assertDescendantPid( return descendant ? Effect.void : Effect.fail( - toProcessDiagnosticsError(`Process ${pid} is not a live descendant of the T3 server.`), + new ProcessSignalRejectedError({ + message: `Process ${pid} is not a live descendant of the T3 server.`, + pid, + reason: "not-descendant", + }), ); }), ); @@ -438,10 +533,12 @@ export const make = Effect.fn("makeProcessDiagnostics")(function* () { }; }, catch: (cause) => - toProcessDiagnosticsError( - `Failed to signal process ${input.pid} with ${input.signal}.`, + new ProcessSignalFailedError({ + message: `Failed to signal process ${input.pid} with ${input.signal}.`, + pid: input.pid, + signal: input.signal, cause, - ), + }), }), ), Effect.catch((error: ProcessDiagnosticsError) => diff --git a/apps/server/src/diagnostics/TraceDiagnostics.test.ts b/apps/server/src/diagnostics/TraceDiagnostics.test.ts index d4ffa4a5fc..4c83c3a37a 100644 --- a/apps/server/src/diagnostics/TraceDiagnostics.test.ts +++ b/apps/server/src/diagnostics/TraceDiagnostics.test.ts @@ -5,9 +5,12 @@ import * as FileSystem from "effect/FileSystem"; import * as Layer from "effect/Layer"; import * as Option from "effect/Option"; import * as PlatformError from "effect/PlatformError"; +import * as Schema from "effect/Schema"; import * as TraceDiagnostics from "./TraceDiagnostics.ts"; +const encodeTraceRecordJson = Schema.encodeUnknownSync(Schema.UnknownFromJsonString); + function ns(ms: number): string { return String(BigInt(ms) * 1_000_000n); } @@ -21,7 +24,7 @@ function record(input: { readonly exit?: { readonly _tag: "Success" | "Failure" | "Interrupted"; readonly cause?: string }; readonly events?: ReadonlyArray; }) { - return JSON.stringify({ + return encodeTraceRecordJson({ type: "effect-span", name: input.name, traceId: input.traceId, diff --git a/apps/server/src/diagnostics/TraceDiagnostics.ts b/apps/server/src/diagnostics/TraceDiagnostics.ts index ff63410b9b..4ec3306421 100644 --- a/apps/server/src/diagnostics/TraceDiagnostics.ts +++ b/apps/server/src/diagnostics/TraceDiagnostics.ts @@ -14,23 +14,7 @@ import * as FileSystem from "effect/FileSystem"; import * as Layer from "effect/Layer"; import * as Option from "effect/Option"; import * as PlatformError from "effect/PlatformError"; - -interface TraceRecordLike { - readonly name?: unknown; - readonly traceId?: unknown; - readonly spanId?: unknown; - readonly startTimeUnixNano?: unknown; - readonly endTimeUnixNano?: unknown; - readonly durationMs?: unknown; - readonly exit?: unknown; - readonly events?: unknown; -} - -interface TraceEventLike { - readonly name?: unknown; - readonly timeUnixNano?: unknown; - readonly attributes?: unknown; -} +import * as Schema from "effect/Schema"; export interface TraceDiagnosticsOptions { readonly traceFilePath: string; @@ -65,6 +49,35 @@ interface TraceDiagnosticsErrorSummary { const DEFAULT_SLOW_SPAN_THRESHOLD_MS = 1_000; const TOP_LIMIT = 10; const RECENT_LIMIT = 20; + +const TraceExitSchema = Schema.Struct({ + _tag: Schema.optional(Schema.Unknown), + cause: Schema.optional(Schema.Unknown), +}); +type TraceExit = typeof TraceExitSchema.Type; +const decodeTraceExitOption = Schema.decodeUnknownOption(TraceExitSchema); + +const TraceEventSchema = Schema.Struct({ + name: Schema.optional(Schema.Unknown), + timeUnixNano: Schema.optional(Schema.Unknown), + attributes: Schema.optional(Schema.Record(Schema.String, Schema.Unknown)), +}); +const decodeTraceEventOption = Schema.decodeUnknownOption(TraceEventSchema); + +const TraceRecordJsonSchema = Schema.fromJsonString( + Schema.Struct({ + name: Schema.optional(Schema.Unknown), + traceId: Schema.optional(Schema.Unknown), + spanId: Schema.optional(Schema.Unknown), + startTimeUnixNano: Schema.optional(Schema.Unknown), + endTimeUnixNano: Schema.optional(Schema.Unknown), + durationMs: Schema.optional(Schema.Unknown), + exit: Schema.optional(Schema.Unknown), + events: Schema.optional(Schema.Array(Schema.Unknown)), + }), +); +const decodeTraceRecordJsonOption = Schema.decodeUnknownOption(TraceRecordJsonSchema); + function toRotatedTracePaths(traceFilePath: string, maxFiles: number): ReadonlyArray { const backupCount = Math.max(0, Math.floor(maxFiles)); const backups = Array.from( @@ -74,47 +87,55 @@ function toRotatedTracePaths(traceFilePath: string, maxFiles: number): ReadonlyA return [...backups, traceFilePath]; } -function isRecordObject(value: unknown): value is TraceRecordLike { - return typeof value === "object" && value !== null; +function toStringOption(value: unknown): Option.Option { + return typeof value === "string" && value.trim().length > 0 ? Option.some(value) : Option.none(); } -function toStringValue(value: unknown): string | null { - return typeof value === "string" && value.trim().length > 0 ? value : null; +function toNumberOption(value: unknown): Option.Option { + return typeof value === "number" && Number.isFinite(value) ? Option.some(value) : Option.none(); } -function toNumberValue(value: unknown): number | null { - return typeof value === "number" && Number.isFinite(value) ? value : null; -} - -function unixNanoToDateTime(value: unknown): DateTime.Utc | null { - const text = toStringValue(value); - if (!text) return null; +function unixNanoToDateTime(value: unknown): Option.Option { + const text = toStringOption(value); + if (Option.isNone(text)) return Option.none(); try { - const millis = Number(BigInt(text) / 1_000_000n); - return Option.getOrNull(DateTime.make(millis)); + const millis = Number(BigInt(text.value) / 1_000_000n); + return DateTime.make(millis); } catch { - return null; + return Option.none(); } } -function readExitTag(exit: unknown): string | null { - if (!isRecordObject(exit) || !("_tag" in exit)) return null; - return toStringValue(exit._tag); +function decodeTraceExit(exit: unknown): Option.Option { + return decodeTraceExitOption(exit); +} + +function readExitTag(exit: unknown): Option.Option { + return Option.flatMap(decodeTraceExit(exit), (parsedExit) => toStringOption(parsedExit._tag)); } function readExitCause(exit: unknown): string { - if (!isRecordObject(exit) || !("cause" in exit)) return "Failure"; - return toStringValue(exit.cause)?.trim() ?? "Failure"; + return Option.getOrElse( + Option.flatMap(decodeTraceExit(exit), (parsedExit) => toStringOption(parsedExit.cause)), + () => "Failure", + ).trim(); } -function isTraceEvent(value: unknown): value is TraceEventLike { - return typeof value === "object" && value !== null; +function minDateTimeOption( + current: Option.Option, + candidate: Option.Option, +): Option.Option { + if (Option.isNone(candidate)) return current; + if (Option.isNone(current)) return candidate; + return DateTime.isLessThan(candidate.value, current.value) ? candidate : current; } -function readEventAttributes(event: TraceEventLike): Readonly> { - return typeof event.attributes === "object" && event.attributes !== null - ? (event.attributes as Readonly>) - : {}; +function maxDateTime( + current: Option.Option, + candidate: DateTime.Utc, +): Option.Option { + if (Option.isNone(current)) return Option.some(candidate); + return DateTime.isGreaterThan(candidate, current.value) ? Option.some(candidate) : current; } function makeEmptyDiagnostics(input: { @@ -199,8 +220,8 @@ export function aggregateTraceDiagnostics( let failureCount = 0; let interruptionCount = 0; let slowSpanCount = 0; - let firstSpanAt: DateTime.Utc | null = null; - let lastSpanAt: DateTime.Utc | null = null; + let firstSpanAt = Option.none(); + let lastSpanAt = Option.none(); const spansByName = new Map< string, @@ -217,42 +238,42 @@ export function aggregateTraceDiagnostics( for (const line of lines) { if (line.trim().length === 0) continue; - let parsed: unknown; - try { - parsed = JSON.parse(line); - } catch { - parseErrorCount += 1; - continue; - } - - if (!isRecordObject(parsed)) { + const decodedRecord = decodeTraceRecordJsonOption(line); + if (Option.isNone(decodedRecord)) { parseErrorCount += 1; continue; } + const parsed = decodedRecord.value; - const name = toStringValue(parsed.name); - const traceId = toStringValue(parsed.traceId); - const spanId = toStringValue(parsed.spanId); - const durationMs = toNumberValue(parsed.durationMs); + const nameOption = toStringOption(parsed.name); + const traceIdOption = toStringOption(parsed.traceId); + const spanIdOption = toStringOption(parsed.spanId); + const durationMsOption = toNumberOption(parsed.durationMs); const endedAt = unixNanoToDateTime(parsed.endTimeUnixNano); const startedAt = unixNanoToDateTime(parsed.startTimeUnixNano); - if (!name || !traceId || !spanId || durationMs === null || !endedAt) { + if ( + Option.isNone(nameOption) || + Option.isNone(traceIdOption) || + Option.isNone(spanIdOption) || + Option.isNone(durationMsOption) || + Option.isNone(endedAt) + ) { parseErrorCount += 1; continue; } + const name = nameOption.value; + const traceId = traceIdOption.value; + const spanId = spanIdOption.value; + const durationMs = durationMsOption.value; recordCount += 1; - firstSpanAt = - startedAt && (firstSpanAt === null || DateTime.isLessThan(startedAt, firstSpanAt)) - ? startedAt - : firstSpanAt; - lastSpanAt = - lastSpanAt === null || DateTime.isGreaterThan(endedAt, lastSpanAt) ? endedAt : lastSpanAt; + firstSpanAt = minDateTimeOption(firstSpanAt, startedAt); + lastSpanAt = maxDateTime(lastSpanAt, endedAt.value); const exitTag = readExitTag(parsed.exit); - const isFailure = exitTag === "Failure"; - const isInterrupted = exitTag === "Interrupted"; + const isFailure = Option.contains(exitTag, "Failure"); + const isInterrupted = Option.contains(exitTag, "Interrupted"); if (isFailure) failureCount += 1; if (isInterrupted) interruptionCount += 1; @@ -268,7 +289,7 @@ export function aggregateTraceDiagnostics( if (isFailure) spanSummary.failureCount += 1; spansByName.set(name, spanSummary); - const spanItem = { name, durationMs, endedAt, traceId, spanId }; + const spanItem = { name, durationMs, endedAt: endedAt.value, traceId, spanId }; if (durationMs >= slowSpanThresholdMs) { slowSpanCount += 1; } @@ -280,12 +301,13 @@ export function aggregateTraceDiagnostics( const failureKey = `${name}\0${cause}`; const existing = failuresByKey.get(failureKey); - const isLatestFailure = !existing || DateTime.isGreaterThan(endedAt, existing.lastSeenAt); + const isLatestFailure = + !existing || DateTime.isGreaterThan(endedAt.value, existing.lastSeenAt); failuresByKey.set(failureKey, { name, cause, count: (existing?.count ?? 0) + 1, - lastSeenAt: isLatestFailure ? endedAt : existing!.lastSeenAt, + lastSeenAt: isLatestFailure ? endedAt.value : existing!.lastSeenAt, traceId: isLatestFailure ? traceId : existing!.traceId, spanId: isLatestFailure ? spanId : existing!.spanId, }); @@ -293,10 +315,15 @@ export function aggregateTraceDiagnostics( if (Array.isArray(parsed.events)) { for (const rawEvent of parsed.events) { - if (!isTraceEvent(rawEvent)) continue; - const attributes = readEventAttributes(rawEvent); - const level = toStringValue(attributes["effect.logLevel"]); - if (!level) continue; + const event = decodeTraceEventOption(rawEvent); + if (Option.isNone(event)) continue; + const attributes = Option.getOrElse( + Option.fromUndefinedOr(event.value.attributes), + () => ({}) as Readonly>, + ); + const levelOption = toStringOption(attributes["effect.logLevel"]); + if (Option.isNone(levelOption)) continue; + const level = levelOption.value; logLevelCounts[level] = (logLevelCounts[level] ?? 0) + 1; const normalizedLevel = level.toLowerCase(); @@ -309,8 +336,14 @@ export function aggregateTraceDiagnostics( continue; } - const seenAt = unixNanoToDateTime(rawEvent.timeUnixNano) ?? endedAt; - const message = toStringValue(rawEvent.name)?.trim() ?? "Log event"; + const seenAt = Option.getOrElse( + unixNanoToDateTime(event.value.timeUnixNano), + () => endedAt.value, + ); + const message = Option.getOrElse( + Option.map(toStringOption(event.value.name), (name) => name.trim()), + () => "Log event", + ); latestWarningAndErrorLogs.push({ spanName: name, level, @@ -342,8 +375,8 @@ export function aggregateTraceDiagnostics( readAt, recordCount, parseErrorCount, - firstSpanAt: Option.fromNullishOr(firstSpanAt), - lastSpanAt: Option.fromNullishOr(lastSpanAt), + firstSpanAt, + lastSpanAt, failureCount, interruptionCount, slowSpanThresholdMs,