Skip to content

Commit 0829e7a

Browse files
committed
perf: optimize event buffer
1 parent deb3c3d commit 0829e7a

3 files changed

Lines changed: 292 additions & 125 deletions

File tree

apps/api/src/controllers/live.controller.ts

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,10 @@ import {
88
transformMinimalEvent,
99
} from '@openpanel/db';
1010
import { setSuperJson } from '@openpanel/json';
11-
import {
12-
psubscribeToPublishedEvent,
13-
subscribeToPublishedEvent,
14-
} from '@openpanel/redis';
11+
import { subscribeToPublishedEvent } from '@openpanel/redis';
1512
import { getProjectAccess } from '@openpanel/trpc';
1613
import { getOrganizationAccess } from '@openpanel/trpc/src/access';
1714

18-
export function getLiveEventInfo(key: string) {
19-
return key.split(':').slice(2) as [string, string];
20-
}
21-
2215
export function wsVisitors(
2316
socket: WebSocket,
2417
req: FastifyRequest<{
@@ -36,21 +29,8 @@ export function wsVisitors(
3629
}
3730
});
3831

39-
const punsubscribe = psubscribeToPublishedEvent(
40-
'__keyevent@0__:expired',
41-
(key) => {
42-
const [projectId] = getLiveEventInfo(key);
43-
if (projectId && projectId === params.projectId) {
44-
eventBuffer.getActiveVisitorCount(params.projectId).then((count) => {
45-
socket.send(String(count));
46-
});
47-
}
48-
},
49-
);
50-
5132
socket.on('close', () => {
5233
unsubscribe();
53-
punsubscribe();
5434
});
5535
}
5636

packages/db/src/buffers/event-buffer.test.ts

