Skip to content

Commit d456a64

Browse files
committed
feat(webapp): pause environments when a billing limit is reached
Converge billable environments to paused via webhook and a reconciliation worker; block manual resume.
1 parent 732725d commit d456a64

21 files changed

Lines changed: 886 additions & 6 deletions

apps/webapp/app/entry.server.tsx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { PassThrough } from "stream";
1010
import * as Worker from "~/services/worker.server";
1111
import { initMollifierDrainerWorker } from "~/v3/mollifierDrainerWorker.server";
1212
import { initMollifierStaleSweepWorker } from "~/v3/mollifierStaleSweepWorker.server";
13+
import "~/v3/billingLimitWorker.server";
1314
import { bootstrap } from "./bootstrap";
1415
import { LocaleContextProvider } from "./components/primitives/LocaleProvider";
1516
import {

apps/webapp/app/env.server.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1502,6 +1502,37 @@ const EnvironmentSchema = z
15021502
ALERTS_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
15031503
ALERTS_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
15041504

1505+
BILLING_LIMIT_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
1506+
BILLING_LIMIT_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
1507+
BILLING_LIMIT_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),
1508+
BILLING_LIMIT_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
1509+
BILLING_LIMIT_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
1510+
BILLING_LIMIT_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(20),
1511+
BILLING_LIMIT_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
1512+
BILLING_LIMIT_WORKER_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
1513+
BILLING_LIMIT_RECONCILE_INTERVAL_MS: z.coerce.number().int().default(90_000),
1514+
BILLING_LIMIT_WORKER_REDIS_HOST: z
1515+
.string()
1516+
.optional()
1517+
.transform((v) => v ?? process.env.REDIS_HOST),
1518+
BILLING_LIMIT_WORKER_REDIS_PORT: z.coerce
1519+
.number()
1520+
.optional()
1521+
.transform(
1522+
(v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)
1523+
),
1524+
BILLING_LIMIT_WORKER_REDIS_USERNAME: z
1525+
.string()
1526+
.optional()
1527+
.transform((v) => v ?? process.env.REDIS_USERNAME),
1528+
BILLING_LIMIT_WORKER_REDIS_PASSWORD: z
1529+
.string()
1530+
.optional()
1531+
.transform((v) => v ?? process.env.REDIS_PASSWORD),
1532+
BILLING_LIMIT_WORKER_REDIS_TLS_DISABLED: z
1533+
.string()
1534+
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
1535+
15051536
SCHEDULE_ENGINE_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
15061537
SCHEDULE_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
15071538
SCHEDULE_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),

apps/webapp/app/models/organization.server.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ import { featuresForUrl } from "~/features.server";
1515
import { createApiKeyForEnv, createPkApiKeyForEnv, envSlug } from "./api-key.server";
1616
import { getDefaultEnvironmentConcurrencyLimit } from "~/services/platform.v3.server";
1717
import { enqueueAttioWorkspaceSync } from "~/services/attio.server";
18+
import {
19+
applyBillingLimitPauseAfterEnvCreate,
20+
getInitialEnvPauseStateForBillingLimit,
21+
} from "~/v3/services/billingLimit/getInitialEnvPauseStateForBillingLimit.server";
1822
export type { Organization };
1923

