From 8afcf55154bafe1e5553bad1fd98fb1c88da405d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Fri, 6 Feb 2026 12:41:41 +0000 Subject: [PATCH 1/6] fix: how we fetch profiles in the buffer --- packages/db/src/buffers/profile-buffer.ts | 40 +++++++++++------------ 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/packages/db/src/buffers/profile-buffer.ts b/packages/db/src/buffers/profile-buffer.ts index de80bd422..d568bb23c 100644 --- a/packages/db/src/buffers/profile-buffer.ts +++ b/packages/db/src/buffers/profile-buffer.ts @@ -1,11 +1,11 @@ import { deepMergeObjects } from '@openpanel/common'; import { getSafeJson } from '@openpanel/json'; import type { ILogger } from '@openpanel/logger'; -import { type Redis, getRedisCache } from '@openpanel/redis'; +import { getRedisCache, type Redis } from '@openpanel/redis'; import shallowEqual from 'fast-deep-equal'; import { omit } from 'ramda'; import sqlstring from 'sqlstring'; -import { TABLE_NAMES, ch, chQuery } from '../clickhouse/client'; +import { ch, chQuery, TABLE_NAMES } from '../clickhouse/client'; import type { IClickhouseProfile } from '../services/profile.service'; import { BaseBuffer } from './base-buffer'; @@ -89,7 +89,7 @@ export class ProfileBuffer extends BaseBuffer { 'os_version', 'browser_version', ], - profile.properties, + profile.properties ); } @@ -97,16 +97,16 @@ export class ProfileBuffer extends BaseBuffer { ? deepMergeObjects(existingProfile, omit(['created_at'], profile)) : profile; - if (profile && existingProfile) { - if ( - shallowEqual( - omit(['created_at'], existingProfile), - omit(['created_at'], mergedProfile), - ) - ) { - this.logger.debug('Profile not changed, skipping'); - return; - } + if ( + profile && + existingProfile && + shallowEqual( + omit(['created_at'], existingProfile), + omit(['created_at'], mergedProfile) + ) + ) { + this.logger.debug('Profile not changed, skipping'); + return; } this.logger.debug('Merged profile will be inserted', { @@ -151,11 +151,11 @@ export class ProfileBuffer extends BaseBuffer { private async fetchProfile( profile: IClickhouseProfile, - logger: ILogger, + logger: ILogger ): Promise { const existingProfile = await this.fetchFromCache( profile.id, - profile.project_id, + profile.project_id ); if (existingProfile) { logger.debug('Profile found in Redis'); @@ -167,7 +167,7 @@ export class ProfileBuffer extends BaseBuffer { public async fetchFromCache( profileId: string, - projectId: string, + projectId: string ): Promise { const cacheKey = this.getProfileCacheKey({ profileId, @@ -182,7 +182,7 @@ export class ProfileBuffer extends BaseBuffer { private async fetchFromClickhouse( profile: IClickhouseProfile, - logger: ILogger, + logger: ILogger ): Promise { logger.debug('Fetching profile from Clickhouse'); const result = await chQuery( @@ -207,7 +207,7 @@ export class ProfileBuffer extends BaseBuffer { } GROUP BY id, project_id ORDER BY created_at DESC - LIMIT 1`, + LIMIT 1` ); logger.debug('Clickhouse fetch result', { found: !!result[0], @@ -221,7 +221,7 @@ export class ProfileBuffer extends BaseBuffer { const profiles = await this.redis.lrange( this.redisKey, 0, - this.batchSize - 1, + this.batchSize - 1 ); if (profiles.length === 0) { @@ -231,7 +231,7 @@ export class ProfileBuffer extends BaseBuffer { this.logger.debug(`Processing ${profiles.length} profiles in buffer`); const parsedProfiles = profiles.map((p) => - getSafeJson(p), + getSafeJson(p) ); for (const chunk of this.chunks(parsedProfiles, this.chunkSize)) { From 664f1abe0a3b3a8fd46b857c0f7cc2c24bd8d3a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Thu, 22 Jan 2026 09:44:30 +0100 Subject: [PATCH 2/6] perf: optimize event buffer --- apps/api/src/controllers/live.controller.ts | 22 +- packages/db/src/buffers/event-buffer.test.ts | 69 ++-- packages/db/src/buffers/event-buffer.ts | 326 ++++++++++++++----- 3 files changed, 292 insertions(+), 125 deletions(-) diff --git a/apps/api/src/controllers/live.controller.ts b/apps/api/src/controllers/live.controller.ts index 29931aa84..cd7afe914 100644 --- a/apps/api/src/controllers/live.controller.ts +++ b/apps/api/src/controllers/live.controller.ts @@ -8,17 +8,10 @@ import { transformMinimalEvent, } from '@openpanel/db'; import { setSuperJson } from '@openpanel/json'; -import { - psubscribeToPublishedEvent, - subscribeToPublishedEvent, -} from '@openpanel/redis'; +import { subscribeToPublishedEvent } from '@openpanel/redis'; import { getProjectAccess } from '@openpanel/trpc'; import { getOrganizationAccess } from '@openpanel/trpc/src/access'; -export function getLiveEventInfo(key: string) { - return key.split(':').slice(2) as [string, string]; -} - export function wsVisitors( socket: WebSocket, req: FastifyRequest<{ @@ -36,21 +29,8 @@ export function wsVisitors( } }); - const punsubscribe = psubscribeToPublishedEvent( - '__keyevent@0__:expired', - (key) => { - const [projectId] = getLiveEventInfo(key); - if (projectId && projectId === params.projectId) { - eventBuffer.getActiveVisitorCount(params.projectId).then((count) => { - socket.send(String(count)); - }); - } - }, - ); - socket.on('close', () => { unsubscribe(); - punsubscribe(); }); } diff --git a/packages/db/src/buffers/event-buffer.test.ts b/packages/db/src/buffers/event-buffer.test.ts index 95852bd29..50600c704 100644 --- a/packages/db/src/buffers/event-buffer.test.ts +++ b/packages/db/src/buffers/event-buffer.test.ts @@ -71,8 +71,9 @@ describe('EventBuffer', () => { // Get initial count const initialCount = await eventBuffer.getBufferSize(); - // Add event - await eventBuffer.add(event); + // Add event and flush (events are micro-batched) + eventBuffer.add(event); + await eventBuffer.flush(); // Buffer counter should increase by 1 const newCount = await eventBuffer.getBufferSize(); @@ -109,7 +110,8 @@ describe('EventBuffer', () => { // Add first screen_view const count1 = await eventBuffer.getBufferSize(); - await eventBuffer.add(view1); + eventBuffer.add(view1); + await eventBuffer.flush(); // Should be stored as "last" but NOT in queue yet const count2 = await eventBuffer.getBufferSize(); @@ -124,7 +126,8 @@ describe('EventBuffer', () => { expect(last1!.createdAt.toISOString()).toBe(view1.created_at); // Add second screen_view - await eventBuffer.add(view2); + eventBuffer.add(view2); + await eventBuffer.flush(); // Now view1 should be in buffer const count3 = await eventBuffer.getBufferSize(); @@ -138,7 +141,8 @@ describe('EventBuffer', () => { expect(last2!.createdAt.toISOString()).toBe(view2.created_at); // Add third screen_view - await eventBuffer.add(view3); + eventBuffer.add(view3); + await eventBuffer.flush(); // Now view2 should also be in buffer const count4 = await eventBuffer.getBufferSize(); @@ -174,14 +178,16 @@ describe('EventBuffer', () => { // Add screen_view const count1 = await eventBuffer.getBufferSize(); - await eventBuffer.add(view); + eventBuffer.add(view); + await eventBuffer.flush(); // Should be stored as "last", not in buffer yet const count2 = await eventBuffer.getBufferSize(); expect(count2).toBe(count1); // Add session_end - await eventBuffer.add(sessionEnd); + eventBuffer.add(sessionEnd); + await eventBuffer.flush(); // Both should now be in buffer (+2) const count3 = await eventBuffer.getBufferSize(); @@ -207,7 +213,8 @@ describe('EventBuffer', () => { } as any; const count1 = await eventBuffer.getBufferSize(); - await eventBuffer.add(sessionEnd); + eventBuffer.add(sessionEnd); + await eventBuffer.flush(); // Only session_end should be in buffer (+1) const count2 = await eventBuffer.getBufferSize(); @@ -224,7 +231,8 @@ describe('EventBuffer', () => { created_at: new Date().toISOString(), } as any; - await eventBuffer.add(view); + eventBuffer.add(view); + await eventBuffer.flush(); // Query by profileId const result = await eventBuffer.getLastScreenView({ @@ -248,7 +256,8 @@ describe('EventBuffer', () => { created_at: new Date().toISOString(), } as any; - await eventBuffer.add(view); + eventBuffer.add(view); + await eventBuffer.flush(); // Query by sessionId const result = await eventBuffer.getLastScreenView({ @@ -275,43 +284,47 @@ describe('EventBuffer', () => { expect(await eventBuffer.getBufferSize()).toBe(0); // Add regular event - await eventBuffer.add({ + eventBuffer.add({ project_id: 'p6', name: 'event1', created_at: new Date().toISOString(), } as any); + await eventBuffer.flush(); expect(await eventBuffer.getBufferSize()).toBe(1); // Add another regular event - await eventBuffer.add({ + eventBuffer.add({ project_id: 'p6', name: 'event2', created_at: new Date().toISOString(), } as any); + await eventBuffer.flush(); expect(await eventBuffer.getBufferSize()).toBe(2); // Add screen_view (not counted until flushed) - await eventBuffer.add({ + eventBuffer.add({ project_id: 'p6', profile_id: 'u6', session_id: 'session_6', name: 'screen_view', created_at: new Date().toISOString(), } as any); + await eventBuffer.flush(); // Still 2 (screen_view is pending) expect(await eventBuffer.getBufferSize()).toBe(2); // Add another screen_view (first one gets flushed) - await eventBuffer.add({ + eventBuffer.add({ project_id: 'p6', profile_id: 'u6', session_id: 'session_6', name: 'screen_view', created_at: new Date(Date.now() + 1000).toISOString(), } as any); + await eventBuffer.flush(); // Now 3 (2 regular + 1 flushed screen_view) expect(await eventBuffer.getBufferSize()).toBe(3); @@ -330,8 +343,9 @@ describe('EventBuffer', () => { created_at: new Date(Date.now() + 1000).toISOString(), } as any; - await eventBuffer.add(event1); - await eventBuffer.add(event2); + eventBuffer.add(event1); + eventBuffer.add(event2); + await eventBuffer.flush(); expect(await eventBuffer.getBufferSize()).toBe(2); @@ -361,12 +375,13 @@ describe('EventBuffer', () => { // Add 4 events for (let i = 0; i < 4; i++) { - await eb.add({ + eb.add({ project_id: 'p8', name: `event${i}`, created_at: new Date(Date.now() + i).toISOString(), } as any); } + await eb.flush(); const insertSpy = vi .spyOn(ch, 'insert') @@ -396,7 +411,8 @@ describe('EventBuffer', () => { created_at: new Date().toISOString(), } as any; - await eventBuffer.add(event); + eventBuffer.add(event); + await eventBuffer.flush(); const count = await eventBuffer.getActiveVisitorCount('p9'); expect(count).toBeGreaterThanOrEqual(1); @@ -439,10 +455,11 @@ describe('EventBuffer', () => { created_at: new Date(t0 + 2000).toISOString(), } as any; - await eventBuffer.add(view1a); - await eventBuffer.add(view2a); - await eventBuffer.add(view1b); // Flushes view1a - await eventBuffer.add(view2b); // Flushes view2a + eventBuffer.add(view1a); + eventBuffer.add(view2a); + eventBuffer.add(view1b); // Flushes view1a + eventBuffer.add(view2b); // Flushes view2a + await eventBuffer.flush(); // Should have 2 events in buffer (one from each session) expect(await eventBuffer.getBufferSize()).toBe(2); @@ -470,7 +487,8 @@ describe('EventBuffer', () => { } as any; const count1 = await eventBuffer.getBufferSize(); - await eventBuffer.add(view); + eventBuffer.add(view); + await eventBuffer.flush(); // Should go directly to buffer (no session_id) const count2 = await eventBuffer.getBufferSize(); @@ -498,8 +516,9 @@ describe('EventBuffer', () => { created_at: new Date(t0 + 1000).toISOString(), } as any; - await eventBuffer.add(view1); - await eventBuffer.add(view2); + eventBuffer.add(view1); + eventBuffer.add(view2); + await eventBuffer.flush(); // Both sessions should have their own "last" const lastSession1 = await eventBuffer.getLastScreenView({ diff --git a/packages/db/src/buffers/event-buffer.ts b/packages/db/src/buffers/event-buffer.ts index d305372aa..15d29b5b4 100644 --- a/packages/db/src/buffers/event-buffer.ts +++ b/packages/db/src/buffers/event-buffer.ts @@ -25,8 +25,21 @@ import { BaseBuffer } from './base-buffer'; * - Retrieve the last screen_view (don't modify it) * - Push both screen_view and session_end to buffer * 4. Flush: Simply process all events from the list buffer + * + * Optimizations: + * - Micro-batching: Events are buffered locally and flushed every 10ms to reduce Redis round-trips + * - Batched publishes: All PUBLISH commands are included in the multi pipeline + * - Simplified active visitor tracking: Only uses ZADD (removed redundant heartbeat SET) */ +// Pending event for local buffer +interface PendingEvent { + event: IClickhouseEvent; + eventJson: string; + eventWithTimestamp?: string; + type: 'regular' | 'screen_view' | 'session_end'; +} + export class EventBuffer extends BaseBuffer { // Configurable limits private batchSize = process.env.EVENT_BUFFER_BATCH_SIZE @@ -36,6 +49,27 @@ export class EventBuffer extends BaseBuffer { ? Number.parseInt(process.env.EVENT_BUFFER_CHUNK_SIZE, 10) : 1000; + // Micro-batching configuration + private microBatchIntervalMs = process.env.EVENT_BUFFER_MICRO_BATCH_MS + ? Number.parseInt(process.env.EVENT_BUFFER_MICRO_BATCH_MS, 10) + : 10; // Flush every 10ms by default + private microBatchMaxSize = process.env.EVENT_BUFFER_MICRO_BATCH_SIZE + ? Number.parseInt(process.env.EVENT_BUFFER_MICRO_BATCH_SIZE, 10) + : 100; // Or when we hit 100 events + + // Local event buffer for micro-batching + private pendingEvents: PendingEvent[] = []; + private flushTimer: ReturnType | null = null; + private isFlushing = false; + + // Throttled publish configuration + private publishThrottleMs = process.env.EVENT_BUFFER_PUBLISH_THROTTLE_MS + ? Number.parseInt(process.env.EVENT_BUFFER_PUBLISH_THROTTLE_MS, 10) + : 1000; // Publish at most once per second + private lastPublishTime = 0; + private pendingPublishEvent: IClickhouseEvent | null = null; + private publishTimer: ReturnType | null = null; + private activeVisitorsExpiration = 60 * 5; // 5 minutes // LIST - Stores all events ready to be flushed @@ -190,98 +224,228 @@ return added } bulkAdd(events: IClickhouseEvent[]) { - const redis = getRedisCache(); - const multi = redis.multi(); + // Add all events to local buffer - they will be flushed together for (const event of events) { - this.add(event, multi); + this.add(event); } - return multi.exec(); } /** - * Add an event into Redis buffer. + * Add an event into the local buffer for micro-batching. + * + * Events are buffered locally and flushed to Redis every microBatchIntervalMs + * or when microBatchMaxSize is reached. This dramatically reduces Redis round-trips. * * Logic: * - screen_view: Store as "last" for session, flush previous if exists * - session_end: Flush last screen_view + session_end * - Other events: Add directly to queue */ - async add(event: IClickhouseEvent, _multi?: ReturnType) { + add(event: IClickhouseEvent, _multi?: ReturnType) { + const eventJson = JSON.stringify(event); + + // Determine event type and prepare data + let type: PendingEvent['type'] = 'regular'; + let eventWithTimestamp: string | undefined; + + if (event.session_id && event.name === 'screen_view') { + type = 'screen_view'; + const timestamp = new Date(event.created_at || Date.now()).getTime(); + eventWithTimestamp = JSON.stringify({ + event: event, + ts: timestamp, + }); + } else if (event.session_id && event.name === 'session_end') { + type = 'session_end'; + } + + const pendingEvent: PendingEvent = { + event, + eventJson, + eventWithTimestamp, + type, + }; + + // If a multi was provided (legacy bulkAdd pattern), add directly without batching + if (_multi) { + this.addToMulti(_multi, pendingEvent); + return; + } + + // Add to local buffer for micro-batching + this.pendingEvents.push(pendingEvent); + + // Check if we should flush immediately due to size + if (this.pendingEvents.length >= this.microBatchMaxSize) { + this.flushLocalBuffer(); + return; + } + + // Schedule flush if not already scheduled + if (!this.flushTimer) { + this.flushTimer = setTimeout(() => { + this.flushTimer = null; + this.flushLocalBuffer(); + }, this.microBatchIntervalMs); + } + } + + /** + * Add a single pending event to a multi pipeline. + * Used both for legacy _multi pattern and during batch flush. + */ + private addToMulti(multi: ReturnType, pending: PendingEvent) { + const { event, eventJson, eventWithTimestamp, type } = pending; + + if (type === 'screen_view' && event.session_id) { + const sessionKey = this.getLastScreenViewKeyBySession(event.session_id); + const profileKey = event.profile_id + ? this.getLastScreenViewKeyByProfile(event.project_id, event.profile_id) + : ''; + + this.evalScript( + multi, + 'addScreenView', + this.addScreenViewScript, + 4, + sessionKey, + profileKey, + this.queueKey, + this.bufferCounterKey, + eventWithTimestamp!, + '3600', + ); + } else if (type === 'session_end' && event.session_id) { + const sessionKey = this.getLastScreenViewKeyBySession(event.session_id); + const profileKey = event.profile_id + ? this.getLastScreenViewKeyByProfile(event.project_id, event.profile_id) + : ''; + + this.evalScript( + multi, + 'addSessionEnd', + this.addSessionEndScript, + 4, + sessionKey, + profileKey, + this.queueKey, + this.bufferCounterKey, + eventJson, + ); + } else { + // Regular events go directly to queue + multi.rpush(this.queueKey, eventJson).incr(this.bufferCounterKey); + } + + // Active visitor tracking (simplified - only ZADD, no redundant SET) + if (event.profile_id) { + this.incrementActiveVisitorCount( + multi, + event.project_id, + event.profile_id, + ); + } + } + + /** + * Force flush all pending events from local buffer to Redis immediately. + * Useful for testing or when you need to ensure all events are persisted. + */ + public async flush() { + // Clear any pending timer + if (this.flushTimer) { + clearTimeout(this.flushTimer); + this.flushTimer = null; + } + await this.flushLocalBuffer(); + } + + /** + * Flush all pending events from local buffer to Redis in a single pipeline. + * This is the core optimization - batching many events into one round-trip. + */ + private async flushLocalBuffer() { + if (this.isFlushing || this.pendingEvents.length === 0) { + return; + } + + this.isFlushing = true; + + // Grab current pending events and clear buffer + const eventsToFlush = this.pendingEvents; + this.pendingEvents = []; + try { const redis = getRedisCache(); - const eventJson = JSON.stringify(event); - const multi = _multi || redis.multi(); - - if (event.session_id && event.name === 'screen_view') { - // Handle screen_view - const sessionKey = this.getLastScreenViewKeyBySession(event.session_id); - const profileKey = event.profile_id - ? this.getLastScreenViewKeyByProfile( - event.project_id, - event.profile_id, - ) - : ''; - const timestamp = new Date(event.created_at || Date.now()).getTime(); - - // Combine event and timestamp into single JSON for atomic operations - const eventWithTimestamp = JSON.stringify({ - event: event, - ts: timestamp, - }); + const multi = redis.multi(); - this.evalScript( - multi, - 'addScreenView', - this.addScreenViewScript, - 4, - sessionKey, - profileKey, - this.queueKey, - this.bufferCounterKey, - eventWithTimestamp, - '3600', // 1 hour TTL - ); - } else if (event.session_id && event.name === 'session_end') { - // Handle session_end - const sessionKey = this.getLastScreenViewKeyBySession(event.session_id); - const profileKey = event.profile_id - ? this.getLastScreenViewKeyByProfile( - event.project_id, - event.profile_id, - ) - : ''; - - this.evalScript( - multi, - 'addSessionEnd', - this.addSessionEndScript, - 4, - sessionKey, - profileKey, - this.queueKey, - this.bufferCounterKey, - eventJson, - ); - } else { - // All other events go directly to queue - multi.rpush(this.queueKey, eventJson).incr(this.bufferCounterKey); + // Add all events to the pipeline + for (const pending of eventsToFlush) { + this.addToMulti(multi, pending); } - if (event.profile_id) { - this.incrementActiveVisitorCount( - multi, - event.project_id, - event.profile_id, - ); - } + await multi.exec(); - if (!_multi) { - await multi.exec(); + // Throttled publish - just signal that events were received + // Store the last event for publishing (we only need one to signal activity) + const lastEvent = eventsToFlush[eventsToFlush.length - 1]; + if (lastEvent) { + this.scheduleThrottledPublish(lastEvent.event); } - - await publishEvent('events', 'received', transformEvent(event)); } catch (error) { - this.logger.error('Failed to add event to Redis buffer', { error }); + this.logger.error('Failed to flush local buffer to Redis', { + error, + eventCount: eventsToFlush.length, + }); + } finally { + this.isFlushing = false; + } + } + + /** + * Throttled publish - publishes at most once per publishThrottleMs. + * Instead of publishing every event, we just signal that events were received. + * This reduces pub/sub load from 3000/s to 1/s. + */ + private scheduleThrottledPublish(event: IClickhouseEvent) { + // Always keep the latest event + this.pendingPublishEvent = event; + + const now = Date.now(); + const timeSinceLastPublish = now - this.lastPublishTime; + + // If enough time has passed, publish immediately + if (timeSinceLastPublish >= this.publishThrottleMs) { + this.executeThrottledPublish(); + return; + } + + // Otherwise, schedule a publish if not already scheduled + if (!this.publishTimer) { + const delay = this.publishThrottleMs - timeSinceLastPublish; + this.publishTimer = setTimeout(() => { + this.publishTimer = null; + this.executeThrottledPublish(); + }, delay); + } + } + + /** + * Execute the throttled publish with the latest pending event. + */ + private executeThrottledPublish() { + if (!this.pendingPublishEvent) { + return; + } + + const event = this.pendingPublishEvent; + this.pendingPublishEvent = null; + this.lastPublishTime = Date.now(); + + // Fire-and-forget publish (no multi = returns Promise) + const result = publishEvent('events', 'received', transformEvent(event)); + if (result instanceof Promise) { + result.catch(() => {}); } } @@ -440,18 +604,22 @@ return added }); } - private async incrementActiveVisitorCount( + /** + * Track active visitors using ZADD only. + * + * Optimization: Removed redundant heartbeat SET key. + * The ZADD score (timestamp) already tracks when a visitor was last seen. + * We use ZRANGEBYSCORE in getActiveVisitorCount to filter active visitors. + */ + private incrementActiveVisitorCount( multi: ReturnType, projectId: string, profileId: string, ) { - // Track active visitors and emit expiry events when inactive for TTL const now = Date.now(); const zsetKey = `live:visitors:${projectId}`; - const heartbeatKey = `live:visitor:${projectId}:${profileId}`; - return multi - .zadd(zsetKey, now, profileId) - .set(heartbeatKey, '1', 'EX', this.activeVisitorsExpiration); + // Only ZADD - the score is the timestamp, no need for separate heartbeat key + return multi.zadd(zsetKey, now, profileId); } public async getActiveVisitorCount(projectId: string): Promise { From bf39804767f30cb50478f183e978121d93d1bae8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Fri, 6 Feb 2026 13:03:14 +0000 Subject: [PATCH 3/6] remove unused file --- .../payments/scripts/assign-product-to-org.ts | 221 ------------------ 1 file changed, 221 deletions(-) delete mode 100644 packages/payments/scripts/assign-product-to-org.ts diff --git a/packages/payments/scripts/assign-product-to-org.ts b/packages/payments/scripts/assign-product-to-org.ts deleted file mode 100644 index 862c8ee39..000000000 --- a/packages/payments/scripts/assign-product-to-org.ts +++ /dev/null @@ -1,221 +0,0 @@ -import { db } from '@openpanel/db'; -import { Polar } from '@polar-sh/sdk'; -import inquirer from 'inquirer'; -import inquirerAutocomplete from 'inquirer-autocomplete-prompt'; -import { getSuccessUrl } from '..'; - -// Register the autocomplete prompt -inquirer.registerPrompt('autocomplete', inquirerAutocomplete); - -interface Answers { - isProduction: boolean; - polarApiKey: string; - productId: string; - organizationId: string; -} - -async function promptForInput() { - // Get all organizations first - const organizations = await db.organization.findMany({ - select: { - id: true, - name: true, - }, - }); - - // Step 1: Collect Polar credentials first - const polarCredentials = await inquirer.prompt<{ - isProduction: boolean; - polarApiKey: string; - polarOrganizationId: string; - }>([ - { - type: 'list', - name: 'isProduction', - message: 'Is this for production?', - choices: [ - { name: 'Yes', value: true }, - { name: 'No', value: false }, - ], - default: true, - }, - { - type: 'string', - name: 'polarApiKey', - message: 'Enter your Polar API key:', - validate: (input: string) => { - if (!input) return 'API key is required'; - return true; - }, - }, - ]); - - // Step 2: Initialize Polar client and fetch products - const polar = new Polar({ - accessToken: polarCredentials.polarApiKey, - server: polarCredentials.isProduction ? 'production' : 'sandbox', - }); - - console.log('Fetching products from Polar...'); - const productsResponse = await polar.products.list({ - limit: 100, - isArchived: false, - sorting: ['price_amount'], - }); - - const products = productsResponse.result.items; - - if (products.length === 0) { - throw new Error('No products found in Polar'); - } - - // Step 3: Continue with product selection and organization selection - const restOfAnswers = await inquirer.prompt<{ - productId: string; - organizationId: string; - }>([ - { - type: 'autocomplete', - name: 'productId', - message: 'Select product:', - source: (answersSoFar: any, input = '') => { - return products - .filter( - (product) => - product.name.toLowerCase().includes(input.toLowerCase()) || - product.id.toLowerCase().includes(input.toLowerCase()), - ) - .map((product) => { - const price = product.prices?.[0]; - const priceStr = - price && 'priceAmount' in price && price.priceAmount - ? `$${(price.priceAmount / 100).toFixed(2)}/${price.recurringInterval || 'month'}` - : 'No price'; - return { - name: `${product.name} (${priceStr})`, - value: product.id, - }; - }); - }, - }, - { - type: 'autocomplete', - name: 'organizationId', - message: 'Select organization:', - source: (answersSoFar: any, input = '') => { - return organizations - .filter( - (org) => - org.name.toLowerCase().includes(input.toLowerCase()) || - org.id.toLowerCase().includes(input.toLowerCase()), - ) - .map((org) => ({ - name: `${org.name} (${org.id})`, - value: org.id, - })); - }, - }, - ]); - - return { - ...polarCredentials, - ...restOfAnswers, - }; -} - -async function main() { - console.log('Assigning existing product to organization...'); - const input = await promptForInput(); - - const polar = new Polar({ - accessToken: input.polarApiKey, - server: input.isProduction ? 'production' : 'sandbox', - }); - - const organization = await db.organization.findUniqueOrThrow({ - where: { - id: input.organizationId, - }, - select: { - id: true, - name: true, - createdBy: { - select: { - id: true, - email: true, - firstName: true, - lastName: true, - }, - }, - projects: { - select: { - id: true, - }, - }, - }, - }); - - if (!organization.createdBy) { - throw new Error( - `Organization ${organization.name} does not have a creator. Cannot proceed.`, - ); - } - - const user = organization.createdBy; - - // Fetch product details for review - const product = await polar.products.get({ id: input.productId }); - const price = product.prices?.[0]; - const priceStr = - price && 'priceAmount' in price && price.priceAmount - ? `$${(price.priceAmount / 100).toFixed(2)}/${price.recurringInterval || 'month'}` - : 'No price'; - - console.log('\nReview the following settings:'); - console.table({ - product: product.name, - price: priceStr, - organization: organization.name, - email: user.email, - name: - [user.firstName, user.lastName].filter(Boolean).join(' ') || 'No name', - }); - - const { confirmed } = await inquirer.prompt([ - { - type: 'confirm', - name: 'confirmed', - message: 'Do you want to proceed?', - default: false, - }, - ]); - - if (!confirmed) { - console.log('Operation canceled'); - return; - } - - const checkoutLink = await polar.checkoutLinks.create({ - paymentProcessor: 'stripe', - productId: input.productId, - allowDiscountCodes: false, - metadata: { - organizationId: organization.id, - userId: user.id, - }, - successUrl: getSuccessUrl( - input.isProduction - ? 'https://dashboard.openpanel.dev' - : 'http://localhost:3000', - organization.id, - ), - }); - - console.log('\nCheckout link created:'); - console.table(checkoutLink); - console.log('\nProduct assigned successfully!'); -} - -main() - .catch(console.error) - .finally(() => db.$disconnect()); From bc08566cd4aae5af1968e82340752727df2d6481 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Fri, 6 Feb 2026 13:11:54 +0000 Subject: [PATCH 4/6] fix --- packages/db/src/buffers/event-buffer.ts | 150 ++++-------------------- 1 file changed, 20 insertions(+), 130 deletions(-) diff --git a/packages/db/src/buffers/event-buffer.ts b/packages/db/src/buffers/event-buffer.ts index 15d29b5b4..9c1e5eb92 100644 --- a/packages/db/src/buffers/event-buffer.ts +++ b/packages/db/src/buffers/event-buffer.ts @@ -14,9 +14,8 @@ import { import { BaseBuffer } from './base-buffer'; /** - * Simplified Event Buffer + * Event Buffer * - * Rules: * 1. All events go into a single list buffer (event_buffer:queue) * 2. screen_view events are handled specially: * - Store current screen_view as "last" for the session @@ -24,15 +23,8 @@ import { BaseBuffer } from './base-buffer'; * 3. session_end events: * - Retrieve the last screen_view (don't modify it) * - Push both screen_view and session_end to buffer - * 4. Flush: Simply process all events from the list buffer - * - * Optimizations: - * - Micro-batching: Events are buffered locally and flushed every 10ms to reduce Redis round-trips - * - Batched publishes: All PUBLISH commands are included in the multi pipeline - * - Simplified active visitor tracking: Only uses ZADD (removed redundant heartbeat SET) + * 4. Flush: Process all events from the list buffer */ - -// Pending event for local buffer interface PendingEvent { event: IClickhouseEvent; eventJson: string; @@ -41,7 +33,6 @@ interface PendingEvent { } export class EventBuffer extends BaseBuffer { - // Configurable limits private batchSize = process.env.EVENT_BUFFER_BATCH_SIZE ? Number.parseInt(process.env.EVENT_BUFFER_BATCH_SIZE, 10) : 4000; @@ -49,58 +40,48 @@ export class EventBuffer extends BaseBuffer { ? Number.parseInt(process.env.EVENT_BUFFER_CHUNK_SIZE, 10) : 1000; - // Micro-batching configuration private microBatchIntervalMs = process.env.EVENT_BUFFER_MICRO_BATCH_MS ? Number.parseInt(process.env.EVENT_BUFFER_MICRO_BATCH_MS, 10) - : 10; // Flush every 10ms by default + : 10; private microBatchMaxSize = process.env.EVENT_BUFFER_MICRO_BATCH_SIZE ? Number.parseInt(process.env.EVENT_BUFFER_MICRO_BATCH_SIZE, 10) - : 100; // Or when we hit 100 events + : 100; - // Local event buffer for micro-batching private pendingEvents: PendingEvent[] = []; private flushTimer: ReturnType | null = null; private isFlushing = false; + /** Tracks consecutive flush failures for observability; reset on success. */ + private flushRetryCount = 0; - // Throttled publish configuration private publishThrottleMs = process.env.EVENT_BUFFER_PUBLISH_THROTTLE_MS ? Number.parseInt(process.env.EVENT_BUFFER_PUBLISH_THROTTLE_MS, 10) - : 1000; // Publish at most once per second + : 1000; private lastPublishTime = 0; private pendingPublishEvent: IClickhouseEvent | null = null; private publishTimer: ReturnType | null = null; private activeVisitorsExpiration = 60 * 5; // 5 minutes - - // LIST - Stores all events ready to be flushed private queueKey = 'event_buffer:queue'; - - // STRING - Tracks total buffer size incrementally protected bufferCounterKey = 'event_buffer:total_count'; - // Script SHAs for loaded Lua scripts private scriptShas: { addScreenView?: string; addSessionEnd?: string; } = {}; - // Hash key for storing last screen_view per session private getLastScreenViewKeyBySession(sessionId: string) { return `event_buffer:last_screen_view:session:${sessionId}`; } - // Hash key for storing last screen_view per profile private getLastScreenViewKeyByProfile(projectId: string, profileId: string) { return `event_buffer:last_screen_view:profile:${projectId}:${profileId}`; } /** - * Lua script for handling screen_view addition - RACE-CONDITION SAFE without GroupMQ + * Lua script for screen_view addition. + * Uses GETDEL for atomic get-and-delete to prevent race conditions. * - * Strategy: Use Redis GETDEL (atomic get-and-delete) to ensure only ONE thread - * can process the "last" screen_view at a time. - * - * KEYS[1] = last screen_view key (by session) - stores both event and timestamp as JSON + * KEYS[1] = last screen_view key (by session) * KEYS[2] = last screen_view key (by profile, may be empty) * KEYS[3] = queue key * KEYS[4] = buffer counter key @@ -115,24 +96,18 @@ local counterKey = KEYS[4] local newEventData = ARGV[1] local ttl = tonumber(ARGV[2]) --- GETDEL is atomic: get previous and delete in one operation --- This ensures only ONE thread gets the previous event local previousEventData = redis.call("GETDEL", sessionKey) --- Store new screen_view as last for session redis.call("SET", sessionKey, newEventData, "EX", ttl) --- Store new screen_view as last for profile (if key provided) if profileKey and profileKey ~= "" then redis.call("SET", profileKey, newEventData, "EX", ttl) end --- If there was a previous screen_view, add it to queue with calculated duration if previousEventData then local prev = cjson.decode(previousEventData) local curr = cjson.decode(newEventData) - -- Calculate duration (ensure non-negative to handle clock skew) if prev.ts and curr.ts then prev.event.duration = math.max(0, curr.ts - prev.ts) end @@ -146,9 +121,8 @@ return 0 `; /** - * Lua script for handling session_end - RACE-CONDITION SAFE - * - * Uses GETDEL to atomically retrieve and delete the last screen_view + * Lua script for session_end. + * Uses GETDEL to atomically retrieve and delete the last screen_view. * * KEYS[1] = last screen_view key (by session) * KEYS[2] = last screen_view key (by profile, may be empty) @@ -163,11 +137,9 @@ local queueKey = KEYS[3] local counterKey = KEYS[4] local sessionEndJson = ARGV[1] --- GETDEL is atomic: only ONE thread gets the last screen_view local previousEventData = redis.call("GETDEL", sessionKey) local added = 0 --- If there was a previous screen_view, add it to queue if previousEventData then local prev = cjson.decode(previousEventData) redis.call("RPUSH", queueKey, cjson.encode(prev.event)) @@ -175,12 +147,10 @@ if previousEventData then added = added + 1 end --- Add session_end to queue redis.call("RPUSH", queueKey, sessionEndJson) redis.call("INCR", counterKey) added = added + 1 --- Delete profile key if profileKey and profileKey ~= "" then redis.call("DEL", profileKey) end @@ -195,14 +165,9 @@ return added await this.processBuffer(); }, }); - // Load Lua scripts into Redis on startup this.loadScripts(); } - /** - * Load Lua scripts into Redis and cache their SHAs. - * This avoids sending the entire script on every call. - */ private async loadScripts() { try { const redis = getRedisCache(); @@ -224,27 +189,14 @@ return added } bulkAdd(events: IClickhouseEvent[]) { - // Add all events to local buffer - they will be flushed together for (const event of events) { this.add(event); } } - /** - * Add an event into the local buffer for micro-batching. - * - * Events are buffered locally and flushed to Redis every microBatchIntervalMs - * or when microBatchMaxSize is reached. This dramatically reduces Redis round-trips. - * - * Logic: - * - screen_view: Store as "last" for session, flush previous if exists - * - session_end: Flush last screen_view + session_end - * - Other events: Add directly to queue - */ add(event: IClickhouseEvent, _multi?: ReturnType) { const eventJson = JSON.stringify(event); - // Determine event type and prepare data let type: PendingEvent['type'] = 'regular'; let eventWithTimestamp: string | undefined; @@ -266,22 +218,18 @@ return added type, }; - // If a multi was provided (legacy bulkAdd pattern), add directly without batching if (_multi) { this.addToMulti(_multi, pendingEvent); return; } - // Add to local buffer for micro-batching this.pendingEvents.push(pendingEvent); - // Check if we should flush immediately due to size if (this.pendingEvents.length >= this.microBatchMaxSize) { this.flushLocalBuffer(); return; } - // Schedule flush if not already scheduled if (!this.flushTimer) { this.flushTimer = setTimeout(() => { this.flushTimer = null; @@ -290,10 +238,6 @@ return added } } - /** - * Add a single pending event to a multi pipeline. - * Used both for legacy _multi pattern and during batch flush. - */ private addToMulti(multi: ReturnType, pending: PendingEvent) { const { event, eventJson, eventWithTimestamp, type } = pending; @@ -333,11 +277,9 @@ return added eventJson, ); } else { - // Regular events go directly to queue multi.rpush(this.queueKey, eventJson).incr(this.bufferCounterKey); } - // Active visitor tracking (simplified - only ZADD, no redundant SET) if (event.profile_id) { this.incrementActiveVisitorCount( multi, @@ -347,12 +289,7 @@ return added } } - /** - * Force flush all pending events from local buffer to Redis immediately. - * Useful for testing or when you need to ensure all events are persisted. - */ public async flush() { - // Clear any pending timer if (this.flushTimer) { clearTimeout(this.flushTimer); this.flushTimer = null; @@ -360,10 +297,6 @@ return added await this.flushLocalBuffer(); } - /** - * Flush all pending events from local buffer to Redis in a single pipeline. - * This is the core optimization - batching many events into one round-trip. - */ private async flushLocalBuffer() { if (this.isFlushing || this.pendingEvents.length === 0) { return; @@ -371,7 +304,6 @@ return added this.isFlushing = true; - // Grab current pending events and clear buffer const eventsToFlush = this.pendingEvents; this.pendingEvents = []; @@ -379,48 +311,44 @@ return added const redis = getRedisCache(); const multi = redis.multi(); - // Add all events to the pipeline for (const pending of eventsToFlush) { this.addToMulti(multi, pending); } await multi.exec(); - // Throttled publish - just signal that events were received - // Store the last event for publishing (we only need one to signal activity) + this.flushRetryCount = 0; + const lastEvent = eventsToFlush[eventsToFlush.length - 1]; if (lastEvent) { this.scheduleThrottledPublish(lastEvent.event); } } catch (error) { - this.logger.error('Failed to flush local buffer to Redis', { + // Re-queue failed events at the front to preserve order and avoid data loss + this.pendingEvents = eventsToFlush.concat(this.pendingEvents); + + this.flushRetryCount += 1; + this.logger.warn('Failed to flush local buffer to Redis; events re-queued', { error, eventCount: eventsToFlush.length, + flushRetryCount: this.flushRetryCount, }); } finally { this.isFlushing = false; } } - /** - * Throttled publish - publishes at most once per publishThrottleMs. - * Instead of publishing every event, we just signal that events were received. - * This reduces pub/sub load from 3000/s to 1/s. - */ private scheduleThrottledPublish(event: IClickhouseEvent) { - // Always keep the latest event this.pendingPublishEvent = event; const now = Date.now(); const timeSinceLastPublish = now - this.lastPublishTime; - // If enough time has passed, publish immediately if (timeSinceLastPublish >= this.publishThrottleMs) { this.executeThrottledPublish(); return; } - // Otherwise, schedule a publish if not already scheduled if (!this.publishTimer) { const delay = this.publishThrottleMs - timeSinceLastPublish; this.publishTimer = setTimeout(() => { @@ -430,9 +358,6 @@ return added } } - /** - * Execute the throttled publish with the latest pending event. - */ private executeThrottledPublish() { if (!this.pendingPublishEvent) { return; @@ -442,17 +367,12 @@ return added this.pendingPublishEvent = null; this.lastPublishTime = Date.now(); - // Fire-and-forget publish (no multi = returns Promise) const result = publishEvent('events', 'received', transformEvent(event)); if (result instanceof Promise) { result.catch(() => {}); } } - /** - * Execute a Lua script using EVALSHA (cached) or fallback to EVAL. - * This avoids sending the entire script on every call. - */ private evalScript( multi: ReturnType, scriptName: keyof typeof this.scriptShas, @@ -463,32 +383,18 @@ return added const sha = this.scriptShas[scriptName]; if (sha) { - // Use EVALSHA with cached SHA multi.evalsha(sha, numKeys, ...args); } else { - // Fallback to EVAL and try to reload script multi.eval(scriptContent, numKeys, ...args); this.logger.warn(`Script ${scriptName} not loaded, using EVAL fallback`); - // Attempt to reload scripts in background this.loadScripts(); } } - /** - * Process the Redis buffer - simplified version. - * - * Simply: - * 1. Fetch events from the queue (up to batchSize) - * 2. Parse and sort them - * 3. Insert into ClickHouse in chunks - * 4. Publish saved events - * 5. Clean up processed events from queue - */ async processBuffer() { const redis = getRedisCache(); try { - // Fetch events from queue const queueEvents = await redis.lrange( this.queueKey, 0, @@ -500,7 +406,6 @@ return added return; } - // Parse events const eventsToClickhouse: IClickhouseEvent[] = []; for (const eventStr of queueEvents) { const event = getSafeJson(eventStr); @@ -514,14 +419,12 @@ return added return; } - // Sort events by creation time eventsToClickhouse.sort( (a, b) => new Date(a.created_at || 0).getTime() - new Date(b.created_at || 0).getTime(), ); - // Insert events into ClickHouse in chunks this.logger.info('Inserting events into ClickHouse', { totalEvents: eventsToClickhouse.length, chunks: Math.ceil(eventsToClickhouse.length / this.chunkSize), @@ -535,14 +438,12 @@ return added }); } - // Publish "saved" events const pubMulti = getRedisPub().multi(); for (const event of eventsToClickhouse) { await publishEvent('events', 'saved', transformEvent(event), pubMulti); } await pubMulti.exec(); - // Clean up processed events from queue await redis .multi() .ltrim(this.queueKey, queueEvents.length, -1) @@ -558,9 +459,6 @@ return added } } - /** - * Retrieve the latest screen_view event for a given session or profile - */ public async getLastScreenView( params: | { @@ -604,13 +502,6 @@ return added }); } - /** - * Track active visitors using ZADD only. - * - * Optimization: Removed redundant heartbeat SET key. - * The ZADD score (timestamp) already tracks when a visitor was last seen. - * We use ZRANGEBYSCORE in getActiveVisitorCount to filter active visitors. - */ private incrementActiveVisitorCount( multi: ReturnType, projectId: string, @@ -618,7 +509,6 @@ return added ) { const now = Date.now(); const zsetKey = `live:visitors:${projectId}`; - // Only ZADD - the score is the timestamp, no need for separate heartbeat key return multi.zadd(zsetKey, now, profileId); } From a672b73947f875560dd7961a7c86318e0a135c25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Wed, 11 Mar 2026 23:28:20 +0100 Subject: [PATCH 5/6] wip --- apps/api/src/controllers/live.controller.ts | 96 +++++++++--------- .../src/components/events/event-listener.tsx | 37 ++----- .../src/components/events/table/index.tsx | 9 +- .../onboarding/onboarding-verify-listener.tsx | 32 ++---- .../realtime/realtime-active-sessions.tsx | 98 ++++++++----------- ...ationId.$projectId.events._tabs.events.tsx | 2 +- ...pp.$organizationId.$projectId.realtime.tsx | 23 +++-- .../_steps.onboarding.$projectId.verify.tsx | 28 ++---- packages/db/src/buffers/event-buffer.ts | 96 +++++++----------- packages/redis/publisher.ts | 3 +- packages/trpc/src/routers/realtime.ts | 50 ++++------ 11 files changed, 196 insertions(+), 278 deletions(-) diff --git a/apps/api/src/controllers/live.controller.ts b/apps/api/src/controllers/live.controller.ts index cd7afe914..332968ad6 100644 --- a/apps/api/src/controllers/live.controller.ts +++ b/apps/api/src/controllers/live.controller.ts @@ -1,16 +1,13 @@ -import type { FastifyRequest } from 'fastify'; -import superjson from 'superjson'; - import type { WebSocket } from '@fastify/websocket'; -import { - eventBuffer, - getProfileById, - transformMinimalEvent, -} from '@openpanel/db'; +import { eventBuffer } from '@openpanel/db'; import { setSuperJson } from '@openpanel/json'; -import { subscribeToPublishedEvent } from '@openpanel/redis'; +import { + psubscribeToPublishedEvent, + subscribeToPublishedEvent, +} from '@openpanel/redis'; import { getProjectAccess } from '@openpanel/trpc'; import { getOrganizationAccess } from '@openpanel/trpc/src/access'; +import type { FastifyRequest } from 'fastify'; export function wsVisitors( socket: WebSocket, @@ -18,19 +15,38 @@ export function wsVisitors( Params: { projectId: string; }; - }>, + }> ) { const { params } = req; - const unsubscribe = subscribeToPublishedEvent('events', 'saved', (event) => { - if (event?.projectId === params.projectId) { - eventBuffer.getActiveVisitorCount(params.projectId).then((count) => { - socket.send(String(count)); - }); + const sendCount = () => { + eventBuffer.getActiveVisitorCount(params.projectId).then((count) => { + socket.send(String(count)); + }); + }; + + const unsubscribe = subscribeToPublishedEvent( + 'events', + 'batch', + ({ projectId }) => { + if (projectId === params.projectId) { + sendCount(); + } } - }); + ); + + const punsubscribe = psubscribeToPublishedEvent( + '__keyevent@0__:expired', + (key) => { + const [, , projectId] = key.split(':'); + if (projectId === params.projectId) { + sendCount(); + } + } + ); socket.on('close', () => { unsubscribe(); + punsubscribe(); }); } @@ -42,18 +58,10 @@ export async function wsProjectEvents( }; Querystring: { token?: string; - type?: 'saved' | 'received'; }; - }>, + }> ) { - const { params, query } = req; - const type = query.type || 'saved'; - - if (!['saved', 'received'].includes(type)) { - socket.send('Invalid type'); - socket.close(); - return; - } + const { params } = req; const userId = req.session?.userId; if (!userId) { @@ -67,24 +75,20 @@ export async function wsProjectEvents( projectId: params.projectId, }); + if (!access) { + socket.send('No access'); + socket.close(); + return; + } + const unsubscribe = subscribeToPublishedEvent( 'events', - type, - async (event) => { - if (event.projectId === params.projectId) { - const profile = await getProfileById(event.profileId, event.projectId); - socket.send( - superjson.stringify( - access - ? { - ...event, - profile, - } - : transformMinimalEvent(event), - ), - ); + 'batch', + ({ projectId, count }) => { + if (projectId === params.projectId) { + socket.send(setSuperJson({ count })); } - }, + } ); socket.on('close', () => unsubscribe()); @@ -96,7 +100,7 @@ export async function wsProjectNotifications( Params: { projectId: string; }; - }>, + }> ) { const { params } = req; const userId = req.session?.userId; @@ -123,9 +127,9 @@ export async function wsProjectNotifications( 'created', (notification) => { if (notification.projectId === params.projectId) { - socket.send(superjson.stringify(notification)); + socket.send(setSuperJson(notification)); } - }, + } ); socket.on('close', () => unsubscribe()); @@ -137,7 +141,7 @@ export async function wsOrganizationEvents( Params: { organizationId: string; }; - }>, + }> ) { const { params } = req; const userId = req.session?.userId; @@ -164,7 +168,7 @@ export async function wsOrganizationEvents( 'subscription_updated', (message) => { socket.send(setSuperJson(message)); - }, + } ); socket.on('close', () => unsubscribe()); diff --git a/apps/start/src/components/events/event-listener.tsx b/apps/start/src/components/events/event-listener.tsx index defabb7d1..08df9b61d 100644 --- a/apps/start/src/components/events/event-listener.tsx +++ b/apps/start/src/components/events/event-listener.tsx @@ -1,3 +1,4 @@ +import { AnimatedNumber } from '../animated-number'; import { Tooltip, TooltipContent, @@ -8,71 +9,53 @@ import { useDebounceState } from '@/hooks/use-debounce-state'; import useWS from '@/hooks/use-ws'; import { cn } from '@/utils/cn'; -import type { IServiceEvent, IServiceEventMinimal } from '@openpanel/db'; -import { useParams } from '@tanstack/react-router'; -import { AnimatedNumber } from '../animated-number'; - export default function EventListener({ onRefresh, }: { onRefresh: () => void; }) { - const params = useParams({ - strict: false, - }); const { projectId } = useAppParams(); const counter = useDebounceState(0, 1000); - useWS( + useWS<{ count: number }>( `/live/events/${projectId}`, - (event) => { - if (event) { - const isProfilePage = !!params?.profileId; - if (isProfilePage) { - const profile = 'profile' in event ? event.profile : null; - if (profile?.id === params?.profileId) { - counter.set((prev) => prev + 1); - } - return; - } - - counter.set((prev) => prev + 1); - } + ({ count }) => { + counter.set((prev) => prev + count); }, { debounce: { delay: 1000, maxWait: 5000, }, - }, + } ); return ( diff --git a/apps/start/src/components/events/table/index.tsx b/apps/start/src/components/events/table/index.tsx index 300713997..95ed8e2d7 100644 --- a/apps/start/src/components/events/table/index.tsx +++ b/apps/start/src/components/events/table/index.tsx @@ -35,6 +35,7 @@ type Props = { >, unknown >; + showEventListener?: boolean; }; const LOADING_DATA = [{}, {}, {}, {}, {}, {}, {}, {}, {}] as IServiceEvent[]; @@ -215,7 +216,7 @@ const VirtualizedEventsTable = ({ ); }; -export const EventsTable = ({ query }: Props) => { +export const EventsTable = ({ query, showEventListener = false }: Props) => { const { isLoading } = query; const columns = useColumns(); @@ -272,7 +273,7 @@ export const EventsTable = ({ query }: Props) => { return ( <> - +
{ function EventsTableToolbar({ query, table, + showEventListener, }: { query: Props['query']; table: Table; + showEventListener: boolean; }) { const { projectId } = useAppParams(); const [startDate, setStartDate] = useQueryState( @@ -305,7 +308,7 @@ function EventsTableToolbar({ return (
- query.refetch()} /> + {showEventListener && query.refetch()} />}