Lines changed: 44 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,9 @@ describe('EventBuffer', () => {
7171
// Get initial count
7272
const initialCount = await eventBuffer.getBufferSize();
7373

74-
// Add event
75-
await eventBuffer.add(event);
74+
// Add event and flush (events are micro-batched)
75+
eventBuffer.add(event);
76+
await eventBuffer.flush();
7677

7778
// Buffer counter should increase by 1
7879
const newCount = await eventBuffer.getBufferSize();
@@ -109,7 +110,8 @@ describe('EventBuffer', () => {
109110

110111
// Add first screen_view
111112
const count1 = await eventBuffer.getBufferSize();
112-
await eventBuffer.add(view1);
113+
eventBuffer.add(view1);
114+
await eventBuffer.flush();
113115

114116
// Should be stored as "last" but NOT in queue yet
115117
const count2 = await eventBuffer.getBufferSize();
@@ -124,7 +126,8 @@ describe('EventBuffer', () => {
124126
expect(last1!.createdAt.toISOString()).toBe(view1.created_at);
125127

126128
// Add second screen_view
127-
await eventBuffer.add(view2);
129+
eventBuffer.add(view2);
130+
await eventBuffer.flush();
128131

129132
// Now view1 should be in buffer
130133
const count3 = await eventBuffer.getBufferSize();
@@ -138,7 +141,8 @@ describe('EventBuffer', () => {
138141
expect(last2!.createdAt.toISOString()).toBe(view2.created_at);
139142

140143
// Add third screen_view
141-
await eventBuffer.add(view3);
144+
eventBuffer.add(view3);
145+
await eventBuffer.flush();
142146

143147
// Now view2 should also be in buffer
144148
const count4 = await eventBuffer.getBufferSize();
@@ -174,14 +178,16 @@ describe('EventBuffer', () => {
174178

175179
// Add screen_view
176180
const count1 = await eventBuffer.getBufferSize();
177-
await eventBuffer.add(view);
181+
eventBuffer.add(view);
182+
await eventBuffer.flush();
178183

179184
// Should be stored as "last", not in buffer yet
180185
const count2 = await eventBuffer.getBufferSize();
181186
expect(count2).toBe(count1);
182187

183188
// Add session_end
184-
await eventBuffer.add(sessionEnd);
189+
eventBuffer.add(sessionEnd);
190+
await eventBuffer.flush();
185191

186192
// Both should now be in buffer (+2)
187193
const count3 = await eventBuffer.getBufferSize();
@@ -207,7 +213,8 @@ describe('EventBuffer', () => {
207213
} as any;
208214

209215
const count1 = await eventBuffer.getBufferSize();
210-
await eventBuffer.add(sessionEnd);
216+
eventBuffer.add(sessionEnd);
217+
await eventBuffer.flush();
211218

212219
// Only session_end should be in buffer (+1)
213220
const count2 = await eventBuffer.getBufferSize();
@@ -224,7 +231,8 @@ describe('EventBuffer', () => {
224231
created_at: new Date().toISOString(),
225232
} as any;
226233

227-
await eventBuffer.add(view);
234+
eventBuffer.add(view);
235+
await eventBuffer.flush();
228236

229237
// Query by profileId
230238
const result = await eventBuffer.getLastScreenView({
@@ -248,7 +256,8 @@ describe('EventBuffer', () => {
248256
created_at: new Date().toISOString(),
249257
} as any;
250258

251-
await eventBuffer.add(view);
259+
eventBuffer.add(view);
260+
await eventBuffer.flush();
252261

253262
// Query by sessionId
254263
const result = await eventBuffer.getLastScreenView({
@@ -275,43 +284,47 @@ describe('EventBuffer', () => {
275284
expect(await eventBuffer.getBufferSize()).toBe(0);
276285

277286
// Add regular event
278-
await eventBuffer.add({
287+
eventBuffer.add({
279288
project_id: 'p6',
280289
name: 'event1',
281290
created_at: new Date().toISOString(),
282291
} as any);
292+
await eventBuffer.flush();
283293

284294
expect(await eventBuffer.getBufferSize()).toBe(1);
285295

286296
// Add another regular event
287-
await eventBuffer.add({
297+
eventBuffer.add({
288298
project_id: 'p6',
289299
name: 'event2',
290300
created_at: new Date().toISOString(),
291301
} as any);
302+
await eventBuffer.flush();
292303

293304
expect(await eventBuffer.getBufferSize()).toBe(2);
294305

295306
// Add screen_view (not counted until flushed)
296-
await eventBuffer.add({
307+
eventBuffer.add({
297308
project_id: 'p6',
298309
profile_id: 'u6',
299310
session_id: 'session_6',
300311
name: 'screen_view',
301312
created_at: new Date().toISOString(),
302313
} as any);
314+
await eventBuffer.flush();
303315

304316
// Still 2 (screen_view is pending)
305317
expect(await eventBuffer.getBufferSize()).toBe(2);
306318

307319
// Add another screen_view (first one gets flushed)
308-
await eventBuffer.add({
320+
eventBuffer.add({
309321
project_id: 'p6',
310322
profile_id: 'u6',
311323
session_id: 'session_6',
312324
name: 'screen_view',
313325
created_at: new Date(Date.now() + 1000).toISOString(),
314326
} as any);
327+
await eventBuffer.flush();
315328

316329
// Now 3 (2 regular + 1 flushed screen_view)
317330
expect(await eventBuffer.getBufferSize()).toBe(3);
@@ -330,8 +343,9 @@ describe('EventBuffer', () => {
330343
created_at: new Date(Date.now() + 1000).toISOString(),
331344
} as any;
332345

333-
await eventBuffer.add(event1);
334-
await eventBuffer.add(event2);
346+
eventBuffer.add(event1);
347+
eventBuffer.add(event2);
348+
await eventBuffer.flush();
335349

336350
expect(await eventBuffer.getBufferSize()).toBe(2);
337351

@@ -361,12 +375,13 @@ describe('EventBuffer', () => {
361375

362376
// Add 4 events
363377
for (let i = 0; i < 4; i++) {
364-
await eb.add({
378+
eb.add({
365379
project_id: 'p8',
366380
name: `event${i}`,
367381
created_at: new Date(Date.now() + i).toISOString(),
368382
} as any);
369383
}
384+
await eb.flush();
370385

371386
const insertSpy = vi
372387
.spyOn(ch, 'insert')
@@ -396,7 +411,8 @@ describe('EventBuffer', () => {
396411
created_at: new Date().toISOString(),
397412
} as any;
398413

399-
await eventBuffer.add(event);
414+
eventBuffer.add(event);
415+
await eventBuffer.flush();
400416

401417
const count = await eventBuffer.getActiveVisitorCount('p9');
402418
expect(count).toBeGreaterThanOrEqual(1);
@@ -439,10 +455,11 @@ describe('EventBuffer', () => {
439455
created_at: new Date(t0 + 2000).toISOString(),
440456
} as any;
441457

442-
await eventBuffer.add(view1a);
443-
await eventBuffer.add(view2a);
444-
await eventBuffer.add(view1b); // Flushes view1a
445-
await eventBuffer.add(view2b); // Flushes view2a
458+
eventBuffer.add(view1a);
459+
eventBuffer.add(view2a);
460+
eventBuffer.add(view1b); // Flushes view1a
461+
eventBuffer.add(view2b); // Flushes view2a
462+
await eventBuffer.flush();
446463

447464
// Should have 2 events in buffer (one from each session)
448465
expect(await eventBuffer.getBufferSize()).toBe(2);
@@ -470,7 +487,8 @@ describe('EventBuffer', () => {
470487
} as any;
471488

472489
const count1 = await eventBuffer.getBufferSize();
473-
await eventBuffer.add(view);
490+
eventBuffer.add(view);
491+
await eventBuffer.flush();
474492

475493
// Should go directly to buffer (no session_id)
476494
const count2 = await eventBuffer.getBufferSize();
@@ -498,8 +516,9 @@ describe('EventBuffer', () => {
498516
created_at: new Date(t0 + 1000).toISOString(),
499517
} as any;
500518

501-
await eventBuffer.add(view1);
502-
await eventBuffer.add(view2);
519+
eventBuffer.add(view1);
520+
eventBuffer.add(view2);
521+
await eventBuffer.flush();
503522

504523
// Both sessions should have their own "last"
505524
const lastSession1 = await eventBuffer.getLastScreenView({

0 commit comments

Comments
 (0)