diff --git a/.server-changes/per-org-stream-basins.md b/.server-changes/per-org-stream-basins.md new file mode 100644 index 00000000000..4e45129849c --- /dev/null +++ b/.server-changes/per-org-stream-basins.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: feature +--- + +Per-org S2 stream basins with retention tied to the org's billing plan, gated by `REALTIME_STREAMS_PER_ORG_BASINS_ENABLED`. Stops basin retention from deleting streams out from under live chat sessions and unlocks per-org cost attribution. diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index ff27168445a..13e9e5dacbd 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -3,6 +3,15 @@ import { MachinePresetName } from "@trigger.dev/core/v3"; import { BoolEnv } from "./utils/boolEnv"; import { isValidDatabaseUrl } from "./utils/db"; import { isValidRegex } from "./utils/regex"; +import { isValidDuration } from "./services/realtime/duration.server"; + +// `z.string()` constrained to a `parseDuration`-parseable string (e.g. +// `7d`, `1h`). Validated at boot so a typo'd duration fails fast. +function durationString() { + return z + .string() + .refine(isValidDuration, "must be a duration like 7d, 30d, 365d, 1h, 1y"); +} // Parses a CSV of machine preset names (e.g. "small-1x,small-2x") into a // non-empty array of MachinePresetName. Used by COMPUTE_TEMPLATE_MACHINE_PRESETS @@ -1506,6 +1515,16 @@ const EnvironmentSchema = z REALTIME_STREAMS_S2_FLUSH_INTERVAL_MS: z.coerce.number().int().default(100), REALTIME_STREAMS_S2_MAX_RETRIES: z.coerce.number().int().default(10), REALTIME_STREAMS_S2_WAIT_SECONDS: z.coerce.number().int().default(60), + // When "true", provision a dedicated S2 basin per org and stamp + // `streamBasinName` on new rows. Off keeps everything on the single + // basin defined by `REALTIME_STREAMS_S2_BASIN`. + REALTIME_STREAMS_PER_ORG_BASINS_ENABLED: z.enum(["true", "false"]).default("false"), + // Per-org basin name = `{prefix}-{env}-org-{orgId}`. + REALTIME_STREAMS_BASIN_NAME_PREFIX: z.string().default("triggerdotdev"), + REALTIME_STREAMS_BASIN_NAME_ENV: z.string().default("dev"), + REALTIME_STREAMS_BASIN_DEFAULT_RETENTION: durationString().default("30d"), + REALTIME_STREAMS_BASIN_STORAGE_CLASS: z.enum(["express", "standard"]).default("express"), + REALTIME_STREAMS_BASIN_DELETE_ON_EMPTY_MIN_AGE: durationString().default("1h"), REALTIME_STREAMS_DEFAULT_VERSION: z.enum(["v1", "v2"]).default("v1"), WAIT_UNTIL_TIMEOUT_MS: z.coerce.number().int().default(600_000), diff --git a/apps/webapp/app/routes/admin.api.v1.orgs.$organizationId.stream-basin.ts b/apps/webapp/app/routes/admin.api.v1.orgs.$organizationId.stream-basin.ts new file mode 100644 index 00000000000..67fd457c6b3 --- /dev/null +++ b/apps/webapp/app/routes/admin.api.v1.orgs.$organizationId.stream-basin.ts @@ -0,0 +1,47 @@ +import { ActionFunctionArgs, json } from "@remix-run/server-runtime"; +import { z } from "zod"; +import { requireAdminApiRequest } from "~/services/personalAccessToken.server"; +import { isValidDuration } from "~/services/realtime/duration.server"; +import { + deprovisionBasinForOrg, + ensureBasinForOrg, +} from "~/services/realtime/streamBasinProvisioner.server"; + +const ParamsSchema = z.object({ organizationId: z.string() }); + +const BodySchema = z.discriminatedUnion("action", [ + z.object({ + action: z.literal("ensure"), + retention: z + .string() + .refine(isValidDuration, "retention must be a duration like 7d, 30d, 365d, 1h, 1y"), + }), + z.object({ action: z.literal("deprovision") }), +]); + +export async function action({ request, params }: ActionFunctionArgs) { + await requireAdminApiRequest(request); + + const { organizationId } = ParamsSchema.parse(params); + + let parsed: z.infer; + try { + const text = await request.text(); + const raw = text.length > 0 ? JSON.parse(text) : {}; + const result = BodySchema.safeParse(raw); + if (!result.success) { + return json({ ok: false, error: result.error.flatten() }, { status: 400 }); + } + parsed = result.data; + } catch { + return json({ ok: false, error: "Invalid JSON body" }, { status: 400 }); + } + + if (parsed.action === "ensure") { + const result = await ensureBasinForOrg(organizationId, parsed.retention); + return json({ ok: true, ...result }); + } + + const result = await deprovisionBasinForOrg(organizationId); + return json({ ok: true, ...result }); +} diff --git a/apps/webapp/app/routes/api.v1.runs.$runFriendlyId.input-streams.wait.ts b/apps/webapp/app/routes/api.v1.runs.$runFriendlyId.input-streams.wait.ts index 8e41e9fe4c8..a0f24f9abd8 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runFriendlyId.input-streams.wait.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runFriendlyId.input-streams.wait.ts @@ -40,6 +40,7 @@ const { action, loader } = createActionApiRoute( id: true, friendlyId: true, realtimeStreamsVersion: true, + streamBasinName: true, }, }); @@ -98,7 +99,8 @@ const { action, loader } = createActionApiRoute( try { const realtimeStream = getRealtimeStreamInstance( authentication.environment, - run.realtimeStreamsVersion + run.realtimeStreamsVersion, + { run } ); const records = await realtimeStream.readRecords( diff --git a/apps/webapp/app/routes/api.v1.runs.$runFriendlyId.session-streams.wait.ts b/apps/webapp/app/routes/api.v1.runs.$runFriendlyId.session-streams.wait.ts index 18034caab47..4fbdb454d92 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runFriendlyId.session-streams.wait.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runFriendlyId.session-streams.wait.ts @@ -123,12 +123,14 @@ const { action, loader } = createActionApiRoute( // and remove the pending registration. if (!result.isCached) { try { - // Session streams are always v2 (S2) — the writer in - // `appendPartToSessionStream` and the SSE subscribe both - // hardcode "v2", so the race-check reader has to match. - // Don't fall through to the run's own `realtimeStreamsVersion`, - // which only describes the run's run-scoped streams. - const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2"); + // Match the writer's basin resolution exactly: session if the + // row exists, otherwise the org so we look at the same basin a + // fresh row would be stamped with. Mirrors the PUT/GET sister + // routes in `realtime.v1.sessions.$session.$io.ts`. + const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2", { + session: maybeSession, + organization: maybeSession ? null : authentication.environment.organization, + }); if (realtimeStream instanceof S2RealtimeStreams) { const records = await realtimeStream.readSessionStreamRecords( diff --git a/apps/webapp/app/routes/api.v1.sessions.ts b/apps/webapp/app/routes/api.v1.sessions.ts index 38270fdfc77..eafb0f7a20c 100644 --- a/apps/webapp/app/routes/api.v1.sessions.ts +++ b/apps/webapp/app/routes/api.v1.sessions.ts @@ -167,6 +167,7 @@ const { action } = createActionApiRoute( runtimeEnvironmentId: authentication.environment.id, environmentType: authentication.environment.type, organizationId: authentication.environment.organizationId, + streamBasinName: authentication.environment.organization.streamBasinName, }, update: { triggerConfig: triggerConfigJson }, }); @@ -186,6 +187,7 @@ const { action } = createActionApiRoute( runtimeEnvironmentId: authentication.environment.id, environmentType: authentication.environment.type, organizationId: authentication.environment.organizationId, + streamBasinName: authentication.environment.organization.streamBasinName, }, }); } diff --git a/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts b/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts index 4251baae91e..45fbde5924b 100644 --- a/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts +++ b/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts @@ -81,7 +81,9 @@ const { action, loader } = createActionApiRoute( ); } - const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2"); + const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2", { + session, + }); if (!(realtimeStream instanceof S2RealtimeStreams)) { return json( diff --git a/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.ts b/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.ts index c04992f7f14..37ec58c51ae 100644 --- a/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.ts +++ b/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.ts @@ -59,7 +59,13 @@ const { action } = createActionApiRoute( }); } - const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2"); + // No-row form: resolve via the org so the stream initialised here + // matches what later appends/subscribes will land on once the row + // is created. + const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2", { + session: maybeSession, + organization: maybeSession ? null : authentication.environment.organization, + }); if (!(realtimeStream instanceof S2RealtimeStreams)) { return new Response("Session channels require the S2 realtime backend", { @@ -122,7 +128,11 @@ const loader = createLoaderApiRoute( }, }, async ({ params, request, authentication, resource }) => { - const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2"); + // Same no-row fallback as PUT above. + const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2", { + session: resource.row, + organization: resource.row ? null : authentication.environment.organization, + }); if (!(realtimeStream instanceof S2RealtimeStreams)) { return new Response("Session channels require the S2 realtime backend", { diff --git a/apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts b/apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts index aabd83bc9bb..477ce781a20 100644 --- a/apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts +++ b/apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts @@ -29,6 +29,7 @@ export async function action({ request, params }: ActionFunctionArgs) { select: { id: true, friendlyId: true, + streamBasinName: true, runtimeEnvironment: { include: { project: true, @@ -64,7 +65,9 @@ export async function action({ request, params }: ActionFunctionArgs) { } // The runtimeEnvironment from the run is already in the correct shape for AuthenticatedEnvironment - const realtimeStream = getRealtimeStreamInstance(run.runtimeEnvironment, streamVersion); + const realtimeStream = getRealtimeStreamInstance(run.runtimeEnvironment, streamVersion, { + run, + }); return realtimeStream.ingestData( request.body, @@ -127,7 +130,8 @@ export const loader = createLoaderApiRoute( const realtimeStream = getRealtimeStreamInstance( authentication.environment, - run.realtimeStreamsVersion + run.realtimeStreamsVersion, + { run } ); return realtimeStream.streamResponse(request, run.friendlyId, params.streamId, getRequestAbortSignal(), { diff --git a/apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.append.ts b/apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.append.ts index deefbc20773..ec5800c1f9f 100644 --- a/apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.append.ts +++ b/apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.append.ts @@ -72,6 +72,7 @@ const { action } = createActionApiRoute( realtimeStreamsVersion: true, completedAt: true, id: true, + streamBasinName: true, }, }); @@ -102,7 +103,8 @@ const { action } = createActionApiRoute( const realtimeStream = getRealtimeStreamInstance( authentication.environment, - targetRun.realtimeStreamsVersion + targetRun.realtimeStreamsVersion, + { run: targetRun } ); const partId = request.headers.get("X-Part-Id") ?? nanoid(7); diff --git a/apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts b/apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts index 2a8d07053d9..9ca8e36f4ef 100644 --- a/apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts +++ b/apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts @@ -26,14 +26,17 @@ const { action } = createActionApiRoute( select: { id: true, friendlyId: true, + streamBasinName: true, parentTaskRun: { select: { friendlyId: true, + streamBasinName: true, }, }, rootTaskRun: { select: { friendlyId: true, + streamBasinName: true, }, }, }, @@ -43,17 +46,20 @@ const { action } = createActionApiRoute( return new Response("Run not found", { status: 404 }); } - const targetId = + const targetRun = params.target === "self" - ? run.friendlyId + ? run : params.target === "parent" - ? run.parentTaskRun?.friendlyId - : run.rootTaskRun?.friendlyId; + ? run.parentTaskRun + : run.rootTaskRun; - if (!targetId) { + if (!targetRun?.friendlyId) { return new Response("Target not found", { status: 404 }); } + const targetId = targetRun.friendlyId; + const basinContext = { run: { streamBasinName: targetRun.streamBasinName ?? null } }; + if (request.method === "PUT") { // This is the "create" endpoint const updatedRun = await prisma.taskRun.update({ @@ -80,7 +86,8 @@ const { action } = createActionApiRoute( const realtimeStream = getRealtimeStreamInstance( authentication.environment, - updatedRun.realtimeStreamsVersion + updatedRun.realtimeStreamsVersion, + basinContext ); const { responseHeaders } = await realtimeStream.initializeStream(targetId, params.streamId); @@ -112,7 +119,11 @@ const { action } = createActionApiRoute( resumeFromChunkNumber = parsed; } - const realtimeStream = getRealtimeStreamInstance(authentication.environment, streamVersion); + const realtimeStream = getRealtimeStreamInstance( + authentication.environment, + streamVersion, + basinContext + ); return realtimeStream.ingestData( request.body, @@ -139,14 +150,17 @@ const loader = createLoaderApiRoute( select: { id: true, friendlyId: true, + streamBasinName: true, parentTaskRun: { select: { friendlyId: true, + streamBasinName: true, }, }, rootTaskRun: { select: { friendlyId: true, + streamBasinName: true, }, }, }, @@ -158,17 +172,19 @@ const loader = createLoaderApiRoute( return new Response("Run not found", { status: 404 }); } - const targetId = + const targetRun = params.target === "self" - ? run.friendlyId + ? run : params.target === "parent" - ? run.parentTaskRun?.friendlyId - : run.rootTaskRun?.friendlyId; + ? run.parentTaskRun + : run.rootTaskRun; - if (!targetId) { + if (!targetRun?.friendlyId) { return new Response("Target not found", { status: 404 }); } + const targetId = targetRun.friendlyId; + // Handle HEAD request to get last chunk index if (request.method !== "HEAD") { return new Response("Only HEAD requests are allowed for this endpoint", { status: 405 }); @@ -178,7 +194,11 @@ const loader = createLoaderApiRoute( const clientId = request.headers.get("X-Client-Id") || "default"; const streamVersion = request.headers.get("X-Stream-Version") || "v1"; - const realtimeStream = getRealtimeStreamInstance(authentication.environment, streamVersion); + const realtimeStream = getRealtimeStreamInstance( + authentication.environment, + streamVersion, + { run: { streamBasinName: targetRun.streamBasinName ?? null } } + ); const lastChunkIndex = await realtimeStream.getLastChunkIndex( targetId, diff --git a/apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts b/apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts index b16b1ca7922..089f2dc55e3 100644 --- a/apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts +++ b/apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts @@ -46,6 +46,7 @@ const { action } = createActionApiRoute( friendlyId: true, completedAt: true, realtimeStreamsVersion: true, + streamBasinName: true, }, }); @@ -68,7 +69,8 @@ const { action } = createActionApiRoute( const realtimeStream = getRealtimeStreamInstance( authentication.environment, - run.realtimeStreamsVersion + run.realtimeStreamsVersion, + { run } ); // Build the input stream record (raw user data, no wrapper) @@ -155,7 +157,8 @@ const loader = createLoaderApiRoute( const realtimeStream = getRealtimeStreamInstance( authentication.environment, - run.realtimeStreamsVersion + run.realtimeStreamsVersion, + { run } ); // Read from the internal S2 stream name (prefixed to avoid user stream collisions) diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx index 1295adb7842..b6a72d3aa09 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx @@ -87,7 +87,8 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { const realtimeStream = getRealtimeStreamInstance( run.runtimeEnvironment, - run.realtimeStreamsVersion + run.realtimeStreamsVersion, + { run } ); return realtimeStream.streamResponse(request, run.friendlyId, streamKey, getRequestAbortSignal(), { diff --git a/apps/webapp/app/runEngine/services/triggerTask.server.ts b/apps/webapp/app/runEngine/services/triggerTask.server.ts index 610484e67ca..445e0eb155a 100644 --- a/apps/webapp/app/runEngine/services/triggerTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerTask.server.ts @@ -395,6 +395,7 @@ export class RunEngineTriggerTaskService { bulkActionId: body.options?.bulkActionId, planType, realtimeStreamsVersion: options.realtimeStreamsVersion, + streamBasinName: environment.organization.streamBasinName, debounce: body.options?.debounce, annotations, // When debouncing with triggerAndWait, create a span for the debounced trigger diff --git a/apps/webapp/app/services/platform.v3.server.ts b/apps/webapp/app/services/platform.v3.server.ts index 51075c1b87d..6df93c9c0e9 100644 --- a/apps/webapp/app/services/platform.v3.server.ts +++ b/apps/webapp/app/services/platform.v3.server.ts @@ -44,6 +44,17 @@ function initializeClient() { } const client = singleton("billingClient", initializeClient); + +/** + * `true` when the billing client was instantiated — i.e. we're running + * in a cloud-style install with `BILLING_API_URL` + `BILLING_API_KEY` + * configured. OSS / self-hosted installs return `false` here, which + * lets callers distinguish "no billing wired up, fall back to + * defaults" from "billing wired up but the call failed, retry." + */ +export function isBillingConfigured(): boolean { + return client !== undefined; +} // Failures from @trigger.dev/platform billing client calls are tracked via // this metric (with low-cardinality {function, kind} labels) rather than // logged. Every task invocation hits these paths, so per-call logs were too diff --git a/apps/webapp/app/services/realtime/duration.server.ts b/apps/webapp/app/services/realtime/duration.server.ts new file mode 100644 index 00000000000..c6aab9eb9df --- /dev/null +++ b/apps/webapp/app/services/realtime/duration.server.ts @@ -0,0 +1,49 @@ +/** + * Duration string parsing for stream-basin retention / delete-on-empty + * configuration. Used by `streamBasinProvisioner` (to convert to S2's + * integer-seconds wire format) and by `env.server.ts` (to validate + * duration-shaped env vars at boot rather than at first use). + * + * Accepts the short forms (`7d`, `30d`, `365d`, `1h`, `90m`, `45s`, + * `2w`, `1y`) and the human forms (`7days`, `1week`, `1year`). + */ + +const PATTERN = + /^(\d+)\s*(s|sec|secs|seconds?|m|min|mins|minutes?|h|hour|hours?|d|day|days?|w|week|weeks?|y|year|years?)$/; + +export function isValidDuration(input: string): boolean { + return PATTERN.test(input.trim().toLowerCase()); +} + +/** + * Parse a duration string into seconds. Throws on garbage so a + * misconfigured env var fails loudly. Use {@link isValidDuration} + * for non-throwing validation (e.g. inside a Zod `.refine()`). + */ +export function parseDuration(input: string): number { + const trimmed = input.trim().toLowerCase(); + const match = trimmed.match(PATTERN); + if (!match) { + throw new Error(`Invalid duration string: ${input}`); + } + const value = parseInt(match[1]!, 10); + const unit = match[2]!; + const multiplier = + /^s/.test(unit) + ? 1 + : /^m(?:in|ins|inute|inutes)?$/.test(unit) + ? 60 + : /^h/.test(unit) + ? 3600 + : /^d/.test(unit) + ? 86400 + : /^w/.test(unit) + ? 604800 + : /^y/.test(unit) + ? 31_536_000 + : NaN; + if (!Number.isFinite(multiplier)) { + throw new Error(`Invalid duration unit: ${unit}`); + } + return value * multiplier; +} diff --git a/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts b/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts index 46c7f3854a1..0295d5a58b6 100644 --- a/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts +++ b/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts @@ -464,7 +464,10 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { return this.s2IssueAccessToken(id); } - const result = await this.cache.accessToken.swr(this.streamPrefix, async () => { + // Cache key includes basin so per-org basins never collide on + // cached tokens. `${basin}:${prefix}` is unique per (org-basin, env). + const cacheKey = `${this.basin}:${this.streamPrefix}`; + const result = await this.cache.accessToken.swr(cacheKey, async () => { return this.s2IssueAccessToken(id); }); diff --git a/apps/webapp/app/services/realtime/streamBasinProvisioner.server.ts b/apps/webapp/app/services/realtime/streamBasinProvisioner.server.ts new file mode 100644 index 00000000000..e29aeb168fb --- /dev/null +++ b/apps/webapp/app/services/realtime/streamBasinProvisioner.server.ts @@ -0,0 +1,248 @@ +/** + * Per-org S2 basin provisioning. Gated by + * `REALTIME_STREAMS_PER_ORG_BASINS_ENABLED`: when off, all orgs share + * `REALTIME_STREAMS_S2_BASIN` and this module no-ops. + * + * Pure retention-string in / S2-call out. Plan vocabulary lives in the + * cloud billing app, which calls into the admin sync route to drive + * provisioning + reconfiguration. + */ +import type { PrismaClientOrTransaction } from "~/db.server"; +import { prisma } from "~/db.server"; +import { env } from "~/env.server"; +import { logger } from "~/services/logger.server"; +import { parseDuration } from "./duration.server"; + +export function isPerOrgBasinsEnabled(): boolean { + return env.REALTIME_STREAMS_PER_ORG_BASINS_ENABLED === "true"; +} + +export function defaultRetention(): string { + return env.REALTIME_STREAMS_BASIN_DEFAULT_RETENTION; +} + +// Org id is a cuid — fixed-length and stable, so the basin name is +// collision-free without truncation. Slugs are user-editable and would +// drift. +export function basinNameForOrg(org: { id: string }): string { + const prefix = env.REALTIME_STREAMS_BASIN_NAME_PREFIX; + const envName = env.REALTIME_STREAMS_BASIN_NAME_ENV; + return `${prefix}-${envName}-org-${org.id}`; +} + +type ProvisionInput = { + id: string; + retention?: string; + streamBasinName: string | null | undefined; +}; + +type ProvisionResult = + | { kind: "skipped"; reason: "feature-disabled" | "already-provisioned"; basin: string | null } + | { kind: "provisioned"; basin: string; retention: string }; + +// Idempotent. Treats S2 409 as success (race with another caller, or +// previous run that crashed after S2 ack but before the column write). +export async function provisionBasinForOrg( + org: ProvisionInput, + prismaClient: PrismaClientOrTransaction = prisma +): Promise { + if (!isPerOrgBasinsEnabled()) { + return { kind: "skipped", reason: "feature-disabled", basin: null }; + } + + if (org.streamBasinName) { + return { kind: "skipped", reason: "already-provisioned", basin: org.streamBasinName }; + } + + const accessToken = env.REALTIME_STREAMS_S2_ACCESS_TOKEN; + if (!accessToken) { + throw new Error( + "REALTIME_STREAMS_S2_ACCESS_TOKEN must be set when REALTIME_STREAMS_PER_ORG_BASINS_ENABLED=true" + ); + } + + const basin = basinNameForOrg(org); + const retention = org.retention ?? defaultRetention(); + + await s2CreateBasin(basin, { + accessToken, + retentionPolicy: retention, + storageClass: env.REALTIME_STREAMS_BASIN_STORAGE_CLASS, + deleteOnEmptyMinAge: env.REALTIME_STREAMS_BASIN_DELETE_ON_EMPTY_MIN_AGE, + }); + + await prismaClient.organization.update({ + where: { id: org.id }, + data: { streamBasinName: basin }, + }); + + logger.info("[streamBasinProvisioner] provisioned basin for org", { + orgId: org.id, + basin, + retention, + }); + + return { kind: "provisioned", basin, retention }; +} + +export async function reconfigureBasinForOrg( + orgId: string, + retention: string +): Promise { + if (!isPerOrgBasinsEnabled()) return; + + const accessToken = env.REALTIME_STREAMS_S2_ACCESS_TOKEN; + if (!accessToken) { + throw new Error( + "REALTIME_STREAMS_S2_ACCESS_TOKEN must be set when REALTIME_STREAMS_PER_ORG_BASINS_ENABLED=true" + ); + } + + const org = await prisma.organization.findFirst({ + where: { id: orgId }, + select: { id: true, streamBasinName: true }, + }); + if (!org?.streamBasinName) return; + + await s2ReconfigureBasin(org.streamBasinName, { accessToken, retentionPolicy: retention }); + + logger.info("[streamBasinProvisioner] reconfigured basin retention", { + orgId, + basin: org.streamBasinName, + retention, + }); +} + +type EnsureResult = + | { kind: "skipped"; reason: "feature-disabled" | "org-not-found" } + | { kind: "provisioned"; basin: string; retention: string } + | { kind: "reconfigured"; basin: string; retention: string }; + +// Idempotent: provisions if the org has no basin, PATCHes retention if +// it does. The single entrypoint the cloud billing app drives — both +// for the live plan-change path and the bulk backfill. +export async function ensureBasinForOrg( + orgId: string, + retention: string +): Promise { + if (!isPerOrgBasinsEnabled()) { + return { kind: "skipped", reason: "feature-disabled" }; + } + + const org = await prisma.organization.findFirst({ + where: { id: orgId }, + select: { id: true, streamBasinName: true }, + }); + if (!org) return { kind: "skipped", reason: "org-not-found" }; + + if (!org.streamBasinName) { + const result = await provisionBasinForOrg( + { id: org.id, streamBasinName: null, retention } + ); + if (result.kind === "provisioned") { + return { kind: "provisioned", basin: result.basin, retention: result.retention }; + } + return { kind: "skipped", reason: "feature-disabled" }; + } + + await reconfigureBasinForOrg(org.id, retention); + return { kind: "reconfigured", basin: org.streamBasinName, retention }; +} + +// Inverse of ensureBasinForOrg: nulls the column so future runs/sessions +// land in the shared global basin. The S2 basin lingers; existing streams +// age out on their original retention. +export async function deprovisionBasinForOrg( + orgId: string +): Promise<{ kind: "deprovisioned" } | { kind: "skipped"; reason: "no-basin" }> { + const org = await prisma.organization.findFirst({ + where: { id: orgId }, + select: { id: true, streamBasinName: true }, + }); + if (!org?.streamBasinName) return { kind: "skipped", reason: "no-basin" }; + + await prisma.organization.update({ + where: { id: org.id }, + data: { streamBasinName: null }, + }); + + logger.info("[streamBasinProvisioner] deprovisioned basin for org", { + orgId, + previousBasin: org.streamBasinName, + }); + + return { kind: "deprovisioned" }; +} + +// S2 REST: POST /v1/basins to create, PATCH /v1/basins/{name} to +// reconfigure. Wire shape takes integer seconds; we accept human strings +// like "7d" / "1y" as env-var ergonomics and parse them here. + +type CreateBasinOptions = { + accessToken: string; + retentionPolicy: string; + storageClass: "express" | "standard"; + deleteOnEmptyMinAge: string; +}; + +async function s2CreateBasin(name: string, opts: CreateBasinOptions): Promise { + const url = `https://aws.s2.dev/v1/basins`; + const body = { + basin: name, + config: { + create_stream_on_append: true, + create_stream_on_read: true, + default_stream_config: { + storage_class: opts.storageClass, + retention_policy: { age: parseDuration(opts.retentionPolicy) }, + delete_on_empty: { min_age_secs: parseDuration(opts.deleteOnEmptyMinAge) }, + }, + }, + }; + + const res = await fetch(url, { + signal: AbortSignal.timeout(10_000), + method: "POST", + headers: { + Authorization: `Bearer ${opts.accessToken}`, + "Content-Type": "application/json", + }, + body: JSON.stringify(body), + }); + + // 409 = basin already exists; treat as success (idempotent). + if (res.ok || res.status === 409) return; + + const text = await res.text().catch(() => ""); + throw new Error(`S2 createBasin failed: ${res.status} ${res.statusText} ${text}`); +} + +type ReconfigureBasinOptions = { + accessToken: string; + retentionPolicy: string; +}; + +async function s2ReconfigureBasin(name: string, opts: ReconfigureBasinOptions): Promise { + const url = `https://aws.s2.dev/v1/basins/${encodeURIComponent(name)}`; + const body = { + default_stream_config: { + retention_policy: { age: parseDuration(opts.retentionPolicy) }, + }, + }; + + const res = await fetch(url, { + signal: AbortSignal.timeout(10_000), + method: "PATCH", + headers: { + Authorization: `Bearer ${opts.accessToken}`, + "Content-Type": "application/json", + }, + body: JSON.stringify(body), + }); + + if (res.ok) return; + + const text = await res.text().catch(() => ""); + throw new Error(`S2 reconfigureBasin failed: ${res.status} ${res.statusText} ${text}`); +} + diff --git a/apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts b/apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts index b1bf15b9fed..868294abfde 100644 --- a/apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts +++ b/apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts @@ -29,41 +29,69 @@ function initializeRedisRealtimeStreams() { export const v1RealtimeStreams = singleton("realtimeStreams", initializeRedisRealtimeStreams); +/** + * Resolve a stream's basin. Precedence: run → session → org → global env. + * Pre-migration rows have `streamBasinName: null` and fall through to + * the global basin (where their streams actually live), so only pass + * `organization` when no run/session row exists at all — otherwise a + * null column would short-circuit to the org's *current* basin. + */ +export type StreamBasinContext = { + run?: { streamBasinName: string | null } | null; + session?: { streamBasinName: string | null } | null; + organization?: { streamBasinName: string | null } | null; +}; + +export function resolveStreamBasin(ctx: StreamBasinContext): string | undefined { + return ( + ctx.run?.streamBasinName ?? + ctx.session?.streamBasinName ?? + ctx.organization?.streamBasinName ?? + env.REALTIME_STREAMS_S2_BASIN ?? + undefined + ); +} + export function getRealtimeStreamInstance( environment: AuthenticatedEnvironment, - streamVersion: string + streamVersion: string, + basinContext?: StreamBasinContext ): StreamIngestor & StreamResponder { if (streamVersion === "v1") { return v1RealtimeStreams; - } else { - if ( - env.REALTIME_STREAMS_S2_BASIN && - (env.REALTIME_STREAMS_S2_ACCESS_TOKEN || - env.REALTIME_STREAMS_S2_SKIP_ACCESS_TOKENS === "true") - ) { - return new S2RealtimeStreams({ - basin: env.REALTIME_STREAMS_S2_BASIN, - accessToken: env.REALTIME_STREAMS_S2_ACCESS_TOKEN ?? "", - endpoint: env.REALTIME_STREAMS_S2_ENDPOINT, - skipAccessTokens: env.REALTIME_STREAMS_S2_SKIP_ACCESS_TOKENS === "true", - streamPrefix: [ - "org", - environment.organization.id, - "env", - environment.slug, - environment.id, - ].join("/"), - logLevel: env.REALTIME_STREAMS_S2_LOG_LEVEL, - flushIntervalMs: env.REALTIME_STREAMS_S2_FLUSH_INTERVAL_MS, - maxRetries: env.REALTIME_STREAMS_S2_MAX_RETRIES, - s2WaitSeconds: env.REALTIME_STREAMS_S2_WAIT_SECONDS, - accessTokenExpirationInMs: env.REALTIME_STREAMS_S2_ACCESS_TOKEN_EXPIRATION_IN_MS, - cache: s2RealtimeStreamsCache, - }); - } + } - throw new Error("Realtime streams v2 is required for this run but S2 configuration is missing"); + const resolvedBasin = resolveStreamBasin(basinContext ?? {}); + if ( + resolvedBasin && + (env.REALTIME_STREAMS_S2_ACCESS_TOKEN || env.REALTIME_STREAMS_S2_SKIP_ACCESS_TOKENS === "true") + ) { + return new S2RealtimeStreams({ + basin: resolvedBasin, + accessToken: env.REALTIME_STREAMS_S2_ACCESS_TOKEN ?? "", + endpoint: env.REALTIME_STREAMS_S2_ENDPOINT, + skipAccessTokens: env.REALTIME_STREAMS_S2_SKIP_ACCESS_TOKENS === "true", + streamPrefix: streamPrefixFor(environment, resolvedBasin), + logLevel: env.REALTIME_STREAMS_S2_LOG_LEVEL, + flushIntervalMs: env.REALTIME_STREAMS_S2_FLUSH_INTERVAL_MS, + maxRetries: env.REALTIME_STREAMS_S2_MAX_RETRIES, + s2WaitSeconds: env.REALTIME_STREAMS_S2_WAIT_SECONDS, + accessTokenExpirationInMs: env.REALTIME_STREAMS_S2_ACCESS_TOKEN_EXPIRATION_IN_MS, + cache: s2RealtimeStreamsCache, + }); } + + throw new Error("Realtime streams v2 is required for this run but S2 configuration is missing"); +} + +// Shared basin needs `org/{orgId}` to namespace; per-org basin already +// isolates so the segment drops. +function streamPrefixFor(environment: AuthenticatedEnvironment, basin: string): string { + const isPerOrgBasin = basin !== env.REALTIME_STREAMS_S2_BASIN; + const segments = isPerOrgBasin + ? ["env", environment.slug, environment.id] + : ["org", environment.organization.id, "env", environment.slug, environment.id]; + return segments.join("/"); } export function determineRealtimeStreamsVersion(streamVersion?: string): "v1" | "v2" { diff --git a/internal-packages/database/prisma/migrations/20260504071227_add_stream_basin_name/migration.sql b/internal-packages/database/prisma/migrations/20260504071227_add_stream_basin_name/migration.sql new file mode 100644 index 00000000000..c346d499e76 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20260504071227_add_stream_basin_name/migration.sql @@ -0,0 +1,8 @@ +-- AlterTable +ALTER TABLE "public"."Organization" ADD COLUMN IF NOT EXISTS "streamBasinName" TEXT; + +-- AlterTable +ALTER TABLE "public"."Session" ADD COLUMN IF NOT EXISTS "streamBasinName" TEXT; + +-- AlterTable +ALTER TABLE "public"."TaskRun" ADD COLUMN IF NOT EXISTS "streamBasinName" TEXT; diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index c7b5e7ce12b..baf420458b1 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -234,6 +234,13 @@ model Organization { platformNotifications PlatformNotification[] errorGroupStates ErrorGroupState[] + + /// S2 basin that holds this org's realtime streams. Null until the + /// per-org basin has been provisioned (OSS / s2-lite installs leave + /// it null forever; reads fall back to the global basin env var). + /// Set once at provisioning time; retention is reconfigured in-place + /// when the org's plan changes. + streamBasinName String? } model OrgMember { @@ -741,6 +748,12 @@ model Session { createdAt DateTime @default(now()) updatedAt DateTime @updatedAt + /// S2 basin where this session's stream pair lives. Stamped at create + /// time from `Organization.streamBasinName` so reads can resolve the + /// basin without joining org. Null when the org has no per-org basin + /// (OSS, or pre-backfill); reads fall back to the global basin. + streamBasinName String? + runs SessionRun[] /// Idempotency: `(env, externalId)` uniquely identifies a session. @@ -975,6 +988,11 @@ model TaskRun { realtimeStreamsVersion String @default("v1") /// Store the stream keys that are being used by the run realtimeStreams String[] @default([]) + /// S2 basin where this run's realtime streams live. Stamped at create + /// time from `Organization.streamBasinName` so reads can resolve the + /// basin without joining org. Null when the org has no per-org basin + /// (OSS, or pre-backfill); reads fall back to the global basin. + streamBasinName String? @@unique([oneTimeUseToken]) @@unique([runtimeEnvironmentId, taskIdentifier, idempotencyKey]) diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 0da98c3c835..1725587df45 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -498,6 +498,7 @@ export class RunEngine { bulkActionId, planType, realtimeStreamsVersion, + streamBasinName, debounce, annotations, onDebounced, @@ -660,6 +661,7 @@ export class RunEngine { bulkActionGroupIds: bulkActionId ? [bulkActionId] : undefined, planType, realtimeStreamsVersion, + streamBasinName, debounce: debounce ? { key: debounce.key, diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index 15e63368d2e..0b17262ba1c 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -259,6 +259,7 @@ export type TriggerParams = { bulkActionId?: string; planType?: string; realtimeStreamsVersion?: string; + streamBasinName?: string | null; debounce?: { key: string; delay: string;