Skip to content
This repository was archived by the owner on Mar 19, 2026. It is now read-only.

Commit 82f659d

Browse files
committed
feat(memory): implement buffer window memory for chat message history management
1 parent 9bdde4e commit 82f659d

5 files changed

Lines changed: 266 additions & 11 deletions

File tree

packages/core/src/generate/stream-text.ts

Lines changed: 209 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@ import {
1111
import {
1212
AssistantMessage,
1313
BaseMessage,
14+
BaseMessageChunk,
15+
createMessageChunk,
1416
UserMessage
1517
} from '../messages/messages.ts'
16-
import { ToolCallPart, ToolResultPart } from '../messages/part.ts'
18+
import { TextPart, ToolCallPart, ToolResultPart } from '../messages/part.ts'
1719
import { BaseTool } from '../tools/index.ts'
1820
import {
1921
AsyncIterableStream,
@@ -56,6 +58,200 @@ function createInitialState(): StreamState {
5658
}
5759
}
5860

61+
/**
62+
* Creates a stream of BaseMessageChunk objects from an input source
63+
* @param source The source stream of text or message parts
64+
* @param role The role of the message chunks (default: 'assistant')
65+
* @returns An AsyncIterableStream of BaseMessageChunk objects
66+
*/
67+
export function streamMessageChunks(
68+
source:
69+
| ReadableStream<TextStreamPart>
70+
| TransformStream<unknown, TextStreamPart>,
71+
role: 'assistant' | 'user' | 'system' | 'tool' = 'assistant'
72+
): AsyncIterableStream<BaseMessageChunk> {
73+
const outputStream = new TransformStream<unknown, BaseMessageChunk>()
74+
const writer = outputStream.writable.getWriter()
75+
76+
// Create a reader for the source stream
77+
const readableStream =
78+
source instanceof TransformStream ? source.readable : source
79+
const reader = readableStream.getReader()
80+
81+
// Process the stream
82+
let accumulatedText = ''
83+
let reasoningText = ''
84+
const currentToolCalls: ToolCallPart[] = []
85+
const currentToolResults: ToolResultPart[] = []
86+
87+
// Process the stream asynchronously
88+
;(async () => {
89+
try {
90+
while (true) {
91+
const { done, value } = await reader.read()
92+
93+
if (done) {
94+
// When the stream is done, emit a final message chunk if there's accumulated content
95+
if (
96+
accumulatedText ||
97+
currentToolCalls.length > 0 ||
98+
currentToolResults.length > 0
99+
) {
100+
const content = createFinalContent(
101+
accumulatedText,
102+
currentToolCalls,
103+
currentToolResults
104+
)
105+
const chunk = createMessageChunk({
106+
role,
107+
content,
108+
metadata: reasoningText
109+
? { reasoning: reasoningText }
110+
: undefined
111+
})
112+
await writer.write(chunk)
113+
}
114+
break
115+
}
116+
117+
// Process different types of stream parts
118+
if (value.type === 'text-delta') {
119+
accumulatedText += value.textDelta
120+
121+
// Create a message chunk with the current text
122+
const chunk = createMessageChunk({
123+
role,
124+
content: accumulatedText,
125+
metadata: reasoningText
126+
? { reasoning: reasoningText }
127+
: undefined
128+
})
129+
130+
await writer.write(chunk)
131+
} else if (value.type === 'reasoning') {
132+
// Accumulate reasoning text
133+
reasoningText += value.textDelta
134+
135+
// Create a message chunk with the current content and updated reasoning
136+
const chunk = createMessageChunk({
137+
role,
138+
content: accumulatedText,
139+
metadata: { reasoning: reasoningText }
140+
})
141+
142+
await writer.write(chunk)
143+
} else if (value.type === 'tool-call') {
144+
// Add a new tool call
145+
const toolCall: ToolCallPart = {
146+
type: 'tool-call',
147+
toolCallId: value.toolCallId,
148+
toolName: value.toolName,
149+
args: JSON.parse(value.args)
150+
}
151+
152+
currentToolCalls.push(toolCall)
153+
154+
// Create a message chunk with the current content
155+
const content = createFinalContent(
156+
accumulatedText,
157+
currentToolCalls,
158+
currentToolResults
159+
)
160+
const chunk = createMessageChunk({
161+
role,
162+
content,
163+
metadata: reasoningText
164+
? { reasoning: reasoningText }
165+
: undefined
166+
})
167+
168+
await writer.write(chunk)
169+
} else if (value.type === 'tool-result') {
170+
// Add a new tool result
171+
const toolResult: ToolResultPart = {
172+
type: 'tool-result',
173+
toolCallId: value.toolCallId,
174+
toolName: '', // This will be filled in by matching with tool calls
175+
result: JSON.parse(value.toolResult)
176+
}
177+
178+
// Try to find matching tool call to get the tool name
179+
const matchingToolCall = currentToolCalls.find(
180+
(tc) => tc.toolCallId === value.toolCallId
181+
)
182+
if (matchingToolCall) {
183+
toolResult.toolName = matchingToolCall.toolName
184+
}
185+
186+
currentToolResults.push(toolResult)
187+
188+
// Create a message chunk with the current content
189+
const content = createFinalContent(
190+
accumulatedText,
191+
currentToolCalls,
192+
currentToolResults
193+
)
194+
const chunk = createMessageChunk({
195+
role,
196+
content,
197+
metadata: reasoningText
198+
? { reasoning: reasoningText }
199+
: undefined
200+
})
201+
202+
await writer.write(chunk)
203+
}
204+
// Other types (source, etc.) are ignored for message chunks
205+
}
206+
207+
await writer.close()
208+
} catch (error) {
209+
await writer.abort(error)
210+
throw error
211+
} finally {
212+
reader.releaseLock()
213+
}
214+
})()
215+
216+
return createAsyncIterableStream(outputStream)
217+
}
218+
219+
/**
220+
* Helper function to create the final content for a message chunk
221+
*/
222+
function createFinalContent(
223+
text: string,
224+
toolCalls: ToolCallPart[],
225+
toolResults: ToolResultPart[]
226+
): string | (TextPart | ToolCallPart | ToolResultPart)[] {
227+
if (!text && toolCalls.length === 0 && toolResults.length === 0) {
228+
return ''
229+
}
230+
231+
const parts: (TextPart | ToolCallPart | ToolResultPart)[] = []
232+
233+
// Add text part if there's text
234+
if (text) {
235+
parts.push({
236+
type: 'text',
237+
text
238+
})
239+
}
240+
241+
// Add tool calls
242+
parts.push(...toolCalls)
243+
244+
// Add tool results
245+
parts.push(...toolResults)
246+
247+
// If there's only text and no tool calls/results, return as string
248+
if (parts.length === 1 && parts[0].type === 'text') {
249+
return parts[0].text
250+
}
251+
252+
return parts
253+
}
254+
59255
export function streamText({
60256
model,
61257
prompt,
@@ -236,6 +432,12 @@ export function streamText({
236432
}
237433
state.currentStepState = 'done'
238434
writer.write(value)
435+
callback?.onLLMEnd?.({
436+
text: state.fullText,
437+
modelId: model.model,
438+
...settings,
439+
...state
440+
})
239441
break
240442
case 'error':
241443
callback?.onError?.(value.error as Error, {
@@ -365,12 +567,6 @@ export function streamText({
365567
promises.steps.resolve(state.steps)
366568
promises.metadata.resolve(state.metadata)
367569

368-
callback?.onLLMEnd?.({
369-
text: state.fullText,
370-
modelId: model.model,
371-
...settings
372-
})
373-
374570
await writer.close()
375571
} catch (error) {
376572
writer.abort(error)
@@ -409,6 +605,10 @@ export function streamText({
409605
)
410606
},
411607

608+
get messageStream() {
609+
return streamMessageChunks(baseStream, 'assistant')
610+
},
611+
412612
get fullStream() {
413613
return createAsyncIterableStream<TextStreamPart>(baseStream)
414614
},
@@ -472,6 +672,8 @@ export interface StreamTextResult {
472672

473673
readonly textStream: AsyncIterableStream<string>
474674

675+
readonly messageStream: AsyncIterableStream<BaseMessageChunk>
676+
475677
readonly fullStream: AsyncIterableStream<TextStreamPart>
476678
}
477679

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import { BaseMessage } from '../index.ts'
2+
import { BaseChatMessageHistory } from './index.ts'
3+
4+
export function bufferWindowMemory(windowSize: number): BaseChatMessageHistory {
5+
const messages: BaseMessage[] = []
6+
return {
7+
getMessages: async () => messages.slice(-windowSize),
8+
addMessage: async (message: BaseMessage) => {
9+
messages.push(message)
10+
if (messages.length > windowSize) {
11+
messages.shift()
12+
}
13+
},
14+
addUserMessage: async function (message: string) {
15+
this.addMessage({
16+
content: message,
17+
role: 'user'
18+
})
19+
},
20+
addAssistantChatMessage: async function (message: string) {
21+
this.addMessage({
22+
content: message,
23+
role: 'assistant'
24+
})
25+
},
26+
addMessages: async function (messages: BaseMessage[]) {
27+
for (const message of messages) {
28+
this.addMessage(message)
29+
}
30+
},
31+
clear: async function () {
32+
messages.length = 0
33+
}
34+
}
35+
}

packages/core/src/memorys/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { BaseMessage } from '../messages/index.ts'
2-
2+
export * from './buffer-window-memory.ts'
33
export interface BaseChatMessageHistory {
44
getMessages(): Promise<BaseMessage[]>
55

packages/core/src/prompts/base.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { generatateText } from '../generate/index.ts'
1+
import { generatateText, streamText } from '../generate/index.ts'
22
import { parseTemplate, renderTemplate } from './template.ts'
33
import { InputValues, PartialValues } from './types.ts'
44
import {
@@ -65,7 +65,10 @@ export function promptTemplate(template: string): BasePromptTemplate {
6565
}
6666

6767
export function bindPromptTemplate<
68-
T extends typeof generatateText | typeof generatateObject<R>,
68+
T extends
69+
| typeof generatateText
70+
| typeof streamText
71+
| typeof generatateObject<R>,
6972
R = unknown
7073
>(
7174
template:

packages/core/src/services/cortex-luna-service.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,20 @@ export class CortexLunaService extends Service {
1313
}
1414

1515
registerProvider({ id, provider }: { id: string; provider: Provider }) {
16-
const dispose = this._registry.registerProvider({
16+
const disposed = this._registry.registerProvider({
1717
id,
1818
provider
1919
})
2020

21+
const dispose = () => {
22+
return () => {
23+
disposed()
24+
this.ctx.emit('cortexluna/provider-updated', this)
25+
}
26+
}
27+
28+
this.ctx.emit('cortexluna/provider-updated', this)
29+
2130
return this.ctx.effect(() => dispose)
2231
}
2332

@@ -44,4 +53,10 @@ declare module 'cordis' {
4453
interface Context {
4554
cortex_luna: CortexLunaService
4655
}
56+
57+
interface Events {
58+
'cortexluna/provider-updated': (
59+
service: CortexLunaService
60+
) => Promise<void>
61+
}
4762
}

0 commit comments

Comments
 (0)