Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
"@openpanel/logger": "workspace:*",
"@openpanel/payments": "workspace:*",
"@openpanel/queue": "workspace:*",
"groupmq": "catalog:",
"@openpanel/redis": "workspace:*",
"@openpanel/trpc": "workspace:*",
"@openpanel/validation": "workspace:*",
Expand All @@ -40,6 +39,7 @@
"fastify": "^5.6.1",
"fastify-metrics": "^12.1.0",
"fastify-raw-body": "^5.0.0",
"groupmq": "catalog:",
"jsonwebtoken": "^9.0.2",
"ramda": "^0.29.1",
"sharp": "^0.33.5",
Expand Down
9 changes: 3 additions & 6 deletions apps/api/src/bots/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { cacheable, cacheableLru } from '@openpanel/redis';
import { cacheable } from '@openpanel/redis';
import bots from './bots';

// Pre-compile regex patterns at module load time
Expand All @@ -15,7 +15,7 @@ const compiledBots = bots.map((bot) => {
const regexBots = compiledBots.filter((bot) => 'compiledRegex' in bot);
const includesBots = compiledBots.filter((bot) => 'includes' in bot);

export const isBot = cacheableLru(
export const isBot = cacheable(
'is-bot',
(ua: string) => {
// Check simple string patterns first (fast)
Expand All @@ -40,8 +40,5 @@ export const isBot = cacheableLru(

return null;
},
{
maxSize: 1000,
ttl: 60 * 5,
},
60 * 5
);
95 changes: 42 additions & 53 deletions apps/api/src/controllers/live.controller.ts
Original file line number Diff line number Diff line change
@@ -1,51 +1,52 @@
import type { FastifyRequest } from 'fastify';
import superjson from 'superjson';

import type { WebSocket } from '@fastify/websocket';
import {
eventBuffer,
getProfileById,
transformMinimalEvent,
} from '@openpanel/db';
import { eventBuffer } from '@openpanel/db';
import { setSuperJson } from '@openpanel/json';
import {
psubscribeToPublishedEvent,
subscribeToPublishedEvent,
} from '@openpanel/redis';
import { getProjectAccess } from '@openpanel/trpc';
import { getOrganizationAccess } from '@openpanel/trpc/src/access';

export function getLiveEventInfo(key: string) {
return key.split(':').slice(2) as [string, string];
}
import type { FastifyRequest } from 'fastify';

export function wsVisitors(
socket: WebSocket,
req: FastifyRequest<{
Params: {
projectId: string;
};
}>,
}>
) {
const { params } = req;
const unsubscribe = subscribeToPublishedEvent('events', 'saved', (event) => {
if (event?.projectId === params.projectId) {
eventBuffer.getActiveVisitorCount(params.projectId).then((count) => {
const sendCount = () => {
eventBuffer
.getActiveVisitorCount(params.projectId)
.then((count) => {
socket.send(String(count));
})
.catch(() => {
socket.send('0');
});
};

const unsubscribe = subscribeToPublishedEvent(
'events',
'batch',
({ projectId }) => {
if (projectId === params.projectId) {
sendCount();
}
}
});
);

const punsubscribe = psubscribeToPublishedEvent(
'__keyevent@0__:expired',
(key) => {
const [projectId] = getLiveEventInfo(key);
if (projectId && projectId === params.projectId) {
eventBuffer.getActiveVisitorCount(params.projectId).then((count) => {
socket.send(String(count));
});
const [, , projectId] = key.split(':');
if (projectId === params.projectId) {
sendCount();
}
},
}
);

socket.on('close', () => {
Expand All @@ -62,18 +63,10 @@ export async function wsProjectEvents(
};
Querystring: {
token?: string;
type?: 'saved' | 'received';
};
}>,
}>
) {
const { params, query } = req;
const type = query.type || 'saved';

if (!['saved', 'received'].includes(type)) {
socket.send('Invalid type');
socket.close();
return;
}
const { params } = req;

const userId = req.session?.userId;
if (!userId) {
Expand All @@ -87,24 +80,20 @@ export async function wsProjectEvents(
projectId: params.projectId,
});

if (!access) {
socket.send('No access');
socket.close();
return;
}
Comment on lines +83 to +87
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Keep the websocket wire format consistent on access denial.

This branch sends a raw string on an endpoint whose normal messages are SuperJSON. That makes denied connections speak a different protocol than successful ones, which can break client-side parsing. Prefer closing with a policy code/reason, or send a structured SuperJSON error payload before closing.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/api/src/controllers/live.controller.ts` around lines 78 - 82, The
access-denied branch sends a raw string while normal websocket messages use
SuperJSON; update the branch in live.controller.ts where you check "if
(!access)" to produce a consistent wire format by either (a) serializing a
structured error payload with the same SuperJSON format used elsewhere (import
SuperJSON and send SuperJSON.stringify({ type: 'error', message: 'No access' })
via socket.send) or (b) closing the socket with a proper WebSocket policy
code/reason (e.g., socket.close(1008, 'No access')) so clients can handle it
uniformly; change the socket.send/socket.close calls accordingly and ensure the
error shape matches other message handlers.


const unsubscribe = subscribeToPublishedEvent(
'events',
type,
async (event) => {
if (event.projectId === params.projectId) {
const profile = await getProfileById(event.profileId, event.projectId);
socket.send(
superjson.stringify(
access
? {
...event,
profile,
}
: transformMinimalEvent(event),
),
);
'batch',
({ projectId, count }) => {
if (projectId === params.projectId) {
socket.send(setSuperJson({ count }));
}
},
}
);

socket.on('close', () => unsubscribe());
Expand All @@ -116,7 +105,7 @@ export async function wsProjectNotifications(
Params: {
projectId: string;
};
}>,
}>
) {
const { params } = req;
const userId = req.session?.userId;
Expand All @@ -143,9 +132,9 @@ export async function wsProjectNotifications(
'created',
(notification) => {
if (notification.projectId === params.projectId) {
socket.send(superjson.stringify(notification));
socket.send(setSuperJson(notification));
}
},
}
);

socket.on('close', () => unsubscribe());
Expand All @@ -157,7 +146,7 @@ export async function wsOrganizationEvents(
Params: {
organizationId: string;
};
}>,
}>
) {
const { params } = req;
const userId = req.session?.userId;
Expand All @@ -184,7 +173,7 @@ export async function wsOrganizationEvents(
'subscription_updated',
(message) => {
socket.send(setSuperJson(message));
},
}
);

socket.on('close', () => unsubscribe());
Expand Down
42 changes: 18 additions & 24 deletions apps/api/src/controllers/manage.controller.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import crypto from 'node:crypto';
import { HttpError } from '@/utils/errors';
import { stripTrailingSlash } from '@openpanel/common';
import { hashPassword } from '@openpanel/common/server';
import {
Expand All @@ -10,6 +9,7 @@ import {
} from '@openpanel/db';
import type { FastifyReply, FastifyRequest } from 'fastify';
import { z } from 'zod';
import { HttpError } from '@/utils/errors';

// Validation schemas
const zCreateProject = z.object({
Expand Down Expand Up @@ -57,7 +57,7 @@ const zUpdateReference = z.object({
// Projects CRUD
export async function listProjects(
request: FastifyRequest,
reply: FastifyReply,
reply: FastifyReply
) {
const projects = await db.project.findMany({
where: {
Expand All @@ -74,7 +74,7 @@ export async function listProjects(

export async function getProject(
request: FastifyRequest<{ Params: { id: string } }>,
reply: FastifyReply,
reply: FastifyReply
) {
const project = await db.project.findFirst({
where: {
Expand All @@ -92,7 +92,7 @@ export async function getProject(

export async function createProject(
request: FastifyRequest<{ Body: z.infer<typeof zCreateProject> }>,
reply: FastifyReply,
reply: FastifyReply
) {
const parsed = zCreateProject.safeParse(request.body);

Expand Down Expand Up @@ -139,12 +139,9 @@ export async function createProject(
},
});

// Clear cache
await Promise.all([
getProjectByIdCached.clear(project.id),
project.clients.map((client) => {
getClientByIdCached.clear(client.id);
}),
...project.clients.map((client) => getClientByIdCached.clear(client.id)),
]);

reply.send({
Expand All @@ -165,7 +162,7 @@ export async function updateProject(
Params: { id: string };
Body: z.infer<typeof zUpdateProject>;
}>,
reply: FastifyReply,
reply: FastifyReply
) {
const parsed = zUpdateProject.safeParse(request.body);

Expand Down Expand Up @@ -223,20 +220,17 @@ export async function updateProject(
data: updateData,
});

// Clear cache
await Promise.all([
getProjectByIdCached.clear(project.id),
existing.clients.map((client) => {
getClientByIdCached.clear(client.id);
}),
...existing.clients.map((client) => getClientByIdCached.clear(client.id)),
]);

reply.send({ data: project });
}

export async function deleteProject(
request: FastifyRequest<{ Params: { id: string } }>,
reply: FastifyReply,
reply: FastifyReply
) {
const project = await db.project.findFirst({
where: {
Expand Down Expand Up @@ -266,7 +260,7 @@ export async function deleteProject(
// Clients CRUD
export async function listClients(
request: FastifyRequest<{ Querystring: { projectId?: string } }>,
reply: FastifyReply,
reply: FastifyReply
) {
const where: any = {
organizationId: request.client!.organizationId,
Expand Down Expand Up @@ -300,7 +294,7 @@ export async function listClients(

export async function getClient(
request: FastifyRequest<{ Params: { id: string } }>,
reply: FastifyReply,
reply: FastifyReply
) {
const client = await db.client.findFirst({
where: {
Expand All @@ -318,7 +312,7 @@ export async function getClient(

export async function createClient(
request: FastifyRequest<{ Body: z.infer<typeof zCreateClient> }>,
reply: FastifyReply,
reply: FastifyReply
) {
const parsed = zCreateClient.safeParse(request.body);

Expand Down Expand Up @@ -374,7 +368,7 @@ export async function updateClient(
Params: { id: string };
Body: z.infer<typeof zUpdateClient>;
}>,
reply: FastifyReply,
reply: FastifyReply
) {
const parsed = zUpdateClient.safeParse(request.body);

Expand Down Expand Up @@ -417,7 +411,7 @@ export async function updateClient(

export async function deleteClient(
request: FastifyRequest<{ Params: { id: string } }>,
reply: FastifyReply,
reply: FastifyReply
) {
const client = await db.client.findFirst({
where: {
Expand All @@ -444,7 +438,7 @@ export async function deleteClient(
// References CRUD
export async function listReferences(
request: FastifyRequest<{ Querystring: { projectId?: string } }>,
reply: FastifyReply,
reply: FastifyReply
) {
const where: any = {};

Expand Down Expand Up @@ -488,7 +482,7 @@ export async function listReferences(

export async function getReference(
request: FastifyRequest<{ Params: { id: string } }>,
reply: FastifyReply,
reply: FastifyReply
) {
const reference = await db.reference.findUnique({
where: {
Expand Down Expand Up @@ -516,7 +510,7 @@ export async function getReference(

export async function createReference(
request: FastifyRequest<{ Body: z.infer<typeof zCreateReference> }>,
reply: FastifyReply,
reply: FastifyReply
) {
const parsed = zCreateReference.safeParse(request.body);

Expand Down Expand Up @@ -559,7 +553,7 @@ export async function updateReference(
Params: { id: string };
Body: z.infer<typeof zUpdateReference>;
}>,
reply: FastifyReply,
reply: FastifyReply
) {
const parsed = zUpdateReference.safeParse(request.body);

Expand Down Expand Up @@ -616,7 +610,7 @@ export async function updateReference(

export async function deleteReference(
request: FastifyRequest<{ Params: { id: string } }>,
reply: FastifyReply,
reply: FastifyReply
) {
const reference = await db.reference.findUnique({
where: {
Expand Down
Loading
Loading