Skip to content

Commit 886a325

Browse files
authored
chore: unify event emitter (#8577)
* chore: unify event emitter * Tweaks * PR comments * Add callbacks for direct callers
1 parent 092ff0e commit 886a325

9 files changed

Lines changed: 172 additions & 222 deletions

File tree

web-common/src/features/chat/core/conversation-manager.ts

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -46,26 +46,17 @@ export class ConversationManager {
4646
private static readonly MAX_CONCURRENT_STREAMS = 3;
4747

4848
private newConversation: Conversation;
49+
private newConversationUnsub: (() => void) | null = null;
4950
private conversations = new Map<string, Conversation>();
5051
private conversationSelector: ConversationSelector;
51-
private agent?: string;
52+
private readonly agent?: string;
5253

5354
constructor(
5455
public readonly instanceId: string,
5556
options: ConversationManagerOptions,
5657
) {
5758
this.agent = options.agent;
58-
this.newConversation = new Conversation(
59-
this.instanceId,
60-
NEW_CONVERSATION_ID,
61-
{
62-
agent: this.agent,
63-
onStreamStart: () => this.enforceMaxConcurrentStreams(),
64-
onConversationCreated: (conversationId: string) => {
65-
this.handleConversationCreated(conversationId);
66-
},
67-
},
68-
);
59+
this.createNewConversation();
6960

7061
switch (options.conversationState) {
7162
case "url":
@@ -121,10 +112,10 @@ export class ConversationManager {
121112
const conversation = new Conversation(
122113
this.instanceId,
123114
$conversationId,
124-
{
125-
agent: this.agent,
126-
onStreamStart: () => this.enforceMaxConcurrentStreams(),
127-
},
115+
this.agent,
116+
);
117+
conversation.on("stream-start", () =>
118+
this.enforceMaxConcurrentStreams(),
128119
);
129120
this.conversations.set($conversationId, conversation);
130121
return conversation;
@@ -162,6 +153,26 @@ export class ConversationManager {
162153

163154
// ===== PRIVATE IMPLEMENTATION =====
164155

156+
private createNewConversation() {
157+
this.newConversationUnsub?.();
158+
this.newConversation = new Conversation(
159+
this.instanceId,
160+
NEW_CONVERSATION_ID,
161+
this.agent,
162+
);
163+
const streamStartUnsub = this.newConversation.on("stream-start", () =>
164+
this.enforceMaxConcurrentStreams(),
165+
);
166+
const conversationStartedUnsub = this.newConversation.on(
167+
"conversation-created",
168+
(conversationId) => this.handleConversationCreated(conversationId),
169+
);
170+
this.newConversationUnsub = () => {
171+
streamStartUnsub();
172+
conversationStartedUnsub();
173+
};
174+
}
175+
165176
// ----- Stream Management -----
166177

167178
/**
@@ -221,17 +232,7 @@ export class ConversationManager {
221232
this.conversations.set(conversationId, this.newConversation);
222233

223234
// Create a fresh "new" conversation instance
224-
this.newConversation = new Conversation(
225-
this.instanceId,
226-
NEW_CONVERSATION_ID,
227-
{
228-
agent: this.agent,
229-
onStreamStart: () => this.enforceMaxConcurrentStreams(),
230-
onConversationCreated: (conversationId: string) => {
231-
this.handleConversationCreated(conversationId);
232-
},
233-
},
234-
);
235+
this.createNewConversation();
235236
}
236237
}
237238

web-common/src/features/chat/core/conversation.ts

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,15 @@ import {
2424
invalidateConversationsList,
2525
NEW_CONVERSATION_ID,
2626
} from "./utils";
27+
import { EventEmitter } from "@rilldata/web-common/lib/event-emitter.ts";
28+
29+
type ConversationEvents = {
30+
"conversation-created": string;
31+
"stream-start": void;
32+
message: V1Message;
33+
"stream-complete": string;
34+
error: string;
35+
};
2736

2837
/**
2938
* Individual conversation state management.
@@ -37,25 +46,23 @@ export class Conversation {
3746
public readonly isStreaming = writable(false);
3847
public readonly streamError = writable<string | null>(null);
3948

49+
private readonly events = new EventEmitter<ConversationEvents>();
50+
public readonly on = this.events.on.bind(
51+
this.events,
52+
) as typeof this.events.on;
53+
public readonly once = this.events.once.bind(
54+
this.events,
55+
) as typeof this.events.once;
56+
4057
// Private state
4158
private sseClient: SSEFetchClient | null = null;
4259
private hasReceivedFirstMessage = false;
4360

4461
constructor(
4562
private readonly instanceId: string,
4663
public conversationId: string,
47-
private readonly options: {
48-
agent?: string;
49-
onStreamStart?: () => void;
50-
onConversationCreated?: (conversationId: string) => void;
51-
} = {
52-
agent: ToolName.ANALYST_AGENT, // Hardcoded default for now
53-
},
54-
) {
55-
if (this.options) {
56-
this.options.agent ??= ToolName.ANALYST_AGENT;
57-
}
58-
}
64+
private readonly agent: string = ToolName.ANALYST_AGENT, // Hardcoded default for now
65+
) {}
5966

6067
// ===== PUBLIC API =====
6168

@@ -116,12 +123,7 @@ export class Conversation {
116123
*/
117124
public async sendMessage(
118125
context: RuntimeServiceCompleteBody,
119-
options?: {
120-
onStreamStart?: () => void;
121-
onMessage?: (message: V1Message) => void;
122-
onStreamComplete?: (conversationId: string) => void;
123-
onError?: (error: string) => void;
124-
},
126+
options?: { onStreamStart?: () => void },
125127
): Promise<void> {
126128
// Prevent concurrent message sending
127129
if (get(this.isStreaming)) {
@@ -141,19 +143,16 @@ export class Conversation {
141143
const userMessage = this.addOptimisticUserMessage(prompt);
142144

143145
try {
144-
options?.onStreamStart?.();
146+
options?.onStreamStart?.(); // Callback for direct callers
147+
this.events.emit("stream-start"); // Event for external listeners
145148
// Start streaming - this establishes the connection
146-
const streamPromise = this.startStreaming(
147-
prompt,
148-
context,
149-
options?.onMessage,
150-
);
149+
const streamPromise = this.startStreaming(prompt, context);
151150

152151
// Wait for streaming to complete
153152
await streamPromise;
154153

155154
// Stream has completed successfully
156-
options?.onStreamComplete?.(this.conversationId);
155+
this.events.emit("stream-complete", this.conversationId);
157156

158157
// Temporary fix to make sure the title of the conversation is updated.
159158
void invalidateConversationsList(this.instanceId);
@@ -171,7 +170,7 @@ export class Conversation {
171170
userMessage,
172171
this.hasReceivedFirstMessage,
173172
);
174-
options?.onError?.(this.formatTransportError(error));
173+
this.events.emit("error", this.formatTransportError(error));
175174
} finally {
176175
this.isStreaming.set(false);
177176
}
@@ -200,6 +199,8 @@ export class Conversation {
200199
this.sseClient.cleanup();
201200
this.sseClient = null;
202201
}
202+
203+
this.events.clearListeners();
203204
}
204205

205206
// ===== PRIVATE IMPLEMENTATION =====
@@ -213,7 +214,6 @@ export class Conversation {
213214
private async startStreaming(
214215
prompt: string,
215216
context: RuntimeServiceCompleteBody | undefined,
216-
onMessage: ((message: V1Message) => void) | undefined,
217217
): Promise<void> {
218218
// Initialize SSE client if not already done
219219
if (!this.sseClient) {
@@ -238,7 +238,7 @@ export class Conversation {
238238
message.data,
239239
);
240240
this.processStreamingResponse(response);
241-
if (response.message) onMessage?.(response.message);
241+
if (response.message) this.events.emit("message", response.message);
242242
} catch (error) {
243243
console.error("Failed to parse streaming response:", error);
244244
this.streamError.set("Failed to process server response");
@@ -276,12 +276,12 @@ export class Conversation {
276276
? undefined
277277
: this.conversationId,
278278
prompt,
279-
agent: this.options?.agent,
279+
agent: this.agent,
280280
...context,
281281
};
282282

283283
// Notify that streaming is about to start (for concurrent stream management)
284-
this.options?.onStreamStart?.();
284+
this.events.emit("stream-start");
285285

286286
// Start streaming - this will establish the connection and then stream until completion
287287
await this.sseClient.start(baseUrl, {
@@ -360,7 +360,7 @@ export class Conversation {
360360
this.conversationId = realConversationId;
361361

362362
// Notify that conversation was created
363-
this.options?.onConversationCreated?.(realConversationId);
363+
this.events.emit("conversation-created", realConversationId);
364364
}
365365

366366
// ----- Cache Management -----

web-common/src/features/entity-management/file-and-resource-watcher.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ export class FileAndResourceWatcher {
177177

178178
await invalidate("init");
179179

180-
eventBus.emit("rill-yaml-updated", null);
180+
eventBus.emit("rill-yaml-updated");
181181
}
182182
this.seenFiles.add(res.path);
183183
break;

web-common/src/features/sample-data/generate-sample-data.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,11 @@ export async function generateSampleData(
6262
},
6363
},
6464
});
65-
const conversation = new Conversation(instanceId, NEW_CONVERSATION_ID, {
66-
agent: ToolName.DEVELOPER_AGENT,
67-
});
65+
const conversation = new Conversation(
66+
instanceId,
67+
NEW_CONVERSATION_ID,
68+
ToolName.DEVELOPER_AGENT,
69+
);
6870
const agentPrompt = `Generate a NEW model with fresh data for the following user prompt: ${userPrompt}`;
6971
conversation.draftMessage.set(agentPrompt);
7072

@@ -124,15 +126,17 @@ export async function generateSampleData(
124126
}
125127
}
126128
};
129+
const handleMessageUnsub = conversation.on("message", handleMessage);
127130

128131
let cancelled = false;
129132

130133
conversation.cancelStream();
131134

132-
await conversation.sendMessage({}, { onMessage: handleMessage });
135+
await conversation.sendMessage({});
133136

134137
await waitUntil(() => !get(conversation.isStreaming));
135138

139+
handleMessageUnsub();
136140
overlay.set(null);
137141
if (cancelled) return;
138142
if (!created) {

web-common/src/lib/actions/modified-click.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ export function modified(params: Params) {
2828
}
2929

3030
const event = (modifier ? `${modifier}-click` : "click") as keyof Events;
31-
eventBus.emit(event, null);
31+
eventBus.emit(event);
3232

3333
if (handler) {
3434
await handler(e);

web-common/src/lib/event-bus/event-bus.ts

Lines changed: 7 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -3,83 +3,20 @@ import type {
33
PageContentResized,
44
NotificationMessage,
55
} from "./events";
6-
7-
class EventBus {
8-
private listeners: EventMap = new Map();
9-
10-
on<Event extends T>(event: Event, callback: Listener<Event>) {
11-
const key = generateUUID();
12-
const eventMap = this.listeners.get(event);
13-
14-
if (!eventMap) {
15-
this.listeners.set(
16-
event,
17-
new Map<string, Listener<T>>([[key, callback]]),
18-
);
19-
} else {
20-
eventMap.set(key, callback);
21-
}
22-
23-
const unsubscribe = () => this.listeners.get(event)?.delete(key);
24-
25-
return unsubscribe;
26-
}
27-
28-
once<Event extends T>(event: Event, callback: Listener<Event>) {
29-
const unsubscribe = this.on(event, (payload) => {
30-
callback(payload);
31-
unsubscribe();
32-
});
33-
34-
return unsubscribe;
35-
}
36-
37-
emit<Event extends T>(event: Event, payload: Events[Event]) {
38-
const listeners = this.listeners.get(event);
39-
40-
listeners?.forEach((cb) => {
41-
cb(payload);
42-
});
43-
}
44-
}
45-
46-
function generateUUID(): string {
47-
// Generate random numbers for the UUID
48-
const randomNumbers: number[] = new Array(16)
49-
.fill(0)
50-
.map(() => Math.floor(Math.random() * 256));
51-
52-
// Set the version and variant bits
53-
randomNumbers[6] = (randomNumbers[6] & 0x0f) | 0x40; // Version 4
54-
randomNumbers[8] = (randomNumbers[8] & 0x3f) | 0x80; // Variant 10
55-
56-
// Convert to hexadecimal and format as a UUID
57-
const hexDigits: string = randomNumbers
58-
.map((b) => b.toString(16).padStart(2, "0"))
59-
.join("");
60-
return `${hexDigits.slice(0, 8)}-${hexDigits.slice(8, 12)}-${hexDigits.slice(12, 16)}-${hexDigits.slice(16, 20)}-${hexDigits.slice(20, 32)}`;
61-
}
62-
63-
export const eventBus = new EventBus();
6+
import { EventEmitter } from "@rilldata/web-common/lib/event-emitter.ts";
647

658
export interface Events {
669
notification: NotificationMessage;
6710
"clear-all-notifications": void;
6811
"add-banner": BannerEvent;
6912
"remove-banner": string;
70-
"shift-click": null;
71-
"command-click": null;
72-
click: null;
73-
"shift-command-click": null;
13+
"shift-click": void;
14+
"command-click": void;
15+
click: void;
16+
"shift-command-click": void;
7417
"page-content-resized": PageContentResized;
7518
"start-chat": string;
76-
"rill-yaml-updated": null;
19+
"rill-yaml-updated": void;
7720
}
7821

79-
type T = keyof Events;
80-
81-
type Listener<EventType extends T> = (e: Events[EventType]) => void;
82-
83-
type EventMap = Map<T, Listeners>;
84-
85-
type Listeners = Map<string, Listener<T>>;
22+
export const eventBus = new EventEmitter<Events>();

0 commit comments

Comments
 (0)