diff --git a/apps/api/package.json b/apps/api/package.json index 3cb8b99f5..5f63bc5d9 100644 --- a/apps/api/package.json +++ b/apps/api/package.json @@ -30,7 +30,6 @@ "@openpanel/logger": "workspace:*", "@openpanel/payments": "workspace:*", "@openpanel/queue": "workspace:*", - "groupmq": "catalog:", "@openpanel/redis": "workspace:*", "@openpanel/trpc": "workspace:*", "@openpanel/validation": "workspace:*", @@ -40,6 +39,7 @@ "fastify": "^5.6.1", "fastify-metrics": "^12.1.0", "fastify-raw-body": "^5.0.0", + "groupmq": "catalog:", "jsonwebtoken": "^9.0.2", "ramda": "^0.29.1", "sharp": "^0.33.5", diff --git a/apps/api/src/bots/index.ts b/apps/api/src/bots/index.ts index 12d6b7387..391aa2900 100644 --- a/apps/api/src/bots/index.ts +++ b/apps/api/src/bots/index.ts @@ -1,4 +1,4 @@ -import { cacheable, cacheableLru } from '@openpanel/redis'; +import { cacheable } from '@openpanel/redis'; import bots from './bots'; // Pre-compile regex patterns at module load time @@ -15,7 +15,7 @@ const compiledBots = bots.map((bot) => { const regexBots = compiledBots.filter((bot) => 'compiledRegex' in bot); const includesBots = compiledBots.filter((bot) => 'includes' in bot); -export const isBot = cacheableLru( +export const isBot = cacheable( 'is-bot', (ua: string) => { // Check simple string patterns first (fast) @@ -40,8 +40,5 @@ export const isBot = cacheableLru( return null; }, - { - maxSize: 1000, - ttl: 60 * 5, - }, + 60 * 5 ); diff --git a/apps/api/src/controllers/live.controller.ts b/apps/api/src/controllers/live.controller.ts index 29931aa84..488e67138 100644 --- a/apps/api/src/controllers/live.controller.ts +++ b/apps/api/src/controllers/live.controller.ts @@ -1,12 +1,5 @@ -import type { FastifyRequest } from 'fastify'; -import superjson from 'superjson'; - import type { WebSocket } from '@fastify/websocket'; -import { - eventBuffer, - getProfileById, - transformMinimalEvent, -} from '@openpanel/db'; +import { eventBuffer } from '@openpanel/db'; import { setSuperJson } from '@openpanel/json'; import { psubscribeToPublishedEvent, @@ -14,10 +7,7 @@ import { } from '@openpanel/redis'; import { getProjectAccess } from '@openpanel/trpc'; import { getOrganizationAccess } from '@openpanel/trpc/src/access'; - -export function getLiveEventInfo(key: string) { - return key.split(':').slice(2) as [string, string]; -} +import type { FastifyRequest } from 'fastify'; export function wsVisitors( socket: WebSocket, @@ -25,27 +15,38 @@ export function wsVisitors( Params: { projectId: string; }; - }>, + }> ) { const { params } = req; - const unsubscribe = subscribeToPublishedEvent('events', 'saved', (event) => { - if (event?.projectId === params.projectId) { - eventBuffer.getActiveVisitorCount(params.projectId).then((count) => { + const sendCount = () => { + eventBuffer + .getActiveVisitorCount(params.projectId) + .then((count) => { socket.send(String(count)); + }) + .catch(() => { + socket.send('0'); }); + }; + + const unsubscribe = subscribeToPublishedEvent( + 'events', + 'batch', + ({ projectId }) => { + if (projectId === params.projectId) { + sendCount(); + } } - }); + ); const punsubscribe = psubscribeToPublishedEvent( '__keyevent@0__:expired', (key) => { - const [projectId] = getLiveEventInfo(key); - if (projectId && projectId === params.projectId) { - eventBuffer.getActiveVisitorCount(params.projectId).then((count) => { - socket.send(String(count)); - }); + const [, , projectId] = key.split(':'); + if (projectId === params.projectId) { + sendCount(); } - }, + } ); socket.on('close', () => { @@ -62,18 +63,10 @@ export async function wsProjectEvents( }; Querystring: { token?: string; - type?: 'saved' | 'received'; }; - }>, + }> ) { - const { params, query } = req; - const type = query.type || 'saved'; - - if (!['saved', 'received'].includes(type)) { - socket.send('Invalid type'); - socket.close(); - return; - } + const { params } = req; const userId = req.session?.userId; if (!userId) { @@ -87,24 +80,20 @@ export async function wsProjectEvents( projectId: params.projectId, }); + if (!access) { + socket.send('No access'); + socket.close(); + return; + } + const unsubscribe = subscribeToPublishedEvent( 'events', - type, - async (event) => { - if (event.projectId === params.projectId) { - const profile = await getProfileById(event.profileId, event.projectId); - socket.send( - superjson.stringify( - access - ? { - ...event, - profile, - } - : transformMinimalEvent(event), - ), - ); + 'batch', + ({ projectId, count }) => { + if (projectId === params.projectId) { + socket.send(setSuperJson({ count })); } - }, + } ); socket.on('close', () => unsubscribe()); @@ -116,7 +105,7 @@ export async function wsProjectNotifications( Params: { projectId: string; }; - }>, + }> ) { const { params } = req; const userId = req.session?.userId; @@ -143,9 +132,9 @@ export async function wsProjectNotifications( 'created', (notification) => { if (notification.projectId === params.projectId) { - socket.send(superjson.stringify(notification)); + socket.send(setSuperJson(notification)); } - }, + } ); socket.on('close', () => unsubscribe()); @@ -157,7 +146,7 @@ export async function wsOrganizationEvents( Params: { organizationId: string; }; - }>, + }> ) { const { params } = req; const userId = req.session?.userId; @@ -184,7 +173,7 @@ export async function wsOrganizationEvents( 'subscription_updated', (message) => { socket.send(setSuperJson(message)); - }, + } ); socket.on('close', () => unsubscribe()); diff --git a/apps/api/src/controllers/manage.controller.ts b/apps/api/src/controllers/manage.controller.ts index e1d3bf67f..1d162851a 100644 --- a/apps/api/src/controllers/manage.controller.ts +++ b/apps/api/src/controllers/manage.controller.ts @@ -1,5 +1,4 @@ import crypto from 'node:crypto'; -import { HttpError } from '@/utils/errors'; import { stripTrailingSlash } from '@openpanel/common'; import { hashPassword } from '@openpanel/common/server'; import { @@ -10,6 +9,7 @@ import { } from '@openpanel/db'; import type { FastifyReply, FastifyRequest } from 'fastify'; import { z } from 'zod'; +import { HttpError } from '@/utils/errors'; // Validation schemas const zCreateProject = z.object({ @@ -57,7 +57,7 @@ const zUpdateReference = z.object({ // Projects CRUD export async function listProjects( request: FastifyRequest, - reply: FastifyReply, + reply: FastifyReply ) { const projects = await db.project.findMany({ where: { @@ -74,7 +74,7 @@ export async function listProjects( export async function getProject( request: FastifyRequest<{ Params: { id: string } }>, - reply: FastifyReply, + reply: FastifyReply ) { const project = await db.project.findFirst({ where: { @@ -92,7 +92,7 @@ export async function getProject( export async function createProject( request: FastifyRequest<{ Body: z.infer }>, - reply: FastifyReply, + reply: FastifyReply ) { const parsed = zCreateProject.safeParse(request.body); @@ -139,12 +139,9 @@ export async function createProject( }, }); - // Clear cache await Promise.all([ getProjectByIdCached.clear(project.id), - project.clients.map((client) => { - getClientByIdCached.clear(client.id); - }), + ...project.clients.map((client) => getClientByIdCached.clear(client.id)), ]); reply.send({ @@ -165,7 +162,7 @@ export async function updateProject( Params: { id: string }; Body: z.infer; }>, - reply: FastifyReply, + reply: FastifyReply ) { const parsed = zUpdateProject.safeParse(request.body); @@ -223,12 +220,9 @@ export async function updateProject( data: updateData, }); - // Clear cache await Promise.all([ getProjectByIdCached.clear(project.id), - existing.clients.map((client) => { - getClientByIdCached.clear(client.id); - }), + ...existing.clients.map((client) => getClientByIdCached.clear(client.id)), ]); reply.send({ data: project }); @@ -236,7 +230,7 @@ export async function updateProject( export async function deleteProject( request: FastifyRequest<{ Params: { id: string } }>, - reply: FastifyReply, + reply: FastifyReply ) { const project = await db.project.findFirst({ where: { @@ -266,7 +260,7 @@ export async function deleteProject( // Clients CRUD export async function listClients( request: FastifyRequest<{ Querystring: { projectId?: string } }>, - reply: FastifyReply, + reply: FastifyReply ) { const where: any = { organizationId: request.client!.organizationId, @@ -300,7 +294,7 @@ export async function listClients( export async function getClient( request: FastifyRequest<{ Params: { id: string } }>, - reply: FastifyReply, + reply: FastifyReply ) { const client = await db.client.findFirst({ where: { @@ -318,7 +312,7 @@ export async function getClient( export async function createClient( request: FastifyRequest<{ Body: z.infer }>, - reply: FastifyReply, + reply: FastifyReply ) { const parsed = zCreateClient.safeParse(request.body); @@ -374,7 +368,7 @@ export async function updateClient( Params: { id: string }; Body: z.infer; }>, - reply: FastifyReply, + reply: FastifyReply ) { const parsed = zUpdateClient.safeParse(request.body); @@ -417,7 +411,7 @@ export async function updateClient( export async function deleteClient( request: FastifyRequest<{ Params: { id: string } }>, - reply: FastifyReply, + reply: FastifyReply ) { const client = await db.client.findFirst({ where: { @@ -444,7 +438,7 @@ export async function deleteClient( // References CRUD export async function listReferences( request: FastifyRequest<{ Querystring: { projectId?: string } }>, - reply: FastifyReply, + reply: FastifyReply ) { const where: any = {}; @@ -488,7 +482,7 @@ export async function listReferences( export async function getReference( request: FastifyRequest<{ Params: { id: string } }>, - reply: FastifyReply, + reply: FastifyReply ) { const reference = await db.reference.findUnique({ where: { @@ -516,7 +510,7 @@ export async function getReference( export async function createReference( request: FastifyRequest<{ Body: z.infer }>, - reply: FastifyReply, + reply: FastifyReply ) { const parsed = zCreateReference.safeParse(request.body); @@ -559,7 +553,7 @@ export async function updateReference( Params: { id: string }; Body: z.infer; }>, - reply: FastifyReply, + reply: FastifyReply ) { const parsed = zUpdateReference.safeParse(request.body); @@ -616,7 +610,7 @@ export async function updateReference( export async function deleteReference( request: FastifyRequest<{ Params: { id: string } }>, - reply: FastifyReply, + reply: FastifyReply ) { const reference = await db.reference.findUnique({ where: { diff --git a/apps/api/src/controllers/track.controller.ts b/apps/api/src/controllers/track.controller.ts index f7a3c61f0..0e530f933 100644 --- a/apps/api/src/controllers/track.controller.ts +++ b/apps/api/src/controllers/track.controller.ts @@ -7,7 +7,10 @@ import { upsertProfile, } from '@openpanel/db'; import { type GeoLocation, getGeoLocation } from '@openpanel/geo'; -import { getEventsGroupQueueShard } from '@openpanel/queue'; +import { + type EventsQueuePayloadIncomingEvent, + getEventsGroupQueueShard, +} from '@openpanel/queue'; import { getRedisCache } from '@openpanel/redis'; import { type IDecrementPayload, @@ -112,6 +115,7 @@ interface TrackContext { identity?: IIdentifyPayload; deviceId: string; sessionId: string; + session?: EventsQueuePayloadIncomingEvent['payload']['session']; geo: GeoLocation; } @@ -141,19 +145,21 @@ async function buildContext( validatedBody.payload.profileId = profileId; } + const overrideDeviceId = + validatedBody.type === 'track' && + typeof validatedBody.payload?.properties?.__deviceId === 'string' + ? validatedBody.payload?.properties.__deviceId + : undefined; + // Get geo location (needed for track and identify) const [geo, salts] = await Promise.all([getGeoLocation(ip), getSalts()]); - const { deviceId, sessionId } = await getDeviceId({ + const deviceIdResult = await getDeviceId({ projectId, ip, ua, salts, - overrideDeviceId: - validatedBody.type === 'track' && - typeof validatedBody.payload?.properties?.__deviceId === 'string' - ? validatedBody.payload?.properties.__deviceId - : undefined, + overrideDeviceId, }); return { @@ -166,8 +172,9 @@ async function buildContext( isFromPast: timestamp.isTimestampFromThePast, }, identity, - deviceId, - sessionId, + deviceId: deviceIdResult.deviceId, + sessionId: deviceIdResult.sessionId, + session: deviceIdResult.session, geo, }; } @@ -176,13 +183,14 @@ async function handleTrack( payload: ITrackPayload, context: TrackContext ): Promise { - const { projectId, deviceId, geo, headers, timestamp, sessionId } = context; + const { projectId, deviceId, geo, headers, timestamp, sessionId, session } = + context; const uaInfo = parseUserAgent(headers['user-agent'], payload.properties); const groupId = uaInfo.isServer ? payload.profileId ? `${projectId}:${payload.profileId}` - : `${projectId}:${generateId()}` + : undefined : deviceId; const jobId = [ slug(payload.name), @@ -203,7 +211,7 @@ async function handleTrack( } promises.push( - getEventsGroupQueueShard(groupId).add({ + getEventsGroupQueueShard(groupId || generateId()).add({ orderMs: timestamp.value, data: { projectId, @@ -217,6 +225,7 @@ async function handleTrack( geo, deviceId, sessionId, + session, }, groupId, jobId, diff --git a/apps/api/src/hooks/is-bot.hook.ts b/apps/api/src/hooks/is-bot.hook.ts index c01a231a6..09346fbde 100644 --- a/apps/api/src/hooks/is-bot.hook.ts +++ b/apps/api/src/hooks/is-bot.hook.ts @@ -1,20 +1,19 @@ -import { isBot } from '@/bots'; import { createBotEvent } from '@openpanel/db'; import type { DeprecatedPostEventPayload, ITrackHandlerPayload, } from '@openpanel/validation'; - import type { FastifyReply, FastifyRequest } from 'fastify'; +import { isBot } from '@/bots'; export async function isBotHook( req: FastifyRequest<{ Body: ITrackHandlerPayload | DeprecatedPostEventPayload; }>, - reply: FastifyReply, + reply: FastifyReply ) { const bot = req.headers['user-agent'] - ? isBot(req.headers['user-agent']) + ? await isBot(req.headers['user-agent']) : null; if (bot && req.client?.projectId) { @@ -44,6 +43,6 @@ export async function isBotHook( } } - return reply.status(202).send(); + return reply.status(202).send({ bot }); } } diff --git a/apps/api/src/routes/track.router.ts b/apps/api/src/routes/track.router.ts index 2f94a8eb4..1bb04c4b9 100644 --- a/apps/api/src/routes/track.router.ts +++ b/apps/api/src/routes/track.router.ts @@ -1,6 +1,5 @@ -import { fetchDeviceId, handler } from '@/controllers/track.controller'; import type { FastifyPluginCallback } from 'fastify'; - +import { fetchDeviceId, handler } from '@/controllers/track.controller'; import { clientHook } from '@/hooks/client.hook'; import { duplicateHook } from '@/hooks/duplicate.hook'; import { isBotHook } from '@/hooks/is-bot.hook'; @@ -13,7 +12,7 @@ const trackRouter: FastifyPluginCallback = async (fastify) => { fastify.route({ method: 'POST', url: '/', - handler: handler, + handler, }); fastify.route({ diff --git a/apps/api/src/utils/ids.ts b/apps/api/src/utils/ids.ts index db6b9aa5e..f2bd9a453 100644 --- a/apps/api/src/utils/ids.ts +++ b/apps/api/src/utils/ids.ts @@ -1,7 +1,12 @@ import crypto from 'node:crypto'; import { generateDeviceId } from '@openpanel/common/server'; import { getSafeJson } from '@openpanel/json'; +import type { + EventsQueuePayloadCreateSessionEnd, + EventsQueuePayloadIncomingEvent, +} from '@openpanel/queue'; import { getRedisCache } from '@openpanel/redis'; +import { pick } from 'ramda'; export async function getDeviceId({ projectId, @@ -37,14 +42,20 @@ export async function getDeviceId({ ua, }); - return await getDeviceIdFromSession({ + return await getInfoFromSession({ projectId, currentDeviceId, previousDeviceId, }); } -async function getDeviceIdFromSession({ +interface DeviceIdResult { + deviceId: string; + sessionId: string; + session?: EventsQueuePayloadIncomingEvent['payload']['session']; +} + +async function getInfoFromSession({ projectId, currentDeviceId, previousDeviceId, @@ -52,7 +63,7 @@ async function getDeviceIdFromSession({ projectId: string; currentDeviceId: string; previousDeviceId: string; -}) { +}): Promise { try { const multi = getRedisCache().multi(); multi.hget( @@ -65,21 +76,33 @@ async function getDeviceIdFromSession({ ); const res = await multi.exec(); if (res?.[0]?.[1]) { - const data = getSafeJson<{ payload: { sessionId: string } }>( + const data = getSafeJson( (res?.[0]?.[1] as string) ?? '' ); if (data) { - const sessionId = data.payload.sessionId; - return { deviceId: currentDeviceId, sessionId }; + return { + deviceId: currentDeviceId, + sessionId: data.payload.sessionId, + session: pick( + ['referrer', 'referrerName', 'referrerType'], + data.payload + ), + }; } } if (res?.[1]?.[1]) { - const data = getSafeJson<{ payload: { sessionId: string } }>( + const data = getSafeJson( (res?.[1]?.[1] as string) ?? '' ); if (data) { - const sessionId = data.payload.sessionId; - return { deviceId: previousDeviceId, sessionId }; + return { + deviceId: previousDeviceId, + sessionId: data.payload.sessionId, + session: pick( + ['referrer', 'referrerName', 'referrerType'], + data.payload + ), + }; } } } catch (error) { diff --git a/apps/start/src/components/events/event-listener.tsx b/apps/start/src/components/events/event-listener.tsx index defabb7d1..08df9b61d 100644 --- a/apps/start/src/components/events/event-listener.tsx +++ b/apps/start/src/components/events/event-listener.tsx @@ -1,3 +1,4 @@ +import { AnimatedNumber } from '../animated-number'; import { Tooltip, TooltipContent, @@ -8,71 +9,53 @@ import { useDebounceState } from '@/hooks/use-debounce-state'; import useWS from '@/hooks/use-ws'; import { cn } from '@/utils/cn'; -import type { IServiceEvent, IServiceEventMinimal } from '@openpanel/db'; -import { useParams } from '@tanstack/react-router'; -import { AnimatedNumber } from '../animated-number'; - export default function EventListener({ onRefresh, }: { onRefresh: () => void; }) { - const params = useParams({ - strict: false, - }); const { projectId } = useAppParams(); const counter = useDebounceState(0, 1000); - useWS( + useWS<{ count: number }>( `/live/events/${projectId}`, - (event) => { - if (event) { - const isProfilePage = !!params?.profileId; - if (isProfilePage) { - const profile = 'profile' in event ? event.profile : null; - if (profile?.id === params?.profileId) { - counter.set((prev) => prev + 1); - } - return; - } - - counter.set((prev) => prev + 1); - } + ({ count }) => { + counter.set((prev) => prev + count); }, { debounce: { delay: 1000, maxWait: 5000, }, - }, + } ); return ( diff --git a/apps/start/src/components/events/table/index.tsx b/apps/start/src/components/events/table/index.tsx index 300713997..95ed8e2d7 100644 --- a/apps/start/src/components/events/table/index.tsx +++ b/apps/start/src/components/events/table/index.tsx @@ -35,6 +35,7 @@ type Props = { >, unknown >; + showEventListener?: boolean; }; const LOADING_DATA = [{}, {}, {}, {}, {}, {}, {}, {}, {}] as IServiceEvent[]; @@ -215,7 +216,7 @@ const VirtualizedEventsTable = ({ ); }; -export const EventsTable = ({ query }: Props) => { +export const EventsTable = ({ query, showEventListener = false }: Props) => { const { isLoading } = query; const columns = useColumns(); @@ -272,7 +273,7 @@ export const EventsTable = ({ query }: Props) => { return ( <> - +
{ function EventsTableToolbar({ query, table, + showEventListener, }: { query: Props['query']; table: Table; + showEventListener: boolean; }) { const { projectId } = useAppParams(); const [startDate, setStartDate] = useQueryState( @@ -305,7 +308,7 @@ function EventsTableToolbar({ return (
- query.refetch()} /> + {showEventListener && query.refetch()} />}