2024
const nanoid = customAlphabet("1234567890abcdef", 4);
@@ -139,15 +143,18 @@ export async function createEnvironment({
139143
const shortcode = createShortcode().join("-");
140144

141145
const limit = await getDefaultEnvironmentConcurrencyLimit(organization.id, type);
146+
const billingPause = await getInitialEnvPauseStateForBillingLimit(organization.id, type);
142147

143-
return await prismaClient.runtimeEnvironment.create({
148+
const environment = await prismaClient.runtimeEnvironment.create({
144149
data: {
145150
slug,
146151
apiKey,
147152
pkApiKey,
148153
shortcode,
149154
autoEnableInternalSources: type !== "DEVELOPMENT",
150155
maximumConcurrencyLimit: limit,
156+
paused: billingPause.paused,
157+
pauseSource: billingPause.pauseSource,
151158
organization: {
152159
connect: {
153160
id: organization.id,
@@ -162,7 +169,15 @@ export async function createEnvironment({
162169
type,
163170
isBranchableEnvironment,
164171
},
172+
include: {
173+
organization: true,
174+
project: true,
175+
},
165176
});
177+
178+
await applyBillingLimitPauseAfterEnvCreate(environment);
179+
180+
return environment;
166181
}
167182

168183
function createShortcode() {

apps/webapp/app/presenters/OrganizationsPresenter.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ export class OrganizationsPresenter {
7777
type: true,
7878
slug: true,
7979
paused: true,
80+
pauseSource: true,
8081
isBranchableEnvironment: true,
8182
branchName: true,
8283
parentEnvironmentId: true,
@@ -206,6 +207,7 @@ export class OrganizationsPresenter {
206207
| "type"
207208
| "branchName"
208209
| "paused"
210+
| "pauseSource"
209211
| "parentEnvironmentId"
210212
| "isBranchableEnvironment"
211213
| "archivedAt"

apps/webapp/app/presenters/v3/EnvironmentVariablesPresenter.server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { $replica, PrismaClient, PrismaReplicaClient, prisma } from "~/db.server";
2-
import { Project } from "~/models/project.server";
3-
import { User } from "~/models/user.server";
2+
import type { Project } from "~/models/project.server";
3+
import type { User } from "~/models/user.server";
44
import { EnvironmentVariablesRepository } from "~/v3/environmentVariables/environmentVariablesRepository.server";
55
import type { EnvironmentVariableUpdater } from "~/v3/environmentVariables/repository";
66
import {

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import {
1010
import { DialogClose } from "@radix-ui/react-dialog";
1111
import { Form, useNavigation, useSearchParams, type MetaFunction } from "@remix-run/react";
1212
import { type ActionFunctionArgs, type LoaderFunctionArgs } from "@remix-run/server-runtime";
13+
import { EnvironmentPauseSource } from "@trigger.dev/database";
1314
import type { RuntimeEnvironmentType } from "@trigger.dev/database";
1415
import type { QueueItem } from "@trigger.dev/core/v3/schemas";
1516
import { useEffect, useState } from "react";
@@ -184,11 +185,21 @@ export const action = async ({ request, params }: ActionFunctionArgs) => {
184185
switch (action) {
185186
case "environment-pause":
186187
const pauseService = new PauseEnvironmentService();
187-
await pauseService.call(environment, "paused");
188+
{
189+
const result = await pauseService.call(environment, "paused");
190+
if (!result.success) {
191+
return redirectWithErrorMessage(redirectPath, request, result.error);
192+
}
193+
}
188194
return redirectWithSuccessMessage(redirectPath, request, "Environment paused");
189195
case "environment-resume":
190196
const resumeService = new PauseEnvironmentService();
191-
await resumeService.call(environment, "resumed");
197+
{
198+
const result = await resumeService.call(environment, "resumed");
199+
if (!result.success) {
200+
return redirectWithErrorMessage(redirectPath, request, result.error);
201+
}
202+
}
192203
return redirectWithSuccessMessage(redirectPath, request, "Environment resumed");
193204
case "queue-pause":
194205
case "queue-resume": {
@@ -346,7 +357,9 @@ export default function Page() {
346357
animate
347358
accessory={
348359
<div className="flex items-start gap-1">
349-
{environment.runsEnabled ? <EnvironmentPauseResumeButton env={env} /> : null}
360+
{environment.runsEnabled && env.pauseSource !== EnvironmentPauseSource.BILLING_LIMIT ? (
361+
<EnvironmentPauseResumeButton env={env} />
362+
) : null}
350363
<LinkButton
351364
variant="secondary/small"
352365
LeadingIcon={RunsIcon}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import { type ActionFunctionArgs, json } from "@remix-run/server-runtime";
2+
import { z } from "zod";
3+
import { prisma } from "~/db.server";
4+
import { BillingLimitHitWebhookBodySchema } from "~/services/billingLimit.schemas";
5+
import { requireAdminApiRequest } from "~/services/personalAccessToken.server";
6+
import { bustBillingLimitCaches } from "~/services/platform.v3.server";
7+
import {
8+
enqueueBillingLimitCancelInProgressRuns,
9+
enqueueBillingLimitConverge,
10+
} from "~/v3/billingLimitWorker.server";
11+
import { BillingLimitConvergeEnvironmentsService } from "~/v3/services/billingLimit/billingLimitConvergeEnvironmentsService.server";
12+
import { processBillingLimitHit } from "~/v3/services/billingLimit/billingLimitHit.server";
13+
14+
const ParamsSchema = z.object({
15+
organizationId: z.string(),
16+
});
17+
18+
/** Billing platform webhook: org entered billing limit grace. Idempotent — returns 202. */
19+
export async function action({ request, params }: ActionFunctionArgs) {
20+
await requireAdminApiRequest(request);
21+
22+
if (request.method.toLowerCase() !== "post") {
23+
return json({ error: "Method not allowed" }, { status: 405 });
24+
}
25+
26+
const { organizationId } = ParamsSchema.parse(params);
27+
const body = BillingLimitHitWebhookBodySchema.parse(await request.json());
28+
29+
const organization = await prisma.organization.findFirst({
30+
where: { id: organizationId },
31+
select: { id: true },
32+
});
33+
34+
if (!organization) {
35+
return json({ error: "Organization not found" }, { status: 404 });
36+
}
37+
38+
await processBillingLimitHit(
39+
{
40+
organizationId,
41+
hitAt: body.hitAt,
42+
cancelInProgressRuns: body.cancelInProgressRuns,
43+
},
44+
{
45+
bustCaches: bustBillingLimitCaches,
46+
seedReconcileQueue: BillingLimitConvergeEnvironmentsService.seedReconcileQueue,
47+
enqueueConverge: enqueueBillingLimitConverge,
48+
enqueueCancelInProgressRuns: enqueueBillingLimitCancelInProgressRuns,
49+
}
50+
);
51+
52+
return json({ success: true, accepted: true }, { status: 202 });
53+
}

apps/webapp/app/services/upsertBranch.server.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ import { getCurrentPlan, getLimit } from "./platform.v3.server";
1414
import { type z } from "zod";
1515
import invariant from "tiny-invariant";
1616
import { type CreateBranchOptions } from "~/utils/branches";
17+
import {
18+
applyBillingLimitPauseAfterEnvCreate,
19+
getInitialEnvPauseStateForBillingLimit,
20+
} from "~/v3/services/billingLimit/getInitialEnvPauseStateForBillingLimit.server";
1721

1822
type CreateBranchOptions = z.infer<typeof CreateBranchOptions>;
1923

@@ -138,6 +142,10 @@ export class UpsertBranchService {
138142
const apiKey = createApiKeyForEnv(parentEnvironment.type);
139143
const pkApiKey = createPkApiKeyForEnv(parentEnvironment.type);
140144
const shortcode = branchSlug;
145+
const billingPause = await getInitialEnvPauseStateForBillingLimit(
146+
parentEnvironment.organization.id,
147+
parentEnvironment.type
148+
);
141149

142150
const now = new Date();
143151
const branch = await this.#prismaClient.runtimeEnvironment.upsert({
@@ -153,6 +161,8 @@ export class UpsertBranchService {
153161
pkApiKey,
154162
shortcode,
155163
maximumConcurrencyLimit: parentEnvironment.maximumConcurrencyLimit,
164+
paused: billingPause.paused,
165+
pauseSource: billingPause.pauseSource,
156166
organization: {
157167
connect: {
158168
id: parentEnvironment.organization.id,
@@ -174,10 +184,18 @@ export class UpsertBranchService {
174184
update: {
175185
git: git ?? undefined,
176186
},
187+
include: {
188+
organization: true,
189+
project: true,
190+
},
177191
});
178192

179193
const alreadyExisted = branch.createdAt < now;
180194

195+
if (!alreadyExisted) {
196+
await applyBillingLimitPauseAfterEnvCreate(branch);
197+
}
198+
181199
return {
182200
success: true as const,
183201
alreadyExisted: alreadyExisted,
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
import { Logger } from "@trigger.dev/core/logger";
2+
import { Worker as RedisWorker } from "@trigger.dev/redis-worker";
3+
import { z } from "zod";
4+
import { env } from "~/env.server";
5+
import { logger } from "~/services/logger.server";
6+
import { singleton } from "~/utils/singleton";
7+
import { BillingLimitConvergeEnvironmentsService } from "./services/billingLimit/billingLimitConvergeEnvironmentsService.server";
8+
import type { BillingLimitConvergeTargetState } from "./services/billingLimit/billingLimitConstants";
9+
import { buildBillingLimitInProgressCancelJobId } from "./services/billingLimit/billingLimitConstants";
10+
import { runBillingLimitCancelInProgressRuns } from "./services/billingLimit/billingLimitCancelInProgressRuns.server";
11+
12+
function initializeWorker() {
13+
const redisOptions = {
14+
keyPrefix: "billing-limit:worker:",
15+
host: env.BILLING_LIMIT_WORKER_REDIS_HOST,
16+
port: env.BILLING_LIMIT_WORKER_REDIS_PORT,
17+
username: env.BILLING_LIMIT_WORKER_REDIS_USERNAME,
18+
password: env.BILLING_LIMIT_WORKER_REDIS_PASSWORD,
19+
enableAutoPipelining: true,
20+
...(env.BILLING_LIMIT_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
21+
};
22+
23+
logger.debug(`👨‍🏭 Initializing billing limit worker at host ${env.BILLING_LIMIT_WORKER_REDIS_HOST}`);
24+
25+
const worker = new RedisWorker({
26+
name: "billing-limit-worker",
27+
redisOptions,
28+
catalog: {
29+
"billingLimit.convergeEnvironments": {
30+
schema: z.object({
31+
organizationId: z.string(),
32+
targetState: z.enum(["grace", "rejected", "ok"]),
33+
}),
34+
visibilityTimeoutMs: 60_000 * 10,
35+
retry: {
36+
maxAttempts: 5,
37+
},
38+
},
39+
"billingLimit.reconcileTick": {
40+
schema: z.object({}),
41+
visibilityTimeoutMs: 60_000 * 5,
42+
retry: {
43+
maxAttempts: 3,
44+
},
45+
},
46+
"billingLimit.cancelInProgressRuns": {
47+
schema: z.object({
48+
organizationId: z.string(),
49+
hitAt: z.string(),
50+
}),
51+
visibilityTimeoutMs: 60_000 * 10,
52+
retry: {
53+
maxAttempts: 5,
54+
},
55+
},
56+
},
57+
concurrency: {
58+
workers: env.BILLING_LIMIT_WORKER_CONCURRENCY_WORKERS,
59+
tasksPerWorker: env.BILLING_LIMIT_WORKER_CONCURRENCY_TASKS_PER_WORKER,
60+
limit: env.BILLING_LIMIT_WORKER_CONCURRENCY_LIMIT,
61+
},
62+
pollIntervalMs: env.BILLING_LIMIT_WORKER_POLL_INTERVAL,
63+
immediatePollIntervalMs: env.BILLING_LIMIT_WORKER_IMMEDIATE_POLL_INTERVAL,
64+
shutdownTimeoutMs: env.BILLING_LIMIT_WORKER_SHUTDOWN_TIMEOUT_MS,
65+
logger: new Logger("BillingLimitWorker", env.BILLING_LIMIT_WORKER_LOG_LEVEL),
66+
jobs: {
67+
"billingLimit.convergeEnvironments": async ({ payload }) => {
68+
await BillingLimitConvergeEnvironmentsService.runConverge(payload);
69+
},
70+
"billingLimit.reconcileTick": async () => {
71+
await BillingLimitConvergeEnvironmentsService.runReconcileTick();
72+
await scheduleBillingLimitReconcileTick(worker);
73+
},
74+
"billingLimit.cancelInProgressRuns": async ({ payload }) => {
75+
await runBillingLimitCancelInProgressRuns(payload.organizationId, payload.hitAt);
76+
},
77+
},
78+
});
79+
80+
if (env.BILLING_LIMIT_WORKER_ENABLED === "true") {
81+
logger.debug(
82+
`👨‍🏭 Starting billing limit worker at host ${env.BILLING_LIMIT_WORKER_REDIS_HOST}, reconcileIntervalMs = ${env.BILLING_LIMIT_RECONCILE_INTERVAL_MS}`
83+
);
84+
worker.start();
85+
void scheduleBillingLimitReconcileTick(worker);
86+
}
87+
88+
return worker;
89+
}
90+
91+
async function scheduleBillingLimitReconcileTick(worker: ReturnType<typeof initializeWorker>) {
92+
await worker.enqueue({
93+
id: "billingLimit.reconcileTick",
94+
job: "billingLimit.reconcileTick",
95+
payload: {},
96+
availableAt: new Date(Date.now() + env.BILLING_LIMIT_RECONCILE_INTERVAL_MS),
97+
});
98+
}
99+
100+
export const billingLimitWorker = singleton("billingLimitWorker", initializeWorker);
101+
102+
export async function enqueueBillingLimitConverge(
103+
organizationId: string,
104+
targetState: BillingLimitConvergeTargetState
105+
) {
106+
return billingLimitWorker.enqueue({
107+
id: `billingLimit.converge:${organizationId}:${targetState}`,
108+
job: "billingLimit.convergeEnvironments",
109+
payload: { organizationId, targetState },
110+
});
111+
}
112+
113+
export async function enqueueBillingLimitCancelInProgressRuns(
114+
organizationId: string,
115+
hitAt: string
116+
) {
117+
return billingLimitWorker.enqueue({
118+
id: buildBillingLimitInProgressCancelJobId(organizationId, hitAt),
119+
job: "billingLimit.cancelInProgressRuns",
120+
payload: { organizationId, hitAt },
121+
});
122+
}

0 commit comments

Comments
 (0)