diff --git a/apps/docs/app/[lang]/[[...slug]]/page.tsx b/apps/docs/app/[lang]/[[...slug]]/page.tsx index 8bf0c5fd806..91c37262564 100644 --- a/apps/docs/app/[lang]/[[...slug]]/page.tsx +++ b/apps/docs/app/[lang]/[[...slug]]/page.tsx @@ -48,7 +48,7 @@ const APIPage = createAPIPage(openapi, { {slots.header} {slots.apiPlayground} {slots.authSchemes &&
{slots.authSchemes}
} - {slots.paremeters} + {slots.parameters} {slots.body &&
{slots.body}
} {slots.responses} {slots.callbacks} diff --git a/apps/docs/content/docs/en/enterprise/data-drains.mdx b/apps/docs/content/docs/en/enterprise/data-drains.mdx new file mode 100644 index 00000000000..c1e5ced5876 --- /dev/null +++ b/apps/docs/content/docs/en/enterprise/data-drains.mdx @@ -0,0 +1,185 @@ +--- +title: Data Drains +description: Continuously export workflow logs, audit logs, and Mothership data to your own S3 bucket or HTTPS endpoint on a schedule +--- + +import { FAQ } from '@/components/ui/faq' + +Data Drains let organization owners and admins on Enterprise plans continuously export Sim data to a destination they control — a customer-owned S3 bucket or an HTTPS webhook. A drain runs on a schedule, picks up only new rows since its last successful run, and writes them as NDJSON to the destination. Viewing drain configuration and run history is restricted to owners and admins as well, since destinations expose internal bucket names and webhook URLs. + +Drains are independent of [Data Retention](/enterprise/data-retention) but designed to compose with it — see [Pairing with Data Retention](#pairing-with-data-retention) below. + +--- + +## Setup + +Go to **Settings → Enterprise → Data Drains** in your workspace, then click **New drain**. + +Each drain has four pieces: + +1. A **source** — the category of data to export +2. A **destination** — where the data goes +3. A **schedule** — how often it runs +4. A **name** — unique within your organization + +--- + +## Sources + +A drain exports exactly one source. To export multiple sources, create multiple drains. + +| Source | Description | +|---|---| +| **Workflow logs** | Workflow execution records (one row per execution, only after the run reaches a terminal state). | +| **Job logs** | Background job records (deployed APIs, schedules, webhooks). Only terminal-state rows are exported. | +| **Audit logs** | Organization- and workspace-scoped audit events — logins, permission changes, resource creation/deletion, drain configuration changes. | +| **Copilot chats** | Mothership chat history. | +| **Copilot runs** | Mothership run records (terminal state only). | + +Each row is delivered as a single line of NDJSON. The shape of each row is part of the public schema and stable across versions; every row carries an `id` field that downstream consumers can use to dedupe. + +Drains export each row exactly once based on its creation cursor. Mutable fields on **Copilot chats** (messages, title, `lastSeenAt`) are a point-in-time snapshot and won't be re-emitted if the chat is later updated. Treat the export as append-only and reconstitute current state from your own system of record if you need it. + +--- + +## Destinations + +### Amazon S3 (or any S3-compatible store) + +Writes one NDJSON object per delivered chunk to your bucket. + +- **Bucket** — the bucket name. Must already exist; Sim does not create buckets. +- **Region** — AWS region (e.g. `us-east-1`). +- **Prefix** *(optional)* — folder path inside the bucket. Trailing slash optional. +- **Access key ID / Secret access key** — IAM credentials with `s3:PutObject` on the bucket. The "Test connection" button performs a real write probe to verify, then deletes it. +- **Endpoint** *(optional)* — for non-AWS stores like MinIO, Cloudflare R2, or GCS S3-interop. Leave blank for AWS S3. +- **Force path-style** *(optional)* — required for MinIO/Ceph, must be off for AWS S3 and R2. 
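+
+For a quick end-to-end check of the bucket side, you can reproduce the kind of write a drain performs — a minimal sketch using the AWS SDK v3, where the bucket, key, and environment-variable names are placeholders rather than anything Sim defines:
+
+```typescript
+import { PutObjectCommand, S3Client } from '@aws-sdk/client-s3'
+
+// Mirrors the destination fields above. Leave `endpoint` unset for AWS S3;
+// `forcePathStyle: true` is only for MinIO/Ceph-style stores.
+const client = new S3Client({
+  region: 'us-east-1',
+  // endpoint: 'https://minio.internal:9000',
+  // forcePathStyle: true,
+  credentials: {
+    accessKeyId: process.env.DRAIN_ACCESS_KEY_ID ?? '',
+    secretAccessKey: process.env.DRAIN_SECRET_ACCESS_KEY ?? '',
+  },
+})
+
+// s3:PutObject on the bucket is the only permission a drain strictly needs.
+await client.send(
+  new PutObjectCommand({
+    Bucket: 'acme-sim-exports',
+    Key: 'sim/workflow_logs/drn_123/2026/02/01/run_456-0.ndjson', // see key format below
+    Body: '{"id":"row_1"}\n{"id":"row_2"}\n',
+    ContentType: 'application/x-ndjson',
+    ServerSideEncryption: 'AES256',
+  })
+)
+```
+
+If a write like this succeeds with the same credentials you give the drain, "Test connection" should pass as well.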
+ +Object keys are deterministic: + +``` +{prefix}/{source}/{drainId}/{yyyy}/{mm}/{dd}/{runId}-{seq}.ndjson +``` + +Objects are written with `AES256` server-side encryption. + +### HTTPS Webhook + +POSTs each chunk as NDJSON to your endpoint. + +- **URL** — must be HTTPS. Sim resolves the hostname and refuses to deliver to private, loopback, or cloud-metadata IPs. The resolved IP is pinned for the duration of a run to prevent DNS rebinding. +- **Signing secret** — shared secret used for HMAC-SHA256 signing. +- **Bearer token** *(optional)* — sent as `Authorization: Bearer `. +- **Signature header name** *(optional)* — defaults to `X-Sim-Signature`. + +Each request includes: + +``` +Content-Type: application/x-ndjson +User-Agent: Sim-DataDrain/1.0 +X-Sim-Timestamp: +X-Sim-Signature-Version: v1 +X-Sim-Signature: t=,v1= +X-Sim-Drain-Id: +X-Sim-Run-Id: +X-Sim-Source: +X-Sim-Sequence: +X-Sim-Row-Count: +Idempotency-Key: - +``` + +The signature is computed as `HMAC-SHA256(secret, "${timestamp}.${body}")` and serialized as `t=,v1=`. Verify by recomputing over the same string and rejecting timestamps older than ~5 minutes — this defends against captured-request replay attacks. + +Failed deliveries retry up to 3 times with exponential backoff (500ms, 1s, 2s with ±20% jitter), respecting `Retry-After` on 429/503. Non-retryable 4xx responses fail the run immediately. + +--- + +## Schedule + +| Cadence | Drain runs | +|---|---| +| **Hourly** | Once per hour. | +| **Daily** | Once per day. | + +You can also disable a drain with the **Enabled** toggle (it stops running but is preserved), or trigger an out-of-schedule run with **Run now** on any drain row. + +--- + +## Delivery semantics + +Drains use an **opaque cursor** that advances only on full success. If a delivery fails partway through a run, the cursor is unchanged and the next run replays from the last successful position. + +This is **at-least-once delivery**. Combined with the `id` field on every row and the `Idempotency-Key` header on every webhook chunk, downstream systems can dedupe deterministically. + +The **last 10 runs** for each drain are visible by expanding its row in the settings page, with status, row count, bytes written, destination locator (`s3://...` or webhook URL), and the error message if it failed. + +--- + +## Security + +- Destination credentials are encrypted at rest using the same key-rotation–aware encryption that protects OAuth tokens. +- Credentials are **never** returned by the Sim API after creation. Updates accept new credentials; omitting them leaves the existing encrypted blob in place. +- Webhook URLs are SSRF-validated: HTTPS-only, no private/loopback/metadata IPs, with the resolved IP pinned to defeat DNS rebinding. +- Every create, update, delete, manual run, and test-connection call is recorded in the [Audit Log](/enterprise/audit-logs). + +--- + +## Pairing with Data Retention + +Drains and [Data Retention](/enterprise/data-retention) are independent modules. Sim does **not** gate retention on drain progress — if a drain is failing, retention will still purge data on its own schedule. This matches the model used by Datadog Archives and AWS CloudWatch + S3 Export: keep the two configurations orthogonal and let the customer pair them deliberately. 
+ +To safely use both together, set the drain cadence shorter than the retention period for the same data category: + +| Drain source | Pairs with retention setting | +|---|---| +| Workflow logs, Job logs | **Log retention** | +| Copilot chats, Copilot runs | **Task cleanup** | +| Audit logs | *(no retention setting today — audit logs are kept indefinitely)* | + +For example, with **Log retention** set to 30 days, set the workflow-logs drain to **Hourly** or **Daily** so every row is exported well before retention purges it from Sim. Monitor recent drain runs in the settings page; if a drain has been failing for longer than your retention window, you may lose rows that retention purges before they are exported. + +After data lands in your bucket or webhook system, archive lifecycle (transitions to Glacier, expiration, GDPR right-to-erasure propagation) is governed by your own infrastructure — Sim has no further visibility into that data once delivery succeeds. + +--- + + + +--- + +## Self-hosted setup + +### Environment variables + +```bash +DATA_DRAINS_ENABLED=true +NEXT_PUBLIC_DATA_DRAINS_ENABLED=true +``` + +`NEXT_PUBLIC_DATA_DRAINS_ENABLED` shows the **Settings → Enterprise → Data Drains** page in the UI. `DATA_DRAINS_ENABLED` gates the server-side mutating endpoints and the cron dispatcher — when unset on a self-hosted deployment, drain create/update/delete/run requests return `404` and the dispatcher is a no-op. Both should be set to `true` together. + +Data Drains otherwise rely on the standard Trigger.dev background job infrastructure used elsewhere in Sim — no additional setup is required. The cron dispatcher runs hourly and fans out due drains as background jobs. diff --git a/apps/docs/content/docs/en/enterprise/index.mdx b/apps/docs/content/docs/en/enterprise/index.mdx index e4f004c62b7..570c8e633f9 100644 --- a/apps/docs/content/docs/en/enterprise/index.mdx +++ b/apps/docs/content/docs/en/enterprise/index.mdx @@ -59,6 +59,12 @@ Configure how long execution logs, soft-deleted resources, and Mothership data a --- +## Data Drains + +Continuously export workflow logs, audit logs, and Mothership data to a customer-owned S3 bucket or HTTPS webhook on a schedule. See the [data drains guide](/docs/enterprise/data-drains). + +--- + + + Open [admin.atlassian.com](https://admin.atlassian.com/) and go to **Directory** → **Service accounts** + + {/* TODO(screenshot): admin.atlassian.com directory page with the "Service accounts" tab highlighted */} + + + Click **Create service account**, give it a name (e.g. `sim-jira-bot`), and finish creation + + + Grant the service account access to the Atlassian sites and products it needs. Open the service account, go to **Product access**, and add Jira and/or Confluence on the relevant site + + {/* TODO(screenshot): service account "Product access" tab showing Jira granted on a site */} + + + + +The service account inherits permissions from the project/space roles you grant it — exactly like a human user. If a workflow needs to write to a specific Jira project, give the service account write access to that project in Jira's project settings. + + +### 2. Create a Scoped API Token + + + + From the service account's page in admin.atlassian.com, open the **API tokens** tab and click **Create API token** + + {/* TODO(screenshot): service account API tokens tab with "Create API token" button */} + + + Choose **API token** as the authentication type (not OAuth 2.0 — Sim uses the API token flow) + +
+
+![Atlassian admin — Choose authentication type with API token selected](/static/credentials/atlassian/admin-auth-type-picker.png)
+
+ + Select the scopes the token needs. The minimum set Sim's Jira and Confluence blocks expect is: + + **Jira (granular):** + ``` + read:jira-user + read:jira-work + write:jira-work + ``` + + **Confluence (granular):** + ``` + read:confluence-content.all + read:confluence-space.summary + write:confluence-content + read:page:confluence + write:page:confluence + ``` + + Add more scopes only if you need the corresponding operations (delete, manage webhooks, etc.). The full list of scopes Sim's blocks may use is documented in [Atlassian's developer reference](https://developer.atlassian.com/cloud/jira/platform/scopes-for-oauth-2-3LO-and-forge-apps/). + +
+
+![Atlassian token scope picker filtered to App: Jira and Scope type: Classic](/static/credentials/atlassian/admin-scope-picker.png)
+ + + Use the **App** and **Scope type** filters to narrow the list to the scopes you need. Filter by `App: Jira` (or `Confluence`) and `Scope type: Classic` to find the three core Jira scopes; switch to **Granular** if your org doesn't expose Classic. + +
+ + Copy the token when it's shown. Atlassian only displays it once — if you close the dialog, you'll have to create a new token. + +
+ + +The API token is bearer credentials for the service account. Treat it like a password — do not commit it to source control or share it publicly. Sim encrypts the token at rest. + + +### 3. Find Your Site Domain + +Your Atlassian site domain is the URL you use to access Jira or Confluence in your browser — for example, `your-team.atlassian.net`. Open Jira or Confluence, look at the address bar, and copy the part before the first `/`. + +## Adding the Service Account to Sim + + + + Open your workspace **Settings** and go to the **Integrations** tab + + + Search for "Atlassian Service Account" and click it + + {/* TODO(screenshot): Integrations page with "Atlassian Service Account" in the service list */} + + + Paste the API token, enter the site domain (e.g. `your-team.atlassian.net`), and optionally set a display name and description + +
+
+![Add Atlassian Service Account dialog with API token and site domain filled in](/static/credentials/atlassian/sim-add-modal.png)
+
+ + Click **Add Service Account**. Sim verifies the token by calling Atlassian's `/myself` endpoint through the gateway — if it fails, you'll see a specific error explaining what went wrong. + +
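+
+If the check fails and you want to debug outside Sim, you can approximate the same probe by hand — a rough sketch, assuming Node 18+ (global `fetch`) and Atlassian's public tenant-info endpoint for cloudId discovery; Sim's real validation performs additional checks:
+
+```typescript
+// Standalone approximation of the verification Sim performs — not Sim's code.
+const domain = 'your-team.atlassian.net'
+const apiToken = process.env.ATLASSIAN_API_TOKEN ?? ''
+
+// 1. Discover the site's cloudId (Sim does this for you from the domain).
+const tenantRes = await fetch(`https://${domain}/_edge/tenant_info`)
+const { cloudId } = (await tenantRes.json()) as { cloudId: string }
+
+// 2. Call /myself through the gateway with the token as a bearer credential.
+const meRes = await fetch(
+  `https://api.atlassian.com/ex/jira/${cloudId}/rest/api/3/myself`,
+  { headers: { Authorization: `Bearer ${apiToken}` } }
+)
+if (!meRes.ok) throw new Error(`Token rejected: HTTP ${meRes.status}`)
+console.log(await meRes.json()) // displayName, accountId, ...
+```
+
+If this succeeds, the same token and domain should be accepted by Sim.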
+ +The token, domain, and discovered cloudId are encrypted before being stored. + +## Using the Service Account in Workflows + +Add a Jira or Confluence block to your workflow. In the credential dropdown, your Atlassian service account appears alongside any OAuth credentials. Select it and configure the block as you normally would. + +
+
+![Jira block in a workflow with the Atlassian service account selected as the credential](/static/credentials/atlassian/sim-jira-block-credential.png)
+ +The block calls Atlassian's API gateway (`api.atlassian.com/ex/jira/{cloudId}/...`) using the service account's token. There's no impersonation step — the service account acts as itself, with whatever permissions you granted it in admin.atlassian.com. + + diff --git a/apps/docs/content/docs/en/integrations/meta.json b/apps/docs/content/docs/en/integrations/meta.json index 282504513b3..424b4ce6d4f 100644 --- a/apps/docs/content/docs/en/integrations/meta.json +++ b/apps/docs/content/docs/en/integrations/meta.json @@ -1,5 +1,5 @@ { "title": "Integrations", - "pages": ["index", "google-service-account"], + "pages": ["index", "google-service-account", "atlassian-service-account"], "defaultOpen": false } diff --git a/apps/docs/package.json b/apps/docs/package.json index 8c314fb40bf..d9d9a53f519 100644 --- a/apps/docs/package.json +++ b/apps/docs/package.json @@ -21,10 +21,10 @@ "class-variance-authority": "^0.7.1", "clsx": "^2.1.1", "drizzle-orm": "^0.45.2", - "fumadocs-core": "16.6.7", - "fumadocs-mdx": "14.2.8", - "fumadocs-openapi": "10.3.13", - "fumadocs-ui": "16.6.7", + "fumadocs-core": "16.8.5", + "fumadocs-mdx": "14.3.2", + "fumadocs-openapi": "10.8.1", + "fumadocs-ui": "16.8.5", "lucide-react": "^0.511.0", "next": "16.1.6", "next-themes": "^0.4.6", diff --git a/apps/docs/public/static/credentials/atlassian/admin-auth-type-picker.png b/apps/docs/public/static/credentials/atlassian/admin-auth-type-picker.png new file mode 100644 index 00000000000..4232828137b Binary files /dev/null and b/apps/docs/public/static/credentials/atlassian/admin-auth-type-picker.png differ diff --git a/apps/docs/public/static/credentials/atlassian/admin-scope-picker.png b/apps/docs/public/static/credentials/atlassian/admin-scope-picker.png new file mode 100644 index 00000000000..27b160adb16 Binary files /dev/null and b/apps/docs/public/static/credentials/atlassian/admin-scope-picker.png differ diff --git a/apps/docs/public/static/credentials/atlassian/sim-add-modal.png b/apps/docs/public/static/credentials/atlassian/sim-add-modal.png new file mode 100644 index 00000000000..881f43f60e8 Binary files /dev/null and b/apps/docs/public/static/credentials/atlassian/sim-add-modal.png differ diff --git a/apps/docs/public/static/credentials/atlassian/sim-jira-block-credential.png b/apps/docs/public/static/credentials/atlassian/sim-jira-block-credential.png new file mode 100644 index 00000000000..04c2a98960b Binary files /dev/null and b/apps/docs/public/static/credentials/atlassian/sim-jira-block-credential.png differ diff --git a/apps/sim/app/api/auth/oauth/token/route.ts b/apps/sim/app/api/auth/oauth/token/route.ts index 1ab26f84159..adc517ceff9 100644 --- a/apps/sim/app/api/auth/oauth/token/route.ts +++ b/apps/sim/app/api/auth/oauth/token/route.ts @@ -9,7 +9,9 @@ import { authorizeCredentialUse } from '@/lib/auth/credential-access' import { AuthType, checkSessionOrInternalAuth } from '@/lib/auth/hybrid' import { generateRequestId } from '@/lib/core/utils/request' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' +import { ATLASSIAN_SERVICE_ACCOUNT_PROVIDER_ID } from '@/lib/oauth/types' import { + getAtlassianServiceAccountSecret, getCredential, getOAuthToken, getServiceAccountToken, @@ -118,6 +120,17 @@ export const POST = withRouteHandler(async (request: NextRequest) => { } try { + if (resolved.providerId === ATLASSIAN_SERVICE_ACCOUNT_PROVIDER_ID) { + const secret = await getAtlassianServiceAccountSecret(resolved.credentialId) + return NextResponse.json( + { + accessToken: secret.apiToken, + 
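+          // domain and cloudId ride along so callers can build api.atlassian.com/ex/jira/{cloudId}/... URLs without re-resolving the site.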
cloudId: secret.cloudId, + domain: secret.domain, + }, + { status: 200 } + ) + } const accessToken = await getServiceAccountToken( resolved.credentialId, scopes ?? [], diff --git a/apps/sim/app/api/auth/oauth/utils.ts b/apps/sim/app/api/auth/oauth/utils.ts index 38b84a59777..4e33048a83d 100644 --- a/apps/sim/app/api/auth/oauth/utils.ts +++ b/apps/sim/app/api/auth/oauth/utils.ts @@ -11,6 +11,10 @@ import { isMicrosoftProvider, PROACTIVE_REFRESH_THRESHOLD_DAYS, } from '@/lib/oauth/microsoft' +import { + ATLASSIAN_SERVICE_ACCOUNT_PROVIDER_ID, + ATLASSIAN_SERVICE_ACCOUNT_SECRET_TYPE, +} from '@/lib/oauth/types' const logger = createLogger('OAuthUtilsAPI') @@ -44,6 +48,7 @@ export interface ResolvedCredential { usedCredentialTable: boolean credentialType?: string credentialId?: string + providerId?: string } /** @@ -61,6 +66,7 @@ export async function resolveOAuthAccountId( type: credential.type, accountId: credential.accountId, workspaceId: credential.workspaceId, + providerId: credential.providerId, }) .from(credential) .where(eq(credential.id, credentialId)) @@ -73,6 +79,7 @@ export async function resolveOAuthAccountId( credentialId: credentialRow.id, credentialType: 'service_account', workspaceId: credentialRow.workspaceId, + providerId: credentialRow.providerId ?? undefined, usedCredentialTable: true, } } @@ -208,6 +215,53 @@ export async function getServiceAccountToken( return tokenData.access_token } +interface AtlassianServiceAccountSecret { + type: typeof ATLASSIAN_SERVICE_ACCOUNT_SECRET_TYPE + apiToken: string + domain: string + cloudId: string + atlassianAccountId?: string +} + +/** + * Loads the decrypted Atlassian service account secret blob for a credential. + * Throws if the credential is missing or not an Atlassian service account. + */ +export async function getAtlassianServiceAccountSecret( + credentialId: string +): Promise { + const [credentialRow] = await db + .select({ encryptedServiceAccountKey: credential.encryptedServiceAccountKey }) + .from(credential) + .where(eq(credential.id, credentialId)) + .limit(1) + + if (!credentialRow?.encryptedServiceAccountKey) { + throw new Error('Atlassian service account secret not found') + } + + const { decrypted } = await decryptSecret(credentialRow.encryptedServiceAccountKey) + const parsed = JSON.parse(decrypted) as AtlassianServiceAccountSecret + if ( + parsed.type !== ATLASSIAN_SERVICE_ACCOUNT_SECRET_TYPE || + !parsed.apiToken || + !parsed.cloudId + ) { + throw new Error('Stored Atlassian service account secret is malformed') + } + return parsed +} + +/** + * For Atlassian service accounts, the API token IS the access token — + * blocks call api.atlassian.com/ex/jira/{cloudId}/... with `Authorization: Bearer {apiToken}`. + * No exchange or refresh is needed; we just decrypt and return the raw token. + */ +export async function getAtlassianServiceAccountToken(credentialId: string): Promise { + const secret = await getAtlassianServiceAccountSecret(credentialId) + return secret.apiToken +} + /** * Safely inserts an account record, handling duplicate constraint violations gracefully. * If a duplicate is detected (unique constraint violation), logs a warning and returns success. 
@@ -374,6 +428,10 @@ export async function refreshAccessTokenIfNeeded( } if (resolved.credentialType === 'service_account' && resolved.credentialId) { + if (resolved.providerId === ATLASSIAN_SERVICE_ACCOUNT_PROVIDER_ID) { + logger.info(`[${requestId}] Using Atlassian service account token for credential`) + return getAtlassianServiceAccountToken(resolved.credentialId) + } if (!scopes?.length) { throw new Error('Scopes are required for service account credentials') } diff --git a/apps/sim/app/api/credentials/route.ts b/apps/sim/app/api/credentials/route.ts index b4e65a5a845..64a3d3f9511 100644 --- a/apps/sim/app/api/credentials/route.ts +++ b/apps/sim/app/api/credentials/route.ts @@ -2,6 +2,7 @@ import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import { db } from '@sim/db' import { account, credential, credentialMember, workspace } from '@sim/db/schema' import { createLogger } from '@sim/logger' +import { getPostgresErrorCode } from '@sim/utils/errors' import { generateId } from '@sim/utils/id' import { and, eq } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' @@ -16,14 +17,36 @@ import { getSession } from '@/lib/auth' import { encryptSecret } from '@/lib/core/security/encryption' import { generateRequestId } from '@/lib/core/utils/request' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' +import { + AtlassianValidationError, + normalizeAtlassianDomain, + validateAtlassianServiceAccount, +} from '@/lib/credentials/atlassian-service-account' import { getWorkspaceMemberUserIds } from '@/lib/credentials/environment' import { syncWorkspaceOAuthCredentialsForUser } from '@/lib/credentials/oauth' import { getServiceConfigByProviderId } from '@/lib/oauth' +import { + ATLASSIAN_SERVICE_ACCOUNT_PROVIDER_ID, + ATLASSIAN_SERVICE_ACCOUNT_SECRET_TYPE, +} from '@/lib/oauth/types' import { captureServerEvent } from '@/lib/posthog/server' import { checkWorkspaceAccess } from '@/lib/workspaces/permissions/utils' const logger = createLogger('CredentialsAPI') +/** + * Thrown by the inner duplicate guard inside the create transaction when a + * concurrent request slipped a row in between the outer existence check and + * our INSERT. The catch maps this to a 409 with a typed `code` so the UI can + * map to a friendly message. 
+ */ +class DuplicateCredentialError extends Error { + constructor() { + super('duplicate_display_name') + this.name = 'DuplicateCredentialError' + } +} + interface ExistingCredentialSourceParams { workspaceId: string type: 'oauth' | 'env_workspace' | 'env_personal' | 'service_account' @@ -34,11 +57,16 @@ interface ExistingCredentialSourceParams { providerId?: string | null } -async function findExistingCredentialBySource(params: ExistingCredentialSourceParams) { +type DbOrTx = typeof db | Parameters[0]>[0] + +async function findExistingCredentialBySourceWith( + exec: DbOrTx, + params: ExistingCredentialSourceParams +) { const { workspaceId, type, accountId, envKey, envOwnerUserId, displayName, providerId } = params if (type === 'oauth' && accountId) { - const [row] = await db + const [row] = await exec .select() .from(credential) .where( @@ -53,7 +81,7 @@ async function findExistingCredentialBySource(params: ExistingCredentialSourcePa } if (type === 'env_workspace' && envKey) { - const [row] = await db + const [row] = await exec .select() .from(credential) .where( @@ -68,7 +96,7 @@ async function findExistingCredentialBySource(params: ExistingCredentialSourcePa } if (type === 'env_personal' && envKey && envOwnerUserId) { - const [row] = await db + const [row] = await exec .select() .from(credential) .where( @@ -84,7 +112,7 @@ async function findExistingCredentialBySource(params: ExistingCredentialSourcePa } if (type === 'service_account' && displayName && providerId) { - const [row] = await db + const [row] = await exec .select() .from(credential) .where( @@ -102,6 +130,17 @@ async function findExistingCredentialBySource(params: ExistingCredentialSourcePa return null } +async function findExistingCredentialBySource(params: ExistingCredentialSourceParams) { + return findExistingCredentialBySourceWith(db, params) +} + +async function findExistingCredentialBySourceTx( + tx: Parameters[0]>[0], + params: ExistingCredentialSourceParams +) { + return findExistingCredentialBySourceWith(tx, params) +} + export const GET = withRouteHandler(async (request: NextRequest) => { const requestId = generateRequestId() const session = await getSession() @@ -253,6 +292,8 @@ export const POST = withRouteHandler(async (request: NextRequest) => { envKey, envOwnerUserId, serviceAccountJson, + apiToken, + domain, } = parsed.data.body const workspaceAccess = await checkWorkspaceAccess(workspaceId, session.user.id) @@ -267,6 +308,7 @@ export const POST = withRouteHandler(async (request: NextRequest) => { const resolvedEnvKey: string | null = envKey ? 
normalizeCredentialEnvKey(envKey) : null let resolvedEnvOwnerUserId: string | null = null let resolvedEncryptedServiceAccountKey: string | null = null + const extraAuditMetadata: Record = {} if (type === 'oauth') { const [accountRow] = await db @@ -302,34 +344,69 @@ export const POST = withRouteHandler(async (request: NextRequest) => { getServiceConfigByProviderId(accountRow.providerId)?.name || accountRow.providerId } } else if (type === 'service_account') { - if (!serviceAccountJson) { - return NextResponse.json( - { error: 'serviceAccountJson is required for service account credentials' }, - { status: 400 } - ) - } + if (providerId === ATLASSIAN_SERVICE_ACCOUNT_PROVIDER_ID) { + if (!apiToken || !domain) { + return NextResponse.json( + { error: 'apiToken and domain are required for Atlassian service account credentials' }, + { status: 400 } + ) + } - const jsonParseResult = serviceAccountJsonSchema.safeParse(serviceAccountJson) - if (!jsonParseResult.success) { - return NextResponse.json( - { - error: getValidationErrorMessage(jsonParseResult.error, 'Invalid service account JSON'), - }, - { status: 400 } - ) - } + const normalizedDomain = normalizeAtlassianDomain(domain) + const validation = await validateAtlassianServiceAccount(apiToken, normalizedDomain) - const parsed = jsonParseResult.data - resolvedProviderId = 'google-service-account' - resolvedAccountId = null - resolvedEnvOwnerUserId = null + resolvedProviderId = ATLASSIAN_SERVICE_ACCOUNT_PROVIDER_ID + resolvedAccountId = null + resolvedEnvOwnerUserId = null - if (!resolvedDisplayName) { - resolvedDisplayName = parsed.client_email - } + if (!resolvedDisplayName) { + resolvedDisplayName = validation.displayName + } + + const blob = JSON.stringify({ + type: ATLASSIAN_SERVICE_ACCOUNT_SECRET_TYPE, + apiToken, + domain: normalizedDomain, + cloudId: validation.cloudId, + atlassianAccountId: validation.accountId, + }) + const { encrypted } = await encryptSecret(blob) + resolvedEncryptedServiceAccountKey = encrypted + extraAuditMetadata.atlassianDomain = normalizedDomain + extraAuditMetadata.atlassianCloudId = validation.cloudId + } else { + if (!serviceAccountJson) { + return NextResponse.json( + { error: 'serviceAccountJson is required for service account credentials' }, + { status: 400 } + ) + } + + const jsonParseResult = serviceAccountJsonSchema.safeParse(serviceAccountJson) + if (!jsonParseResult.success) { + return NextResponse.json( + { + error: getValidationErrorMessage( + jsonParseResult.error, + 'Invalid service account JSON' + ), + }, + { status: 400 } + ) + } + + const parsedKey = jsonParseResult.data + resolvedProviderId = 'google-service-account' + resolvedAccountId = null + resolvedEnvOwnerUserId = null - const { encrypted } = await encryptSecret(serviceAccountJson) - resolvedEncryptedServiceAccountKey = encrypted + if (!resolvedDisplayName) { + resolvedDisplayName = parsedKey.client_email + } + + const { encrypted } = await encryptSecret(serviceAccountJson) + resolvedEncryptedServiceAccountKey = encrypted + } } else if (type === 'env_personal') { resolvedEnvOwnerUserId = envOwnerUserId ?? session.user.id if (resolvedEnvOwnerUserId !== session.user.id) { @@ -428,6 +505,19 @@ export const POST = withRouteHandler(async (request: NextRequest) => { .limit(1) await db.transaction(async (tx) => { + // service_account has no DB-level unique index on (workspaceId, providerId, + // displayName), so we re-check inside the tx. OAuth/env_* are guarded by + // partial unique indexes and fall through to the 23505 handler below. 
+ if (type === 'service_account') { + const innerExisting = await findExistingCredentialBySourceTx(tx, { + workspaceId, + type, + displayName: resolvedDisplayName, + providerId: resolvedProviderId, + }) + if (innerExisting) throw new DuplicateCredentialError() + } + await tx.insert(credential).values({ id: credentialId, workspaceId, @@ -508,36 +598,57 @@ export const POST = withRouteHandler(async (request: NextRequest) => { metadata: { credentialType: type, providerId: resolvedProviderId, + ...extraAuditMetadata, }, request, }) return NextResponse.json({ credential: created }, { status: 201 }) - } catch (error: any) { - if (error?.code === '23505') { + } catch (error: unknown) { + if (error instanceof AtlassianValidationError) { + logger.warn(`[${requestId}] Atlassian credential rejected: ${error.code}`, { + code: error.code, + upstreamStatus: error.status, + ...error.logDetail, + }) + return NextResponse.json({ code: error.code, error: error.code }, { status: 400 }) + } + if (error instanceof DuplicateCredentialError) { + return NextResponse.json( + { + code: 'duplicate_display_name', + error: 'A credential with that name already exists in this workspace.', + }, + { status: 409 } + ) + } + const pgCode = getPostgresErrorCode(error) + if (pgCode === '23505') { return NextResponse.json( { error: 'A credential with this source already exists' }, { status: 409 } ) } - if (error?.code === '23503') { + if (pgCode === '23503') { return NextResponse.json( { error: 'Invalid credential reference or membership target' }, { status: 400 } ) } - if (error?.code === '23514') { + if (pgCode === '23514') { return NextResponse.json( { error: 'Credential source data failed validation checks' }, { status: 400 } ) } + const errAsRecord = + typeof error === 'object' && error !== null ? (error as Record) : {} logger.error(`[${requestId}] Credential create failure details`, { - code: error?.code, - detail: error?.detail, - constraint: error?.constraint, - table: error?.table, - message: error?.message, + code: pgCode, + detail: errAsRecord.detail, + constraint: errAsRecord.constraint, + table: errAsRecord.table, + message: errAsRecord.message, }) logger.error(`[${requestId}] Failed to create credential`, error) return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) diff --git a/apps/sim/app/api/cron/run-data-drains/route.ts b/apps/sim/app/api/cron/run-data-drains/route.ts new file mode 100644 index 00000000000..939d75419a6 --- /dev/null +++ b/apps/sim/app/api/cron/run-data-drains/route.ts @@ -0,0 +1,30 @@ +import { createLogger } from '@sim/logger' +import { toError } from '@sim/utils/errors' +import { type NextRequest, NextResponse } from 'next/server' +import { verifyCronAuth } from '@/lib/auth/internal' +import { isBillingEnabled, isDataDrainsEnabled } from '@/lib/core/config/feature-flags' +import { withRouteHandler } from '@/lib/core/utils/with-route-handler' +import { dispatchDueDrains } from '@/lib/data-drains/dispatcher' + +const logger = createLogger('CronRunDataDrains') + +export const GET = withRouteHandler(async (request: NextRequest) => { + const authError = verifyCronAuth(request, 'Data drain dispatcher') + if (authError) return authError + + // Self-hosted opt-in: skip dispatch entirely when the deployment hasn't + // enabled drains. Sim Cloud (billing enabled) gates per-org by enterprise + // plan inside the dispatcher's join. 
+ if (!isBillingEnabled && !isDataDrainsEnabled) { + return NextResponse.json({ success: true, dispatched: 0, skipped: 'disabled' }) + } + + try { + const result = await dispatchDueDrains() + logger.info('Data drain dispatcher run complete', result) + return NextResponse.json({ success: true, ...result }) + } catch (error) { + logger.error('Data drain dispatcher run failed', { error: toError(error).message }) + return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) + } +}) diff --git a/apps/sim/app/api/organizations/[id]/data-drains/[drainId]/route.ts b/apps/sim/app/api/organizations/[id]/data-drains/[drainId]/route.ts new file mode 100644 index 00000000000..b0b291b0807 --- /dev/null +++ b/apps/sim/app/api/organizations/[id]/data-drains/[drainId]/route.ts @@ -0,0 +1,190 @@ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' +import { db } from '@sim/db' +import { dataDrains } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { getPostgresErrorCode } from '@sim/utils/errors' +import { and, eq, ne } from 'drizzle-orm' +import { type NextRequest, NextResponse } from 'next/server' +import { + deleteDataDrainContract, + getDataDrainContract, + updateDataDrainContract, +} from '@/lib/api/contracts/data-drains' +import { parseRequest, validationErrorResponse } from '@/lib/api/server' +import { withRouteHandler } from '@/lib/core/utils/with-route-handler' +import { authorizeDrainAccess, loadDrain } from '@/lib/data-drains/access' +import { getDestination } from '@/lib/data-drains/destinations/registry' +import { encryptCredentials } from '@/lib/data-drains/encryption' +import { serializeDrain } from '@/lib/data-drains/serializers' + +const logger = createLogger('DataDrainAPI') + +type RouteContext = { params: Promise<{ id: string; drainId: string }> } + +export const GET = withRouteHandler(async (request: NextRequest, context: RouteContext) => { + const { id: organizationId, drainId } = await context.params + const access = await authorizeDrainAccess(organizationId, { requireMutating: false }) + if (!access.ok) return access.response + + const parsed = await parseRequest(getDataDrainContract, request, context) + if (!parsed.success) return parsed.response + + const drain = await loadDrain(organizationId, drainId) + if (!drain) { + return NextResponse.json({ error: 'Data drain not found' }, { status: 404 }) + } + return NextResponse.json({ drain: serializeDrain(drain) }) +}) + +export const PUT = withRouteHandler(async (request: NextRequest, context: RouteContext) => { + const { id: organizationId, drainId } = await context.params + const access = await authorizeDrainAccess(organizationId, { requireMutating: true }) + if (!access.ok) return access.response + + const parsed = await parseRequest(updateDataDrainContract, request, context) + if (!parsed.success) return parsed.response + + const body = parsed.data.body + + const drain = await loadDrain(organizationId, drainId) + if (!drain) { + return NextResponse.json({ error: 'Data drain not found' }, { status: 404 }) + } + + if (body.name !== undefined && body.name !== drain.name) { + const [conflict] = await db + .select({ id: dataDrains.id }) + .from(dataDrains) + .where( + and( + eq(dataDrains.organizationId, organizationId), + eq(dataDrains.name, body.name), + ne(dataDrains.id, drainId) + ) + ) + .limit(1) + if (conflict) { + return NextResponse.json( + { error: 'A data drain with this name already exists in this organization' }, + { status: 409 } + ) + } + } + + if (body.source 
!== undefined && body.source !== drain.source) { + return NextResponse.json({ error: 'source cannot be changed after creation' }, { status: 400 }) + } + + const updates: Partial = { updatedAt: new Date() } + if (body.name !== undefined) updates.name = body.name + if (body.scheduleCadence !== undefined) updates.scheduleCadence = body.scheduleCadence + if (body.enabled !== undefined) updates.enabled = body.enabled + + if (body.destinationType !== undefined && body.destinationType !== drain.destinationType) { + return NextResponse.json( + { error: 'destinationType cannot be changed after creation' }, + { status: 400 } + ) + } + if (body.destinationConfig !== undefined || body.destinationCredentials !== undefined) { + const destination = getDestination(drain.destinationType) + if (body.destinationConfig !== undefined) { + const configResult = destination.configSchema.safeParse(body.destinationConfig) + if (!configResult.success) return validationErrorResponse(configResult.error) + updates.destinationConfig = configResult.data as Record + } + if (body.destinationCredentials !== undefined) { + const credentialsResult = destination.credentialsSchema.safeParse(body.destinationCredentials) + if (!credentialsResult.success) return validationErrorResponse(credentialsResult.error) + updates.destinationCredentials = await encryptCredentials(credentialsResult.data) + } + } + + let updated: typeof dataDrains.$inferSelect | undefined + try { + ;[updated] = await db + .update(dataDrains) + .set(updates) + .where(eq(dataDrains.id, drainId)) + .returning() + } catch (error) { + if (getPostgresErrorCode(error) === '23505') { + return NextResponse.json( + { error: 'A data drain with this name already exists in this organization' }, + { status: 409 } + ) + } + throw error + } + + if (!updated) { + // Concurrent DELETE landed between loadDrain() and this UPDATE. + return NextResponse.json({ error: 'Data drain not found' }, { status: 404 }) + } + + logger.info('Data drain updated', { drainId, organizationId }) + + recordAudit({ + workspaceId: null, + actorId: access.session.user.id, + action: AuditAction.DATA_DRAIN_UPDATED, + resourceType: AuditResourceType.DATA_DRAIN, + resourceId: drainId, + actorName: access.session.user.name ?? undefined, + actorEmail: access.session.user.email ?? 
undefined, + resourceName: updated.name, + description: `Updated data drain '${updated.name}'`, + metadata: { + organizationId, + changes: { + name: body.name, + source: body.source, + scheduleCadence: body.scheduleCadence, + enabled: body.enabled, + destinationConfigChanged: body.destinationConfig !== undefined, + destinationCredentialsChanged: body.destinationCredentials !== undefined, + }, + }, + request, + }) + + return NextResponse.json({ drain: serializeDrain(updated) }) +}) + +export const DELETE = withRouteHandler(async (request: NextRequest, context: RouteContext) => { + const { id: organizationId, drainId } = await context.params + const access = await authorizeDrainAccess(organizationId, { requireMutating: true }) + if (!access.ok) return access.response + + const parsed = await parseRequest(deleteDataDrainContract, request, context) + if (!parsed.success) return parsed.response + + const drain = await loadDrain(organizationId, drainId) + if (!drain) { + return NextResponse.json({ error: 'Data drain not found' }, { status: 404 }) + } + + await db.delete(dataDrains).where(eq(dataDrains.id, drainId)) + + logger.info('Data drain deleted', { drainId, organizationId }) + + recordAudit({ + workspaceId: null, + actorId: access.session.user.id, + action: AuditAction.DATA_DRAIN_DELETED, + resourceType: AuditResourceType.DATA_DRAIN, + resourceId: drainId, + actorName: access.session.user.name ?? undefined, + actorEmail: access.session.user.email ?? undefined, + resourceName: drain.name, + description: `Deleted data drain '${drain.name}'`, + metadata: { + organizationId, + source: drain.source, + destinationType: drain.destinationType, + }, + request, + }) + + return NextResponse.json({ success: true as const }) +}) diff --git a/apps/sim/app/api/organizations/[id]/data-drains/[drainId]/run/route.ts b/apps/sim/app/api/organizations/[id]/data-drains/[drainId]/run/route.ts new file mode 100644 index 00000000000..433f381143d --- /dev/null +++ b/apps/sim/app/api/organizations/[id]/data-drains/[drainId]/run/route.ts @@ -0,0 +1,76 @@ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' +import { db } from '@sim/db' +import { dataDrainRuns } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { and, eq } from 'drizzle-orm' +import { type NextRequest, NextResponse } from 'next/server' +import { runDataDrainContract } from '@/lib/api/contracts/data-drains' +import { parseRequest } from '@/lib/api/server' +import { getJobQueue } from '@/lib/core/async-jobs' +import { withRouteHandler } from '@/lib/core/utils/with-route-handler' +import { authorizeDrainAccess, loadDrain } from '@/lib/data-drains/access' + +const logger = createLogger('DataDrainRunAPI') + +type RouteContext = { params: Promise<{ id: string; drainId: string }> } + +export const POST = withRouteHandler(async (request: NextRequest, context: RouteContext) => { + const { id: organizationId, drainId } = await context.params + const access = await authorizeDrainAccess(organizationId, { requireMutating: true }) + if (!access.ok) return access.response + + const parsed = await parseRequest(runDataDrainContract, request, context) + if (!parsed.success) return parsed.response + + const drain = await loadDrain(organizationId, drainId) + if (!drain) { + return NextResponse.json({ error: 'Data drain not found' }, { status: 404 }) + } + if (!drain.enabled) { + return NextResponse.json( + { error: 'Cannot run a disabled drain. Enable it first.' 
}, + { status: 400 } + ) + } + + // Reject obvious double-fires up-front. The job-queue concurrencyKey is the + // real serialization barrier (it covers the gap between enqueue and the + // runner inserting the `running` row), but this gives the user immediate + // feedback when a run is already in flight. + const [inFlight] = await db + .select({ id: dataDrainRuns.id }) + .from(dataDrainRuns) + .where(and(eq(dataDrainRuns.drainId, drainId), eq(dataDrainRuns.status, 'running'))) + .limit(1) + if (inFlight) { + return NextResponse.json( + { error: 'A run is already in progress for this drain' }, + { status: 409 } + ) + } + + const queue = await getJobQueue() + const jobId = await queue.enqueue( + 'run-data-drain', + { drainId, trigger: 'manual' }, + { concurrencyKey: `data-drain:${drainId}` } + ) + + logger.info('Manually enqueued data drain run', { drainId, organizationId, jobId }) + + recordAudit({ + workspaceId: null, + actorId: access.session.user.id, + action: AuditAction.DATA_DRAIN_RAN, + resourceType: AuditResourceType.DATA_DRAIN, + resourceId: drainId, + actorName: access.session.user.name ?? undefined, + actorEmail: access.session.user.email ?? undefined, + resourceName: drain.name, + description: `Triggered manual run for data drain '${drain.name}'`, + metadata: { organizationId, jobId, trigger: 'manual' }, + request, + }) + + return NextResponse.json({ jobId }) +}) diff --git a/apps/sim/app/api/organizations/[id]/data-drains/[drainId]/runs/route.ts b/apps/sim/app/api/organizations/[id]/data-drains/[drainId]/runs/route.ts new file mode 100644 index 00000000000..cd7ef667ab5 --- /dev/null +++ b/apps/sim/app/api/organizations/[id]/data-drains/[drainId]/runs/route.ts @@ -0,0 +1,37 @@ +import { db } from '@sim/db' +import { dataDrainRuns } from '@sim/db/schema' +import { desc, eq } from 'drizzle-orm' +import { type NextRequest, NextResponse } from 'next/server' +import { listDataDrainRunsContract } from '@/lib/api/contracts/data-drains' +import { parseRequest } from '@/lib/api/server' +import { withRouteHandler } from '@/lib/core/utils/with-route-handler' +import { authorizeDrainAccess, loadDrain } from '@/lib/data-drains/access' +import { serializeDrainRun } from '@/lib/data-drains/serializers' + +const DEFAULT_LIMIT = 25 + +type RouteContext = { params: Promise<{ id: string; drainId: string }> } + +export const GET = withRouteHandler(async (request: NextRequest, context: RouteContext) => { + const { id: organizationId, drainId } = await context.params + const access = await authorizeDrainAccess(organizationId, { requireMutating: false }) + if (!access.ok) return access.response + + const parsed = await parseRequest(listDataDrainRunsContract, request, context) + if (!parsed.success) return parsed.response + + const drain = await loadDrain(organizationId, drainId) + if (!drain) { + return NextResponse.json({ error: 'Data drain not found' }, { status: 404 }) + } + + const limit = parsed.data.query?.limit ?? 
DEFAULT_LIMIT + const runs = await db + .select() + .from(dataDrainRuns) + .where(eq(dataDrainRuns.drainId, drainId)) + .orderBy(desc(dataDrainRuns.startedAt)) + .limit(limit) + + return NextResponse.json({ runs: runs.map(serializeDrainRun) }) +}) diff --git a/apps/sim/app/api/organizations/[id]/data-drains/[drainId]/test/route.ts b/apps/sim/app/api/organizations/[id]/data-drains/[drainId]/test/route.ts new file mode 100644 index 00000000000..5550ff9eb4c --- /dev/null +++ b/apps/sim/app/api/organizations/[id]/data-drains/[drainId]/test/route.ts @@ -0,0 +1,91 @@ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' +import { createLogger } from '@sim/logger' +import { toError } from '@sim/utils/errors' +import { type NextRequest, NextResponse } from 'next/server' +import { testDataDrainContract } from '@/lib/api/contracts/data-drains' +import { parseRequest } from '@/lib/api/server' +import { withRouteHandler } from '@/lib/core/utils/with-route-handler' +import { authorizeDrainAccess, loadDrain } from '@/lib/data-drains/access' +import { getDestination } from '@/lib/data-drains/destinations/registry' +import { decryptCredentials } from '@/lib/data-drains/encryption' + +const logger = createLogger('DataDrainTestAPI') + +const TEST_TIMEOUT_MS = 10_000 + +type RouteContext = { params: Promise<{ id: string; drainId: string }> } + +export const POST = withRouteHandler(async (request: NextRequest, context: RouteContext) => { + const { id: organizationId, drainId } = await context.params + const access = await authorizeDrainAccess(organizationId, { requireMutating: true }) + if (!access.ok) return access.response + + const parsed = await parseRequest(testDataDrainContract, request, context) + if (!parsed.success) return parsed.response + + const drain = await loadDrain(organizationId, drainId) + if (!drain) { + return NextResponse.json({ error: 'Data drain not found' }, { status: 404 }) + } + + const destination = getDestination(drain.destinationType) + if (!destination.test) { + return NextResponse.json( + { error: `Destination '${drain.destinationType}' does not support connection testing` }, + { status: 400 } + ) + } + + const config = destination.configSchema.parse(drain.destinationConfig) + const credentials = destination.credentialsSchema.parse( + await decryptCredentials(drain.destinationCredentials) + ) + + const controller = new AbortController() + const timeout = setTimeout(() => controller.abort(), TEST_TIMEOUT_MS) + try { + await destination.test({ config, credentials, signal: controller.signal }) + recordAudit({ + workspaceId: null, + actorId: access.session.user.id, + action: AuditAction.DATA_DRAIN_TESTED, + resourceType: AuditResourceType.DATA_DRAIN, + resourceId: drainId, + actorName: access.session.user.name ?? undefined, + actorEmail: access.session.user.email ?? undefined, + resourceName: drain.name, + description: `Tested connection for data drain '${drain.name}' (success)`, + metadata: { organizationId, destinationType: drain.destinationType, outcome: 'success' }, + request, + }) + return NextResponse.json({ ok: true as const }) + } catch (error) { + const message = toError(error).message + logger.warn('Data drain test connection failed', { + drainId, + destinationType: drain.destinationType, + error: message, + }) + recordAudit({ + workspaceId: null, + actorId: access.session.user.id, + action: AuditAction.DATA_DRAIN_TESTED, + resourceType: AuditResourceType.DATA_DRAIN, + resourceId: drainId, + actorName: access.session.user.name ?? 
undefined, + actorEmail: access.session.user.email ?? undefined, + resourceName: drain.name, + description: `Tested connection for data drain '${drain.name}' (failed)`, + metadata: { + organizationId, + destinationType: drain.destinationType, + outcome: 'failed', + error: message, + }, + request, + }) + return NextResponse.json({ error: message }, { status: 400 }) + } finally { + clearTimeout(timeout) + } +}) diff --git a/apps/sim/app/api/organizations/[id]/data-drains/route.ts b/apps/sim/app/api/organizations/[id]/data-drains/route.ts new file mode 100644 index 00000000000..d78655ae28b --- /dev/null +++ b/apps/sim/app/api/organizations/[id]/data-drains/route.ts @@ -0,0 +1,136 @@ +import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' +import { db } from '@sim/db' +import { dataDrains } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { getPostgresErrorCode } from '@sim/utils/errors' +import { generateId } from '@sim/utils/id' +import { and, asc, eq } from 'drizzle-orm' +import { type NextRequest, NextResponse } from 'next/server' +import { createDataDrainContract, listDataDrainsContract } from '@/lib/api/contracts/data-drains' +import { parseRequest, validationErrorResponse } from '@/lib/api/server' +import { withRouteHandler } from '@/lib/core/utils/with-route-handler' +import { authorizeDrainAccess } from '@/lib/data-drains/access' +import { getDestination } from '@/lib/data-drains/destinations/registry' +import { encryptCredentials } from '@/lib/data-drains/encryption' +import { serializeDrain } from '@/lib/data-drains/serializers' + +const logger = createLogger('DataDrainsAPI') + +type RouteContext = { params: Promise<{ id: string }> } + +export const GET = withRouteHandler(async (request: NextRequest, context: RouteContext) => { + const { id: organizationId } = await context.params + const access = await authorizeDrainAccess(organizationId, { requireMutating: false }) + if (!access.ok) return access.response + + const parsed = await parseRequest(listDataDrainsContract, request, context) + if (!parsed.success) return parsed.response + + const rows = await db + .select() + .from(dataDrains) + .where(eq(dataDrains.organizationId, organizationId)) + .orderBy(asc(dataDrains.createdAt)) + + return NextResponse.json({ drains: rows.map(serializeDrain) }) +}) + +export const POST = withRouteHandler(async (request: NextRequest, context: RouteContext) => { + const { id: organizationId } = await context.params + const access = await authorizeDrainAccess(organizationId, { requireMutating: true }) + if (!access.ok) return access.response + + const parsed = await parseRequest(createDataDrainContract, request, context) + if (!parsed.success) return parsed.response + + const body = parsed.data.body + + if (!body.destinationCredentials) { + return NextResponse.json( + { error: 'destinationCredentials is required when creating a drain' }, + { status: 400 } + ) + } + const destination = getDestination(body.destinationType) + const configResult = destination.configSchema.safeParse(body.destinationConfig) + if (!configResult.success) return validationErrorResponse(configResult.error) + const credentialsResult = destination.credentialsSchema.safeParse(body.destinationCredentials) + if (!credentialsResult.success) return validationErrorResponse(credentialsResult.error) + const encryptedCredentials = await encryptCredentials(credentialsResult.data) + + const [existing] = await db + .select({ id: dataDrains.id }) + .from(dataDrains) + 
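+      // Pre-check for a friendly 409; a concurrent create that races past it is still caught by the 23505 handler below.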
.where(and(eq(dataDrains.organizationId, organizationId), eq(dataDrains.name, body.name))) + .limit(1) + if (existing) { + return NextResponse.json( + { error: 'A data drain with this name already exists in this organization' }, + { status: 409 } + ) + } + + const id = generateId() + const now = new Date() + let inserted: typeof dataDrains.$inferSelect | undefined + try { + ;[inserted] = await db + .insert(dataDrains) + .values({ + id, + organizationId, + name: body.name, + source: body.source, + destinationType: body.destinationType, + destinationConfig: configResult.data as Record, + destinationCredentials: encryptedCredentials, + scheduleCadence: body.scheduleCadence, + enabled: body.enabled ?? true, + cursor: null, + createdBy: access.session.user.id, + createdAt: now, + updatedAt: now, + }) + .returning() + } catch (error) { + if (getPostgresErrorCode(error) === '23505') { + return NextResponse.json( + { error: 'A data drain with this name already exists in this organization' }, + { status: 409 } + ) + } + throw error + } + + if (!inserted) { + throw new Error('Insert returned no row') + } + + logger.info('Data drain created', { + drainId: id, + organizationId, + source: body.source, + destinationType: body.destinationType, + }) + + recordAudit({ + workspaceId: null, + actorId: access.session.user.id, + action: AuditAction.DATA_DRAIN_CREATED, + resourceType: AuditResourceType.DATA_DRAIN, + resourceId: id, + actorName: access.session.user.name ?? undefined, + actorEmail: access.session.user.email ?? undefined, + resourceName: body.name, + description: `Created data drain '${body.name}'`, + metadata: { + organizationId, + source: body.source, + destinationType: body.destinationType, + scheduleCadence: body.scheduleCadence, + }, + request, + }) + + return NextResponse.json({ drain: serializeDrain(inserted) }, { status: 201 }) +}) diff --git a/apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts b/apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts index 545731c9bd9..a9bab734a0f 100644 --- a/apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts +++ b/apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts @@ -10,7 +10,6 @@ import { generateRequestId } from '@/lib/core/utils/request' import { SSE_HEADERS } from '@/lib/core/utils/sse' import { getBaseUrl } from '@/lib/core/utils/urls' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' -import { setExecutionMeta } from '@/lib/execution/event-buffer' import { preprocessExecution } from '@/lib/execution/preprocessing' import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager' import { createStreamingResponse } from '@/lib/workflows/streaming/streaming' @@ -145,6 +144,7 @@ export const POST = withRouteHandler( contextId, resumeInput, userId, + allowedPauseKinds: ['human'], }) if (enqueueResult.status === 'queued') { @@ -156,12 +156,6 @@ export const POST = withRouteHandler( }) } - await setExecutionMeta(enqueueResult.resumeExecutionId, { - status: 'active', - userId, - workflowId, - }) - const resumeArgs = { resumeEntryId: enqueueResult.resumeEntryId, resumeExecutionId: enqueueResult.resumeExecutionId, @@ -248,6 +242,14 @@ export const POST = withRouteHandler( error: toError(dispatchError).message, resumeExecutionId: enqueueResult.resumeExecutionId, }) + await PauseResumeManager.markResumeAttemptFailed({ + resumeEntryId: enqueueResult.resumeEntryId, + pausedExecutionId: enqueueResult.pausedExecution.id, + parentExecutionId: 
executionId, + contextId: enqueueResult.contextId, + failureReason: 'Failed to queue async resume execution', + }) + await PauseResumeManager.processQueuedResumes(executionId) return NextResponse.json( { error: 'Failed to queue resume execution. Please try again.' }, { status: 503 } diff --git a/apps/sim/app/api/resume/poll/route.ts b/apps/sim/app/api/resume/poll/route.ts new file mode 100644 index 00000000000..09f76ff7f15 --- /dev/null +++ b/apps/sim/app/api/resume/poll/route.ts @@ -0,0 +1,175 @@ +import { db } from '@sim/db' +import { pausedExecutions } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { toError } from '@sim/utils/errors' +import { generateShortId } from '@sim/utils/id' +import { and, asc, eq, isNotNull, lte } from 'drizzle-orm' +import { type NextRequest, NextResponse } from 'next/server' +import { verifyCronAuth } from '@/lib/auth/internal' +import { acquireLock, releaseLock } from '@/lib/core/config/redis' +import { withRouteHandler } from '@/lib/core/utils/with-route-handler' +import { + computeEarliestResumeAt, + PauseResumeManager, +} from '@/lib/workflows/executor/human-in-the-loop-manager' +import type { PausePoint } from '@/executor/types' + +const logger = createLogger('TimePauseResumePoll') + +export const dynamic = 'force-dynamic' +export const maxDuration = 120 + +const LOCK_KEY = 'time-pause-resume-poll-lock' +const LOCK_TTL_SECONDS = 180 +const POLL_BATCH_LIMIT = 200 + +interface DispatchFailure { + executionId: string + contextId: string + error: string +} + +interface RowResult { + dispatched: number + failures: DispatchFailure[] +} + +export const GET = withRouteHandler(async (request: NextRequest) => { + const requestId = generateShortId() + + const authError = verifyCronAuth(request, 'Time-pause resume poll') + if (authError) return authError + + const lockAcquired = await acquireLock(LOCK_KEY, requestId, LOCK_TTL_SECONDS) + if (!lockAcquired) { + return NextResponse.json( + { success: true, message: 'Polling already in progress – skipped', requestId }, + { status: 202 } + ) + } + + try { + const now = new Date() + + const dueRows = await db + .select({ + id: pausedExecutions.id, + executionId: pausedExecutions.executionId, + workflowId: pausedExecutions.workflowId, + pausePoints: pausedExecutions.pausePoints, + metadata: pausedExecutions.metadata, + }) + .from(pausedExecutions) + .where( + and( + eq(pausedExecutions.status, 'paused'), + isNotNull(pausedExecutions.nextResumeAt), + lte(pausedExecutions.nextResumeAt, now) + ) + ) + .orderBy(asc(pausedExecutions.nextResumeAt)) + .limit(POLL_BATCH_LIMIT) + + const results = await Promise.all(dueRows.map((row) => dispatchRow(row, now))) + const dispatched = results.reduce((sum, r) => sum + r.dispatched, 0) + const failures = results.flatMap((r) => r.failures) + + logger.info('Time-pause resume poll completed', { + requestId, + claimedRows: dueRows.length, + dispatched, + failureCount: failures.length, + }) + + return NextResponse.json({ + success: true, + requestId, + claimedRows: dueRows.length, + dispatched, + failures, + }) + } catch (error) { + const message = toError(error).message + logger.error('Time-pause resume poll failed', { requestId, error: message }) + return NextResponse.json({ success: false, requestId, error: message }, { status: 500 }) + } finally { + await releaseLock(LOCK_KEY, requestId).catch(() => {}) + } +}) + +interface DueRow { + id: string + executionId: string + workflowId: string + pausePoints: unknown + metadata: unknown +} + +async function 
dispatchRow(row: DueRow, now: Date): Promise<RowResult> {
+  const points = (row.pausePoints ?? {}) as Record<string, PausePoint>
+  const metadata = (row.metadata ?? {}) as Record<string, unknown>
+  const userId = typeof metadata.executorUserId === 'string' ? metadata.executorUserId : ''
+
+  const eligiblePoints = Object.values(points).filter(
+    (point) =>
+      point.pauseKind === 'time' && (!point.resumeStatus || point.resumeStatus === 'paused')
+  )
+  const duePoints = eligiblePoints.filter((point) => {
+    if (!point.resumeAt) return false
+    const at = new Date(point.resumeAt)
+    return !Number.isNaN(at.getTime()) && at <= now
+  })
+
+  const failures: DispatchFailure[] = []
+  let dispatched = 0
+
+  for (const point of duePoints) {
+    if (!point.contextId) continue
+    try {
+      const enqueueResult = await PauseResumeManager.enqueueOrStartResume({
+        executionId: row.executionId,
+        contextId: point.contextId,
+        resumeInput: {},
+        userId,
+        allowedPauseKinds: ['time'],
+      })
+
+      if (enqueueResult.status === 'starting') {
+        PauseResumeManager.startResumeExecution({
+          resumeEntryId: enqueueResult.resumeEntryId,
+          resumeExecutionId: enqueueResult.resumeExecutionId,
+          pausedExecution: enqueueResult.pausedExecution,
+          contextId: enqueueResult.contextId,
+          resumeInput: enqueueResult.resumeInput,
+          userId: enqueueResult.userId,
+        }).catch((error) => {
+          logger.error('Background time-pause resume failed', {
+            executionId: row.executionId,
+            contextId: point.contextId,
+            error: toError(error).message,
+          })
+        })
+      }
+      dispatched++
+    } catch (error) {
+      const message = toError(error).message
+      logger.warn('Failed to dispatch time-pause resume', {
+        executionId: row.executionId,
+        contextId: point.contextId,
+        error: message,
+      })
+      failures.push({ executionId: row.executionId, contextId: point.contextId, error: message })
+    }
+  }
+
+  // We never auto-retry a failed dispatch: workflow blocks aren't idempotent, and
+  // an operator must investigate stranded rows by hand. The status='paused' guard
+  // also prevents clobbering when a concurrent manual resume has already advanced
+  // the row's state since we read it.
+  await PauseResumeManager.setNextResumeAt({
+    pausedExecutionId: row.id,
+    nextResumeAt: computeEarliestResumeAt(eligiblePoints, { after: now }),
+  })
+
+  return { dispatched, failures }
+}
diff --git a/apps/sim/app/api/tools/confluence/selector-spaces/route.ts b/apps/sim/app/api/tools/confluence/selector-spaces/route.ts
index 59a674df87a..e8a8b032480 100644
--- a/apps/sim/app/api/tools/confluence/selector-spaces/route.ts
+++ b/apps/sim/app/api/tools/confluence/selector-spaces/route.ts
@@ -6,7 +6,12 @@ import { authorizeCredentialUse } from '@/lib/auth/credential-access'
 import { validateJiraCloudId } from '@/lib/core/security/input-validation'
 import { generateRequestId } from '@/lib/core/utils/request'
 import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
-import { refreshAccessTokenIfNeeded } from '@/app/api/auth/oauth/utils'
+import { ATLASSIAN_SERVICE_ACCOUNT_PROVIDER_ID } from '@/lib/oauth/types'
+import {
+  getAtlassianServiceAccountSecret,
+  refreshAccessTokenIfNeeded,
+  resolveOAuthAccountId,
+} from '@/app/api/auth/oauth/utils'
 import { getConfluenceCloudId } from '@/tools/confluence/utils'
 import { parseAtlassianErrorMessage } from '@/tools/jira/utils'

@@ -14,13 +19,33 @@ const logger = createLogger('ConfluenceSelectorSpacesAPI')

 export const dynamic = 'force-dynamic'

+const PAGE_LIMIT = 250
+
+type SpaceStatus = 'current' | 'archived'
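The paging contract defined by the cursor helper just below is easiest to read from the consumer side: keep POSTing the returned `nextCursor` back until it comes back undefined, and the switch from `current` to `archived` spaces happens transparently through the `archived:` sentinel. A hedged sketch; the fetch path mirrors this file's route location, and the response shape is taken from the handler that follows:

```typescript
// Hypothetical consumer of the selector-spaces route below. The request body
// fields mirror what the handler destructures (credential, workflowId, domain,
// cursor); the { spaces, nextCursor } response shape comes from the handler.
interface SpaceRow {
  id: string
  name: string
  key: string
  status: 'current' | 'archived'
}

async function listAllSpaces(body: {
  credential: string
  workflowId: string
  domain: string
}): Promise<SpaceRow[]> {
  const all: SpaceRow[] = []
  let cursor: string | undefined
  do {
    const res = await fetch('/api/tools/confluence/selector-spaces', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ ...body, cursor }),
    })
    if (!res.ok) throw new Error(`Space listing failed: ${res.status}`)
    const data: { spaces: SpaceRow[]; nextCursor?: string } = await res.json()
    all.push(...data.spaces)
    // 'current:...' pages come first; the route hands back 'archived:' once the
    // current stream is exhausted, so this loop flips streams transparently.
    cursor = data.nextCursor
  } while (cursor)
  return all
}
```

+
+/**
+ * Cursor format: `<status>:<innerCursor>`. Empty inner cursor means "first page
+ * of that status". 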
When current is exhausted we hand back `archived:` so the + * client transparently flips to the archived stream — listing both surfaces + * archived spaces in the dropdown, which would otherwise only be reachable by + * typing the space key manually even though sync works against archived spaces. + */ +function parseCursor(raw: string | undefined): { status: SpaceStatus; inner?: string } { + if (!raw) return { status: 'current' } + const idx = raw.indexOf(':') + if (idx === -1) return { status: 'current' } + const status = raw.slice(0, idx) === 'archived' ? 'archived' : 'current' + const inner = raw.slice(idx + 1) + return { status, inner: inner || undefined } +} + export const POST = withRouteHandler(async (request: NextRequest) => { const requestId = generateRequestId() try { const parsed = await parseRequest(confluenceSpacesSelectorContract, request, {}) if (!parsed.success) return parsed.response - const { credential, workflowId, domain } = parsed.data.body + const { credential, workflowId, domain, cursor } = parsed.data.body if (!credential) { logger.error('Missing credential in request') @@ -39,50 +64,57 @@ export const POST = withRouteHandler(async (request: NextRequest) => { return NextResponse.json({ error: authz.error || 'Unauthorized' }, { status: 403 }) } - const accessToken = await refreshAccessTokenIfNeeded( - credential, - authz.credentialOwnerUserId, - requestId - ) - if (!accessToken) { - logger.error('Failed to get access token', { - credentialId: credential, - userId: authz.credentialOwnerUserId, - }) - return NextResponse.json( - { error: 'Could not retrieve access token', authRequired: true }, - { status: 401 } + const resolved = await resolveOAuthAccountId(credential) + const isAtlassianServiceAccount = + resolved?.providerId === ATLASSIAN_SERVICE_ACCOUNT_PROVIDER_ID && !!resolved.credentialId + + let accessToken: string | null + let cloudId: string + if (isAtlassianServiceAccount) { + const secret = await getAtlassianServiceAccountSecret(resolved.credentialId!) 
+ accessToken = secret.apiToken + cloudId = secret.cloudId + } else { + accessToken = await refreshAccessTokenIfNeeded( + credential, + authz.credentialOwnerUserId, + requestId ) + if (!accessToken) { + logger.error('Failed to get access token', { + credentialId: credential, + userId: authz.credentialOwnerUserId, + }) + return NextResponse.json( + { error: 'Could not retrieve access token', authRequired: true }, + { status: 401 } + ) + } + cloudId = await getConfluenceCloudId(domain, accessToken) } - const cloudId = await getConfluenceCloudId(domain, accessToken) - const cloudIdValidation = validateJiraCloudId(cloudId, 'cloudId') if (!cloudIdValidation.isValid) { return NextResponse.json({ error: cloudIdValidation.error }, { status: 400 }) } - const url = `https://api.atlassian.com/ex/confluence/${cloudIdValidation.sanitized}/wiki/api/v2/spaces?limit=250` + const baseUrl = `https://api.atlassian.com/ex/confluence/${cloudIdValidation.sanitized}/wiki/api/v2/spaces` + const { status, inner } = parseCursor(cursor) + + const params = new URLSearchParams({ limit: String(PAGE_LIMIT), status }) + if (inner) params.set('cursor', inner) + const url = `${baseUrl}?${params.toString()}` const response = await fetch(url, { method: 'GET', - headers: { - Accept: 'application/json', - Authorization: `Bearer ${accessToken}`, - }, + headers: { Accept: 'application/json', Authorization: `Bearer ${accessToken}` }, }) if (!response.ok) { const errorText = await response.text() - logger.error('Confluence API error response:', { - status: response.status, - statusText: response.statusText, - error: errorText, - }) - return NextResponse.json( - { error: parseAtlassianErrorMessage(response.status, response.statusText, errorText) }, - { status: response.status } - ) + const message = parseAtlassianErrorMessage(response.status, response.statusText, errorText) + logger.error('Confluence API error response', { error: message, status: response.status }) + return NextResponse.json({ error: message }, { status: 502 }) } const data = await response.json() @@ -90,9 +122,27 @@ export const POST = withRouteHandler(async (request: NextRequest) => { id: space.id, name: space.name, key: space.key, + status, })) - return NextResponse.json({ spaces }) + let nextInner: string | undefined + const nextLink = data._links?.next as string | undefined + if (nextLink) { + try { + nextInner = new URL(nextLink, 'https://placeholder').searchParams.get('cursor') || undefined + } catch { + nextInner = undefined + } + } + + let nextCursor: string | undefined + if (nextInner) { + nextCursor = `${status}:${nextInner}` + } else if (status === 'current') { + nextCursor = 'archived:' + } + + return NextResponse.json({ spaces, nextCursor }) } catch (error) { logger.error('Error listing Confluence spaces:', error) return NextResponse.json( diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index 24e9038dd55..b0f0a0b1d4d 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -25,7 +25,12 @@ import { SIM_VIA_HEADER, validateCallChain, } from '@/lib/execution/call-chain' -import { createExecutionEventWriter, setExecutionMeta } from '@/lib/execution/event-buffer' +import { + createExecutionEventWriter, + flushExecutionStreamReplayBuffer, + initializeExecutionStreamMeta, + type TerminalExecutionStreamStatus, +} from '@/lib/execution/event-buffer' import { processInputFileFields } from '@/lib/execution/files' import { 
registerManualExecutionAborter, @@ -868,11 +873,17 @@ async function handleExecutePost( let isManualAbortRegistered = false const eventWriter = createExecutionEventWriter(executionId) - setExecutionMeta(executionId, { - status: 'active', + const metaInitialized = await initializeExecutionStreamMeta(executionId, { userId: actorUserId, workflowId, - }).catch(() => {}) + }) + if (!metaInitialized) { + timeoutController.cleanup() + return NextResponse.json( + { error: 'Run buffer temporarily unavailable' }, + { status: 503, headers: { 'X-Execution-Id': executionId } } + ) + } const stream = new ReadableStream({ async start(controller) { @@ -881,12 +892,18 @@ async function handleExecutePost( registerManualExecutionAborter(executionId, timeoutController.abort) isManualAbortRegistered = true - let localEventSeq = 0 - const sendEvent = (event: ExecutionEvent) => { + let terminalEventPublished = false + const sendEvent = async ( + event: ExecutionEvent, + terminalStatus?: TerminalExecutionStreamStatus + ) => { const isBuffered = event.type !== 'stream:chunk' && event.type !== 'stream:done' if (isBuffered) { - localEventSeq++ - event.eventId = localEventSeq + const entry = terminalStatus + ? await eventWriter.writeTerminal(event, terminalStatus) + : await eventWriter.write(event) + event.eventId = entry.eventId + terminalEventPublished ||= Boolean(terminalStatus) } if (!isStreamClosed) { try { @@ -895,15 +912,12 @@ async function handleExecutePost( isStreamClosed = true } } - if (isBuffered) { - eventWriter.write(event).catch(() => {}) - } } try { const startTime = new Date() - sendEvent({ + await sendEvent({ type: 'execution:started', timestamp: startTime.toISOString(), executionId, @@ -922,7 +936,7 @@ async function handleExecutePost( childWorkflowContext?: ChildWorkflowContext ) => { reqLogger.info('onBlockStart called', { blockId, blockName, blockType }) - sendEvent({ + await sendEvent({ type: 'block:started', timestamp: new Date().toISOString(), executionId, @@ -976,7 +990,7 @@ async function handleExecutePost( blockType, error: callbackData.output.error, }) - sendEvent({ + await sendEvent({ type: 'block:error', timestamp: new Date().toISOString(), executionId, @@ -1010,7 +1024,7 @@ async function handleExecutePost( blockName, blockType, }) - sendEvent({ + await sendEvent({ type: 'block:completed', timestamp: new Date().toISOString(), executionId, @@ -1053,7 +1067,7 @@ async function handleExecutePost( if (done) break const chunk = decoder.decode(value, { stream: true }) - sendEvent({ + await sendEvent({ type: 'stream:chunk', timestamp: new Date().toISOString(), executionId, @@ -1062,7 +1076,7 @@ async function handleExecutePost( }) } - sendEvent({ + await sendEvent({ type: 'stream:done', timestamp: new Date().toISOString(), executionId, @@ -1107,13 +1121,14 @@ async function handleExecutePost( selectedOutputs ) - const onChildWorkflowInstanceReady = ( + const onChildWorkflowInstanceReady = async ( blockId: string, childWorkflowInstanceId: string, iterationContext?: IterationContext, - executionOrder?: number + executionOrder?: number, + childWorkflowContext?: ChildWorkflowContext ) => { - sendEvent({ + await sendEvent({ type: 'block:childWorkflowStarted', timestamp: new Date().toISOString(), executionId, @@ -1123,7 +1138,16 @@ async function handleExecutePost( childWorkflowInstanceId, ...(iterationContext && { iterationCurrent: iterationContext.iterationCurrent, + iterationTotal: iterationContext.iterationTotal, + iterationType: iterationContext.iterationType, iterationContainerId: 
iterationContext.iterationContainerId, + ...(iterationContext.parentIterations?.length && { + parentIterations: iterationContext.parentIterations, + }), + }), + ...(childWorkflowContext && { + childWorkflowBlockId: childWorkflowContext.parentBlockId, + childWorkflowName: childWorkflowContext.workflowName, }), ...(executionOrder !== undefined && { executionOrder }), }, @@ -1157,32 +1181,38 @@ async function handleExecutePost( await loggingSession.markAsFailed(timeoutErrorMessage) - sendEvent({ - type: 'execution:error', - timestamp: new Date().toISOString(), - executionId, - workflowId, - data: { - error: timeoutErrorMessage, - duration: result.metadata?.duration || 0, - finalBlockLogs: result.logs, - }, - }) finalMetaStatus = 'error' + await sendEvent( + { + type: 'execution:error', + timestamp: new Date().toISOString(), + executionId, + workflowId, + data: { + error: timeoutErrorMessage, + duration: result.metadata?.duration || 0, + finalBlockLogs: result.logs, + }, + }, + 'error' + ) } else { reqLogger.info('Workflow execution was cancelled') - sendEvent({ - type: 'execution:cancelled', - timestamp: new Date().toISOString(), - executionId, - workflowId, - data: { - duration: result.metadata?.duration || 0, - finalBlockLogs: result.logs, - }, - }) finalMetaStatus = 'cancelled' + await sendEvent( + { + type: 'execution:cancelled', + timestamp: new Date().toISOString(), + executionId, + workflowId, + data: { + duration: result.metadata?.duration || 0, + finalBlockLogs: result.logs, + }, + }, + 'cancelled' + ) } return } @@ -1196,35 +1226,43 @@ async function handleExecutePost( : result.output if (result.status === 'paused') { - sendEvent({ - type: 'execution:paused', - timestamp: new Date().toISOString(), - executionId, - workflowId, - data: { - output: sseOutput, - duration: result.metadata?.duration || 0, - startTime: result.metadata?.startTime || startTime.toISOString(), - endTime: result.metadata?.endTime || new Date().toISOString(), + finalMetaStatus = 'complete' + await sendEvent( + { + type: 'execution:paused', + timestamp: new Date().toISOString(), + executionId, + workflowId, + data: { + output: sseOutput, + duration: result.metadata?.duration || 0, + startTime: result.metadata?.startTime || startTime.toISOString(), + endTime: result.metadata?.endTime || new Date().toISOString(), + finalBlockLogs: result.logs, + }, }, - }) + 'complete' + ) } else { - sendEvent({ - type: 'execution:completed', - timestamp: new Date().toISOString(), - executionId, - workflowId, - data: { - success: result.success, - output: sseOutput, - duration: result.metadata?.duration || 0, - startTime: result.metadata?.startTime || startTime.toISOString(), - endTime: result.metadata?.endTime || new Date().toISOString(), - finalBlockLogs: result.logs, + finalMetaStatus = 'complete' + await sendEvent( + { + type: 'execution:completed', + timestamp: new Date().toISOString(), + executionId, + workflowId, + data: { + success: result.success, + output: sseOutput, + duration: result.metadata?.duration || 0, + startTime: result.metadata?.startTime || startTime.toISOString(), + endTime: result.metadata?.endTime || new Date().toISOString(), + finalBlockLogs: result.logs, + }, }, - }) + 'complete' + ) } - finalMetaStatus = 'complete' } catch (error: unknown) { const isTimeout = isTimeoutError(error) || timeoutController.isTimedOut() const errorMessage = isTimeout @@ -1237,32 +1275,55 @@ async function handleExecutePost( const executionResult = hasExecutionResult(error) ? 
error.executionResult : undefined - sendEvent({ - type: 'execution:error', - timestamp: new Date().toISOString(), - executionId, - workflowId, - data: { - error: executionResult?.error || errorMessage, - duration: executionResult?.metadata?.duration || 0, - finalBlockLogs: executionResult?.logs, - }, - }) finalMetaStatus = 'error' + await sendEvent( + { + type: 'execution:error', + timestamp: new Date().toISOString(), + executionId, + workflowId, + data: { + error: executionResult?.error || errorMessage, + duration: executionResult?.metadata?.duration || 0, + finalBlockLogs: executionResult?.logs, + }, + }, + 'error' + ) } finally { if (isManualAbortRegistered) { unregisterManualExecutionAborter(executionId) isManualAbortRegistered = false } - try { - await eventWriter.close() - } catch (closeError) { - reqLogger.warn('Failed to close event writer', { - error: toError(closeError).message, + if (finalMetaStatus && !terminalEventPublished) { + const replayBufferFlushed = await flushExecutionStreamReplayBuffer( + executionId, + eventWriter + ) + reqLogger.error('Failed to publish terminal execution event durably', { + executionId, + status: finalMetaStatus, + replayBufferFlushed, }) - } - if (finalMetaStatus) { - setExecutionMeta(executionId, { status: finalMetaStatus }).catch(() => {}) + if (!isStreamClosed) { + controller.error(new Error('Run buffer terminal event publish failed')) + isStreamClosed = true + } + } else if (terminalEventPublished) { + await eventWriter.close().catch((closeError) => { + reqLogger.warn('Failed to close execution event writer after terminal publish', { + executionId, + error: closeError instanceof Error ? closeError.message : String(closeError), + }) + }) + } else { + try { + await eventWriter.close() + } catch (closeError) { + reqLogger.warn('Failed to close event writer', { + error: toError(closeError).message, + }) + } } timeoutController.cleanup() if (executionId) { diff --git a/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.test.ts b/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.test.ts index c7b86847f0b..6ee6c71aa7d 100644 --- a/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.test.ts +++ b/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.test.ts @@ -15,17 +15,27 @@ import { beforeEach, describe, expect, it, vi } from 'vitest' const { mockMarkExecutionCancelled, mockAbortManualExecution, - mockCancelPausedExecution, - mockSetExecutionMeta, + mockBeginPausedCancellation, + mockBlockQueuedResumesForCancellation, + mockClearPausedCancellationIntent, + mockCompletePausedCancellation, + mockGetPausedCancellationStatus, + mockFinalizeExecutionStream, + mockReadExecutionMetaState, mockWriteEvent, - mockCloseWriter, + mockWriteTerminalEvent, } = vi.hoisted(() => ({ mockMarkExecutionCancelled: vi.fn(), mockAbortManualExecution: vi.fn(), - mockCancelPausedExecution: vi.fn(), - mockSetExecutionMeta: vi.fn(), + mockBeginPausedCancellation: vi.fn(), + mockBlockQueuedResumesForCancellation: vi.fn(), + mockClearPausedCancellationIntent: vi.fn(), + mockCompletePausedCancellation: vi.fn(), + mockGetPausedCancellationStatus: vi.fn(), + mockFinalizeExecutionStream: vi.fn(), + mockReadExecutionMetaState: vi.fn(), mockWriteEvent: vi.fn(), - mockCloseWriter: vi.fn(), + mockWriteTerminalEvent: vi.fn(), })) vi.mock('@/lib/execution/cancellation', () => ({ @@ -38,7 +48,13 @@ vi.mock('@/lib/execution/manual-cancellation', () => ({ vi.mock('@/lib/workflows/executor/human-in-the-loop-manager', () 
=> ({ PauseResumeManager: { - cancelPausedExecution: (...args: unknown[]) => mockCancelPausedExecution(...args), + beginPausedCancellation: (...args: unknown[]) => mockBeginPausedCancellation(...args), + blockQueuedResumesForCancellation: (...args: unknown[]) => + mockBlockQueuedResumesForCancellation(...args), + clearPausedCancellationIntent: (...args: unknown[]) => + mockClearPausedCancellationIntent(...args), + completePausedCancellation: (...args: unknown[]) => mockCompletePausedCancellation(...args), + getPausedCancellationStatus: (...args: unknown[]) => mockGetPausedCancellationStatus(...args), }, })) @@ -47,10 +63,12 @@ vi.mock('@/lib/workflows/utils', () => workflowsUtilsMock) vi.mock('@/lib/posthog/server', () => posthogServerMock) vi.mock('@/lib/execution/event-buffer', () => ({ - setExecutionMeta: (...args: unknown[]) => mockSetExecutionMeta(...args), + finalizeExecutionStream: (...args: unknown[]) => mockFinalizeExecutionStream(...args), + readExecutionMetaState: (...args: unknown[]) => mockReadExecutionMetaState(...args), createExecutionEventWriter: () => ({ write: (...args: unknown[]) => mockWriteEvent(...args), - close: () => mockCloseWriter(), + writeTerminal: (...args: unknown[]) => mockWriteTerminalEvent(...args), + close: vi.fn().mockResolvedValue(undefined), }), })) @@ -71,10 +89,15 @@ describe('POST /api/workflows/[id]/executions/[executionId]/cancel', () => { allowed: true, }) mockAbortManualExecution.mockReturnValue(false) - mockCancelPausedExecution.mockResolvedValue(false) - mockSetExecutionMeta.mockResolvedValue(undefined) + mockBeginPausedCancellation.mockResolvedValue(false) + mockBlockQueuedResumesForCancellation.mockResolvedValue(false) + mockClearPausedCancellationIntent.mockResolvedValue(undefined) + mockCompletePausedCancellation.mockResolvedValue(false) + mockGetPausedCancellationStatus.mockResolvedValue(null) + mockFinalizeExecutionStream.mockResolvedValue(true) + mockReadExecutionMetaState.mockResolvedValue({ status: 'missing' }) mockWriteEvent.mockResolvedValue({ eventId: 1 }) - mockCloseWriter.mockResolvedValue(undefined) + mockWriteTerminalEvent.mockResolvedValue({ eventId: 1 }) }) it('returns success when cancellation was durably recorded', async () => { @@ -159,11 +182,8 @@ describe('POST /api/workflows/[id]/executions/[executionId]/cancel', () => { }) it('returns success when a paused HITL execution is cancelled directly in the database', async () => { - mockMarkExecutionCancelled.mockResolvedValue({ - durablyRecorded: false, - reason: 'redis_unavailable', - }) - mockCancelPausedExecution.mockResolvedValue(true) + mockBeginPausedCancellation.mockResolvedValue(true) + mockCompletePausedCancellation.mockResolvedValue(true) const response = await POST(makeRequest(), makeParams()) @@ -171,12 +191,77 @@ describe('POST /api/workflows/[id]/executions/[executionId]/cancel', () => { await expect(response.json()).resolves.toEqual({ success: true, executionId: 'ex-1', + redisAvailable: true, + durablyRecorded: true, + locallyAborted: false, + pausedCancelled: true, + reason: 'recorded', + }) + expect(mockMarkExecutionCancelled).not.toHaveBeenCalled() + expect(mockWriteTerminalEvent).toHaveBeenCalledWith( + expect.objectContaining({ + type: 'execution:cancelled', + executionId: 'ex-1', + workflowId: 'wf-1', + }), + 'cancelled' + ) + expect(mockFinalizeExecutionStream).not.toHaveBeenCalled() + }) + + it('publishes paused cancellation event even when Redis cancellation is recorded', async () => { + mockBeginPausedCancellation.mockResolvedValue(true) + 
mockCompletePausedCancellation.mockResolvedValue(true) + + const response = await POST(makeRequest(), makeParams()) + + expect(response.status).toBe(200) + await expect(response.json()).resolves.toMatchObject({ + success: true, + executionId: 'ex-1', + durablyRecorded: true, + pausedCancelled: true, + }) + expect(mockMarkExecutionCancelled).not.toHaveBeenCalled() + expect(mockWriteTerminalEvent).toHaveBeenCalledWith( + expect.objectContaining({ + type: 'execution:cancelled', + executionId: 'ex-1', + workflowId: 'wf-1', + }), + 'cancelled' + ) + expect(mockFinalizeExecutionStream).not.toHaveBeenCalled() + }) + + it('does not confirm paused cancellation when terminal event publication fails', async () => { + mockBeginPausedCancellation.mockResolvedValue(true) + mockCompletePausedCancellation.mockResolvedValue(true) + mockWriteTerminalEvent.mockRejectedValue(new Error('Redis unavailable')) + + const response = await POST(makeRequest(), makeParams()) + + expect(response.status).toBe(200) + await expect(response.json()).resolves.toEqual({ + success: false, + executionId: 'ex-1', redisAvailable: false, durablyRecorded: false, locallyAborted: false, - pausedCancelled: true, - reason: 'redis_unavailable', + pausedCancelled: false, + reason: 'paused_event_publish_failed', }) + expect(mockMarkExecutionCancelled).not.toHaveBeenCalled() + expect(mockCompletePausedCancellation).not.toHaveBeenCalled() + expect(mockWriteTerminalEvent).toHaveBeenCalledWith( + expect.objectContaining({ + type: 'execution:cancelled', + executionId: 'ex-1', + workflowId: 'wf-1', + }), + 'cancelled' + ) + expect(mockFinalizeExecutionStream).not.toHaveBeenCalled() }) it('returns 401 when auth fails', async () => { @@ -241,11 +326,7 @@ describe('POST /api/workflows/[id]/executions/[executionId]/cancel', () => { }) it('does not update execution log status in DB when only paused execution was cancelled', async () => { - mockMarkExecutionCancelled.mockResolvedValue({ - durablyRecorded: false, - reason: 'redis_unavailable', - }) - mockCancelPausedExecution.mockResolvedValue(true) + mockBeginPausedCancellation.mockResolvedValue(true) await POST(makeRequest(), makeParams()) diff --git a/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.ts b/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.ts index 595aef5e4a5..841b92c36fd 100644 --- a/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.ts +++ b/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.ts @@ -1,6 +1,7 @@ import { db } from '@sim/db' import { workflowExecutionLogs } from '@sim/db/schema' import { createLogger } from '@sim/logger' +import { sleep } from '@sim/utils/helpers' import { authorizeWorkflowByWorkspacePermission } from '@sim/workflow-authz' import { and, eq } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' @@ -8,13 +9,83 @@ import { cancelWorkflowExecutionContract } from '@/lib/api/contracts/workflows' import { parseRequest } from '@/lib/api/server' import { checkHybridAuth } from '@/lib/auth/hybrid' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' -import { markExecutionCancelled } from '@/lib/execution/cancellation' -import { createExecutionEventWriter, setExecutionMeta } from '@/lib/execution/event-buffer' +import { + type ExecutionCancellationRecordResult, + markExecutionCancelled, +} from '@/lib/execution/cancellation' +import { createExecutionEventWriter, readExecutionMetaState } from '@/lib/execution/event-buffer' import { 
abortManualExecution } from '@/lib/execution/manual-cancellation'
 import { captureServerEvent } from '@/lib/posthog/server'
 import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'

 const logger = createLogger('CancelExecutionAPI')

+const PAUSED_CANCELLATION_DB_ATTEMPTS = 3
+const PAUSED_CANCELLATION_DB_RETRY_MS = 200
+
+async function completePausedCancellationWithRetry(executionId: string): Promise<boolean> {
+  for (let attempt = 1; attempt <= PAUSED_CANCELLATION_DB_ATTEMPTS; attempt++) {
+    try {
+      const cancelled = await PauseResumeManager.completePausedCancellation(executionId)
+      if (cancelled) {
+        logger.info('Paused execution cancelled in database', { executionId, attempt })
+        return true
+      }
+      logger.warn('Paused execution cancellation could not be completed in database', {
+        executionId,
+        attempt,
+      })
+      return false
+    } catch (error) {
+      logger.warn('Failed to complete paused execution cancellation in database', {
+        executionId,
+        attempt,
+        error,
+      })
+      if (attempt < PAUSED_CANCELLATION_DB_ATTEMPTS) {
+        await sleep(PAUSED_CANCELLATION_DB_RETRY_MS)
+      }
+    }
+  }
+  return false
+}
+
+async function ensurePausedCancellationEventPublished(
+  executionId: string,
+  workflowId: string
+): Promise<boolean> {
+  const metaState = await readExecutionMetaState(executionId)
+  if (metaState.status === 'found' && metaState.meta.status === 'cancelled') {
+    return true
+  }
+
+  const writer = createExecutionEventWriter(executionId)
+  try {
+    await writer.writeTerminal(
+      {
+        type: 'execution:cancelled',
+        timestamp: new Date().toISOString(),
+        executionId,
+        workflowId,
+        data: { duration: 0 },
+      },
+      'cancelled'
+    )
+    return true
+  } catch (error) {
+    logger.warn('Failed to publish paused execution cancellation event', {
+      executionId,
+      error,
+    })
+    return false
+  } finally {
+    await writer.close().catch((error) => {
+      logger.warn('Failed to close paused cancellation event writer', {
+        executionId,
+        error,
+      })
+    })
+  }
+}

 export const runtime = 'nodejs'
 export const dynamic = 'force-dynamic'
@@ -55,40 +126,102 @@
     logger.info('Cancel execution requested', { workflowId, executionId, userId: auth.userId })

-    const cancellation = await markExecutionCancelled(executionId)
-    const locallyAborted = abortManualExecution(executionId)
+    let pausedCancellationStarted = false
     let pausedCancelled = false
     try {
-      pausedCancelled = await PauseResumeManager.cancelPausedExecution(executionId)
+      pausedCancellationStarted = await PauseResumeManager.beginPausedCancellation(executionId)
     } catch (error) {
-      logger.warn('Failed to cancel paused execution in database', { executionId, error })
+      logger.warn('Failed to begin paused execution cancellation in database', {
+        executionId,
+        error,
+      })
     }

+    const pendingPausedCancellation = pausedCancellationStarted
+      ? null
+      : await PauseResumeManager.getPausedCancellationStatus(executionId)
+    const isPausedCancellationPath =
+      pausedCancellationStarted || pendingPausedCancellation !== null
+
+    const cancellation: ExecutionCancellationRecordResult = isPausedCancellationPath
+      ? { durablyRecorded: false, reason: 'redis_unavailable' }
+      : await markExecutionCancelled(executionId)
+    const locallyAborted = isPausedCancellationPath ? 
false : abortManualExecution(executionId) - if (cancellation.durablyRecorded) { + if (pausedCancellationStarted) { + logger.info('Paused execution cancellation reserved in database', { executionId }) + } else if (cancellation.durablyRecorded) { logger.info('Execution marked as cancelled in Redis', { executionId }) } else if (locallyAborted) { logger.info('Execution cancelled via local in-process fallback', { executionId }) - } else if (pausedCancelled) { - logger.info('Paused execution cancelled directly in database', { executionId }) - void setExecutionMeta(executionId, { status: 'cancelled', workflowId }).catch(() => {}) - const writer = createExecutionEventWriter(executionId) - void writer - .write({ - type: 'execution:cancelled', - timestamp: new Date().toISOString(), - executionId, - workflowId, - data: { duration: 0 }, - }) - .then(() => writer.close()) - .catch(() => {}) - } else { + } else if (!pausedCancellationStarted) { logger.warn('Execution cancellation was not durably recorded', { executionId, reason: cancellation.reason, }) } + if (!isPausedCancellationPath && (cancellation.durablyRecorded || locallyAborted)) { + await PauseResumeManager.blockQueuedResumesForCancellation(executionId).catch((error) => { + logger.warn('Failed to block queued paused resumes after cancellation', { + executionId, + error, + }) + }) + } else if (!isPausedCancellationPath) { + await PauseResumeManager.clearPausedCancellationIntent(executionId).catch((error) => { + logger.warn( + 'Failed to clear paused cancellation intent after unsuccessful cancellation', + { + executionId, + error, + } + ) + }) + } + + let pausedCancellationPublished = false + let pausedCancellationPublishFailed = false + if (pausedCancellationStarted) { + pausedCancellationPublished = await ensurePausedCancellationEventPublished( + executionId, + workflowId + ) + pausedCancellationPublishFailed = !pausedCancellationPublished + if (pausedCancellationPublished) { + pausedCancelled = await completePausedCancellationWithRetry(executionId) + } + } else { + if (pendingPausedCancellation === 'cancelled') { + pausedCancellationPublished = await ensurePausedCancellationEventPublished( + executionId, + workflowId + ) + pausedCancellationPublishFailed = !pausedCancellationPublished + pausedCancelled = pausedCancellationPublished + } else if (pendingPausedCancellation === 'cancelling') { + pausedCancellationPublished = await ensurePausedCancellationEventPublished( + executionId, + workflowId + ) + pausedCancellationPublishFailed = !pausedCancellationPublished + if (pausedCancellationPublished) { + pausedCancelled = await completePausedCancellationWithRetry(executionId) + } + } + } + + if ( + pausedCancellationPublishFailed && + (pausedCancellationStarted || pendingPausedCancellation === 'cancelling') + ) { + await PauseResumeManager.clearPausedCancellationIntent(executionId).catch((error) => { + logger.warn('Failed to clear paused cancellation intent after publish failure', { + executionId, + error, + }) + }) + } + if ((cancellation.durablyRecorded || locallyAborted) && !pausedCancelled) { try { await db @@ -108,7 +241,10 @@ export const POST = withRouteHandler( } } - const success = cancellation.durablyRecorded || locallyAborted || pausedCancelled + const success = + (isPausedCancellationPath + ? 
pausedCancelled && pausedCancellationPublished + : cancellation.durablyRecorded) || locallyAborted if (success) { const workspaceId = workflowAuthorization.workflow?.workspaceId @@ -120,14 +256,30 @@ export const POST = withRouteHandler( ) } + const durablyRecorded = isPausedCancellationPath + ? pausedCancellationPublished + : pausedCancelled || cancellation.durablyRecorded + const reason = pausedCancellationPublishFailed + ? 'paused_event_publish_failed' + : !pausedCancelled && isPausedCancellationPath + ? 'paused_database_cancel_failed' + : pausedCancelled && !pausedCancellationPublished + ? 'paused_event_publish_failed' + : pausedCancelled || isPausedCancellationPath + ? 'recorded' + : cancellation.reason + return NextResponse.json({ success, executionId, - redisAvailable: cancellation.reason !== 'redis_unavailable', - durablyRecorded: cancellation.durablyRecorded, + redisAvailable: + isPausedCancellationPath || pausedCancelled + ? pausedCancellationPublished + : cancellation.reason !== 'redis_unavailable', + durablyRecorded, locallyAborted, pausedCancelled, - reason: cancellation.reason, + reason, }) } catch (error: any) { logger.error('Failed to cancel execution', { workflowId, executionId, error: error.message }) diff --git a/apps/sim/app/api/workflows/[id]/executions/[executionId]/stream/route.test.ts b/apps/sim/app/api/workflows/[id]/executions/[executionId]/stream/route.test.ts new file mode 100644 index 00000000000..5e41a225e9e --- /dev/null +++ b/apps/sim/app/api/workflows/[id]/executions/[executionId]/stream/route.test.ts @@ -0,0 +1,266 @@ +/** + * @vitest-environment node + */ +import { createMockRequest } from '@sim/testing' +import { beforeEach, describe, expect, it, vi } from 'vitest' +import type { ExecutionEventEntry } from '@/lib/execution/event-buffer' + +const { + mockAuthorizeWorkflowByWorkspacePermission, + mockGetSession, + mockReadExecutionEventsState, + mockReadExecutionMetaState, +} = vi.hoisted(() => ({ + mockAuthorizeWorkflowByWorkspacePermission: vi.fn(), + mockGetSession: vi.fn(), + mockReadExecutionEventsState: vi.fn(), + mockReadExecutionMetaState: vi.fn(), +})) + +vi.mock('@/lib/auth', () => ({ + getSession: mockGetSession, +})) + +vi.mock('@sim/workflow-authz', () => ({ + authorizeWorkflowByWorkspacePermission: mockAuthorizeWorkflowByWorkspacePermission, +})) + +vi.mock('@/lib/execution/event-buffer', () => ({ + readExecutionEventsState: mockReadExecutionEventsState, + readExecutionMetaState: mockReadExecutionMetaState, +})) + +import { GET } from './route' + +function completedEntry(eventId: number): ExecutionEventEntry { + return { + eventId, + executionId: 'exec-1', + event: { + type: 'execution:completed', + timestamp: new Date().toISOString(), + executionId: 'exec-1', + workflowId: 'wf-1', + data: { + success: true, + output: {}, + duration: 10, + startTime: new Date().toISOString(), + endTime: new Date().toISOString(), + finalBlockLogs: [], + }, + }, + } +} + +describe('execution stream reconnect route', () => { + beforeEach(() => { + vi.clearAllMocks() + mockGetSession.mockResolvedValue({ user: { id: 'user-1' } }) + mockAuthorizeWorkflowByWorkspacePermission.mockResolvedValue({ allowed: true }) + mockReadExecutionMetaState.mockResolvedValue({ + status: 'found', + meta: { status: 'active', workflowId: 'wf-1' }, + }) + mockReadExecutionEventsState.mockResolvedValue({ status: 'ok', events: [] }) + }) + + it('drains final events after terminal meta before sending DONE', async () => { + mockReadExecutionMetaState + .mockResolvedValueOnce({ + status: 
'found', + meta: { status: 'active', workflowId: 'wf-1' }, + }) + .mockResolvedValueOnce({ + status: 'found', + meta: { status: 'complete', workflowId: 'wf-1' }, + }) + mockReadExecutionEventsState + .mockResolvedValueOnce({ status: 'ok', events: [] }) + .mockResolvedValueOnce({ status: 'ok', events: [completedEntry(4)] }) + + const req = createMockRequest( + 'GET', + undefined, + undefined, + 'http://localhost/api/workflows/wf-1/executions/exec-1/stream?from=3' + ) + const response = await GET(req, { + params: Promise.resolve({ id: 'wf-1', executionId: 'exec-1' }), + }) + + expect(response.status).toBe(200) + const body = await response.text() + const completedIndex = body.indexOf('"type":"execution:completed"') + const doneIndex = body.indexOf('data: [DONE]') + + expect(completedIndex).toBeGreaterThanOrEqual(0) + expect(doneIndex).toBeGreaterThan(completedIndex) + expect(mockReadExecutionEventsState).toHaveBeenNthCalledWith(1, 'exec-1', 3) + expect(mockReadExecutionEventsState).toHaveBeenNthCalledWith(2, 'exec-1', 3) + }) + + it('errors when terminal metadata has no terminal event to replay', async () => { + mockReadExecutionMetaState + .mockResolvedValueOnce({ + status: 'found', + meta: { status: 'active', workflowId: 'wf-1' }, + }) + .mockResolvedValueOnce({ + status: 'found', + meta: { status: 'complete', workflowId: 'wf-1' }, + }) + mockReadExecutionEventsState + .mockResolvedValueOnce({ status: 'ok', events: [] }) + .mockResolvedValueOnce({ status: 'ok', events: [] }) + + const req = createMockRequest( + 'GET', + undefined, + undefined, + 'http://localhost/api/workflows/wf-1/executions/exec-1/stream?from=3' + ) + const response = await GET(req, { + params: Promise.resolve({ id: 'wf-1', executionId: 'exec-1' }), + }) + + expect(response.status).toBe(200) + await expect(response.text()).rejects.toThrow( + 'Execution reached terminal metadata without a terminal event' + ) + }) + + it('allows replay event id gaps from reserved but unused writer ids', async () => { + mockReadExecutionEventsState.mockResolvedValueOnce({ + status: 'ok', + events: [completedEntry(101)], + }) + + const req = createMockRequest( + 'GET', + undefined, + undefined, + 'http://localhost/api/workflows/wf-1/executions/exec-1/stream?from=3' + ) + const response = await GET(req, { + params: Promise.resolve({ id: 'wf-1', executionId: 'exec-1' }), + }) + + expect(response.status).toBe(200) + const body = await response.text() + + expect(body).toContain('"eventId":101') + expect(body).toContain('data: [DONE]') + }) + + it('errors when replay events are not strictly increasing', async () => { + mockReadExecutionEventsState.mockResolvedValueOnce({ + status: 'ok', + events: [completedEntry(3)], + }) + + const req = createMockRequest( + 'GET', + undefined, + undefined, + 'http://localhost/api/workflows/wf-1/executions/exec-1/stream?from=3' + ) + const response = await GET(req, { + params: Promise.resolve({ id: 'wf-1', executionId: 'exec-1' }), + }) + + expect(response.status).toBe(200) + await expect(response.text()).rejects.toThrow( + 'Execution event replay order violation: previous 3, received 3' + ) + }) + + it('returns unavailable when metadata cannot be read', async () => { + mockReadExecutionMetaState.mockResolvedValueOnce({ + status: 'unavailable', + error: 'redis unavailable', + }) + + const req = createMockRequest( + 'GET', + undefined, + undefined, + 'http://localhost/api/workflows/wf-1/executions/exec-1/stream?from=3' + ) + const response = await GET(req, { + params: Promise.resolve({ id: 'wf-1', executionId: 
'exec-1' }), + }) + + expect(response.status).toBe(503) + await expect(response.json()).resolves.toEqual({ + error: 'Run buffer temporarily unavailable', + }) + }) + + it('stops after replaying a terminal event even when metadata is still active', async () => { + mockReadExecutionEventsState.mockResolvedValueOnce({ + status: 'ok', + events: [completedEntry(4)], + }) + + const req = createMockRequest( + 'GET', + undefined, + undefined, + 'http://localhost/api/workflows/wf-1/executions/exec-1/stream?from=3' + ) + const response = await GET(req, { + params: Promise.resolve({ id: 'wf-1', executionId: 'exec-1' }), + }) + + expect(response.status).toBe(200) + const body = await response.text() + + expect(body).toContain('"type":"execution:completed"') + expect(body).toContain('data: [DONE]') + expect(mockReadExecutionEventsState).toHaveBeenCalledTimes(1) + expect(mockReadExecutionMetaState).toHaveBeenCalledTimes(1) + }) + + it('errors the stream when replay events cannot be read', async () => { + mockReadExecutionEventsState.mockResolvedValueOnce({ + status: 'unavailable', + error: 'redis read failed', + }) + + const req = createMockRequest( + 'GET', + undefined, + undefined, + 'http://localhost/api/workflows/wf-1/executions/exec-1/stream?from=3' + ) + const response = await GET(req, { + params: Promise.resolve({ id: 'wf-1', executionId: 'exec-1' }), + }) + + expect(response.status).toBe(200) + await expect(response.text()).rejects.toThrow('Execution events unavailable: redis read failed') + }) + + it('errors the stream when requested events were pruned', async () => { + mockReadExecutionEventsState.mockResolvedValueOnce({ + status: 'pruned', + earliestEventId: 10, + }) + + const req = createMockRequest( + 'GET', + undefined, + undefined, + 'http://localhost/api/workflows/wf-1/executions/exec-1/stream?from=3' + ) + const response = await GET(req, { + params: Promise.resolve({ id: 'wf-1', executionId: 'exec-1' }), + }) + + expect(response.status).toBe(200) + await expect(response.text()).rejects.toThrow( + 'Execution events pruned before requested event id' + ) + }) +}) diff --git a/apps/sim/app/api/workflows/[id]/executions/[executionId]/stream/route.ts b/apps/sim/app/api/workflows/[id]/executions/[executionId]/stream/route.ts index 4775306d0c4..6915a8dcbc1 100644 --- a/apps/sim/app/api/workflows/[id]/executions/[executionId]/stream/route.ts +++ b/apps/sim/app/api/workflows/[id]/executions/[executionId]/stream/route.ts @@ -9,10 +9,12 @@ import { getSession } from '@/lib/auth' import { SSE_HEADERS } from '@/lib/core/utils/sse' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' import { + type ExecutionEventEntry, type ExecutionStreamStatus, - getExecutionMeta, - readExecutionEvents, + readExecutionEventsState, + readExecutionMetaState, } from '@/lib/execution/event-buffer' +import type { ExecutionEvent } from '@/lib/workflows/executor/execution-events' import { formatSSEEvent } from '@/lib/workflows/executor/execution-events' const logger = createLogger('ExecutionStreamReconnectAPI') @@ -24,6 +26,15 @@ function isTerminalStatus(status: ExecutionStreamStatus): boolean { return status === 'complete' || status === 'error' || status === 'cancelled' } +function isTerminalEvent(event: ExecutionEvent): boolean { + return ( + event.type === 'execution:completed' || + event.type === 'execution:error' || + event.type === 'execution:cancelled' || + event.type === 'execution:paused' + ) +} + export const runtime = 'nodejs' export const dynamic = 'force-dynamic' @@ -52,10 +63,14 @@ export 
const GET = withRouteHandler(
       )
     }

-    const meta = await getExecutionMeta(executionId)
-    if (!meta) {
+    const metaResult = await readExecutionMetaState(executionId)
+    if (metaResult.status === 'unavailable') {
+      return NextResponse.json({ error: 'Run buffer temporarily unavailable' }, { status: 503 })
+    }
+    if (metaResult.status === 'missing') {
       return NextResponse.json({ error: 'Run buffer not found or expired' }, { status: 404 })
     }
+    const { meta } = metaResult

     if (meta.workflowId && meta.workflowId !== workflowId) {
       return NextResponse.json({ error: 'Run does not belong to this workflow' }, { status: 403 })
@@ -86,19 +101,68 @@
         }
       }

-      try {
-        const events = await readExecutionEvents(executionId, lastEventId)
+      const readEventsOrThrow = async (
+        afterEventId: number
+      ): Promise<ExecutionEventEntry[]> => {
+        const result = await readExecutionEventsState(executionId, afterEventId)
+        if (result.status === 'unavailable') {
+          throw new Error(`Execution events unavailable: ${result.error}`)
+        }
+        if (result.status === 'pruned') {
+          throw new Error(
+            `Execution events pruned before requested event id: earliest retained event is ${result.earliestEventId}`
+          )
+        }
+        let previousEventId = afterEventId
+        for (const entry of result.events) {
+          if (entry.eventId <= previousEventId) {
+            throw new Error(
+              `Execution event replay order violation: previous ${previousEventId}, received ${entry.eventId}`
+            )
+          }
+          previousEventId = entry.eventId
+        }
+        return result.events
+      }
+
+      const enqueueEvents = (events: ExecutionEventEntry[]) => {
+        let sawTerminalEvent = false
         for (const entry of events) {
-          if (closed) return
+          if (closed) break
           entry.event.eventId = entry.eventId
           enqueue(formatSSEEvent(entry.event))
           lastEventId = entry.eventId
+          sawTerminalEvent ||= isTerminalEvent(entry.event)
+        }
+        return sawTerminalEvent
+      }
+
+      const closeWithDone = () => {
+        enqueue('data: [DONE]\n\n')
+        if (!closed) controller.close()
+      }
+
+      const closeAfterTerminalEvent = (events: ExecutionEventEntry[]) => {
+        if (!enqueueEvents(events)) {
+          throw new Error('Execution reached terminal metadata without a terminal event')
+        }
+        closeWithDone()
+      }
+
+      try {
+        const events = await readEventsOrThrow(lastEventId)
+        if (enqueueEvents(events)) {
+          closeWithDone()
+          return
         }

-        const currentMeta = await getExecutionMeta(executionId)
-        if (!currentMeta || isTerminalStatus(currentMeta.status)) {
-          enqueue('data: [DONE]\n\n')
-          if (!closed) controller.close()
+        const currentMeta = await readExecutionMetaState(executionId)
+        if (currentMeta.status === 'unavailable') {
+          throw new Error(`Execution metadata unavailable: ${currentMeta.error}`)
+        }
+        if (currentMeta.status === 'missing' || isTerminalStatus(currentMeta.meta.status)) {
+          const finalEvents = await readEventsOrThrow(lastEventId)
+          closeAfterTerminalEvent(finalEvents)
           return
         }
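Together, `readEventsOrThrow` and the terminal-event helpers give reconnecting clients a strict replay contract: `eventId`s may skip values (writer ids can be reserved but unused) but must strictly increase, and a healthy stream always ends with a terminal event followed by `data: [DONE]`; anything else surfaces as a stream error rather than a silent truncation. A hedged consumer-side sketch; the URL shape and the `[DONE]` sentinel come from this diff's tests, while the raw fetch/reader loop is illustrative and not the app's actual client:

```typescript
// Hedged sketch of a reconnecting reader for the stream route above.
async function replayExecutionEvents(
  workflowId: string,
  executionId: string,
  fromEventId: number,
  onEvent: (event: { eventId?: number; type: string }) => void
): Promise<void> {
  const res = await fetch(
    `/api/workflows/${workflowId}/executions/${executionId}/stream?from=${fromEventId}`
  )
  if (!res.ok || !res.body) throw new Error(`Stream reconnect failed: ${res.status}`)
  const reader = res.body.getReader()
  const decoder = new TextDecoder()
  let buffered = ''
  let lastEventId = fromEventId
  for (;;) {
    const { done, value } = await reader.read()
    if (done) break
    buffered += decoder.decode(value, { stream: true })
    const frames = buffered.split('\n\n')
    buffered = frames.pop() ?? ''
    for (const frame of frames) {
      const payload = frame.replace(/^data: /, '')
      if (payload === '[DONE]') return
      const event = JSON.parse(payload)
      // Mirror the server's replay-order guard: ids may skip (reserved but
      // unused writer ids) but must strictly increase.
      if (typeof event.eventId === 'number' && event.eventId <= lastEventId) {
        throw new Error(`Replay order violation at ${event.eventId}`)
      }
      if (typeof event.eventId === 'number') lastEventId = event.eventId
      onEvent(event)
    }
  }
}
```

@@ -106,33 +170,26 @@
         await sleep(POLL_INTERVAL_MS)
         if (closed) return

-        const newEvents = await readExecutionEvents(executionId, lastEventId)
-        for (const entry of newEvents) {
-          if (closed) return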
-          entry.event.eventId = entry.eventId
-          enqueue(formatSSEEvent(entry.event))
-          lastEventId = entry.eventId
-        }
-        enqueue('data: [DONE]\n\n')
-        if (!closed) controller.close()
+        const polledMeta = await readExecutionMetaState(executionId)
+        if (polledMeta.status === 'unavailable') {
+          throw new Error(`Execution metadata unavailable: ${polledMeta.error}`)
+        }
+        if (polledMeta.status === 'missing' || isTerminalStatus(polledMeta.meta.status)) {
+          const finalEvents = await readEventsOrThrow(lastEventId)
+          closeAfterTerminalEvent(finalEvents)
           return
         }
       }

       if (!closed) {
         logger.warn('Reconnection stream poll deadline reached', { executionId })
-        enqueue('data: [DONE]\n\n')
-        controller.close()
+        throw new Error('Execution stream ended before a terminal event was available')
       }
     } catch (error) {
       logger.error('Error in reconnection stream', {
@@ -141,7 +198,7 @@
       })
       if (!closed) {
         try {
-          controller.close()
+          controller.error(error)
         } catch {}
       }
     }
diff --git a/apps/sim/app/resume/[workflowId]/[executionId]/resume-page-client.tsx b/apps/sim/app/resume/[workflowId]/[executionId]/resume-page-client.tsx
index 73f5826d564..70d67620f95 100644
--- a/apps/sim/app/resume/[workflowId]/[executionId]/resume-page-client.tsx
+++ b/apps/sim/app/resume/[workflowId]/[executionId]/resume-page-client.tsx
@@ -88,6 +88,8 @@ interface PausePointWithQueue {
   latestResumeEntry?: ResumeQueueEntrySummary | null
   parallelScope?: any
   loopScope?: any
+  pauseKind?: 'human' | 'time'
+  resumeAt?: string
 }

 interface PausedExecutionSummary {
diff --git a/apps/sim/app/workspace/[workspaceId]/files/components/file-viewer/preview-panel.tsx b/apps/sim/app/workspace/[workspaceId]/files/components/file-viewer/preview-panel.tsx
index be5e581418f..36966864095 100644
--- a/apps/sim/app/workspace/[workspaceId]/files/components/file-viewer/preview-panel.tsx
+++ b/apps/sim/app/workspace/[workspaceId]/files/components/file-viewer/preview-panel.tsx
@@ -1,6 +1,17 @@
 'use client'

-import { createContext, memo, useContext, useEffect, useMemo, useRef, useState } from 'react'
+import {
+  Children,
+  cloneElement,
+  createContext,
+  isValidElement,
+  memo,
+  useContext,
+  useEffect,
+  useMemo,
+  useRef,
+  useState,
+} from 'react'
 import matter from 'gray-matter'
 import { useRouter } from 'next/navigation'
 import rehypeSlug from 'rehype-slug'
@@ -10,7 +21,7 @@ import { Streamdown } from 'streamdown'
 import 'streamdown/styles.css'
 import { toError } from '@sim/utils/errors'
 import { generateShortId } from '@sim/utils/id'
-import { Checkbox, CopyCodeButton, highlight, languages, Skeleton } from '@/components/emcn'
+import { Checkbox, highlight, languages, Skeleton } from '@/components/emcn'
 import '@/components/emcn/components/code/code.css'
 import 'prismjs/components/prism-bash'
 import 'prismjs/components/prism-css'
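Markdown `pre` wrappers arrive with their `code` child already rendered, so the renderer below re-tags each element child with `data-block="true"` instead of wrapping it, letting inline and fenced code share one component while styling diverges on the attribute. A generic sketch of that re-tagging pattern; the helper name is invented, and only the `Children.map`/`cloneElement`/`isValidElement` usage mirrors this diff:

```typescript
// Hedged illustration of the pattern the new `pre` renderer below uses:
// re-tag each element child with extra attributes via cloneElement, leaving
// non-element children (plain strings, numbers) untouched.
import { Children, cloneElement, isValidElement, type ReactNode } from 'react'

function tagElementChildren(children: ReactNode, attrs: Record<string, string>) {
  return Children.map(children, (child) =>
    isValidElement<Record<string, unknown>>(child) ? cloneElement(child, attrs) : child
  )
}

// Usage mirroring the renderer below: mark the <code> child as a block so
// downstream styling can tell fenced blocks apart from inline code.
// tagElementChildren(children, { 'data-block': 'true' })
```

@@ -473,7 +484,15 @@ function resolveSimFileUrl(src: string | undefined): string | undefined {
 }

 const STATIC_MARKDOWN_COMPONENTS = {
-  pre: ({ children }: { children?: React.ReactNode }) => <>{children}</>,
+  pre: ({ children }: { children?: React.ReactNode }) => (
+    <>
+      {Children.map(children, (child) =>
+        isValidElement<Record<string, unknown>>(child)
+          ? cloneElement(child, { 'data-block': 'true' })
+          : child
+      ) ?? children}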
+    </>
+  ),
   'mermaid-diagram': ({ definition }: { definition?: string }) => {
     const isStreaming = useContext(MermaidStreamingCtx)
     return 
   },
@@ -531,20 +550,11 @@
       {children}
     ),
-  inlineCode: ({ children }: { children?: React.ReactNode }) => {
-    if (typeof children === 'string' && children.includes('\n')) {
-      return (
-
-          {children}
-
-      )
-    }
-    return (
-
-        {children}
-
-    )
-  },
+  inlineCode: ({ children }: { children?: React.ReactNode }) => (
+
+      {children}
+
+  ),
   code: ({ children, className }: { children?: React.ReactNode; className?: string }) => {
     const langMatch = className?.match(/language-(\w+)/)
     const langRaw = langMatch?.[1] ?? ''
@@ -564,22 +574,18 @@
     return (
-
+
{langRaw || 'code'} -
{html ? (
           ) : (
-            
-              {codeString.trimEnd()}
+            
+              {codeString.trimEnd()}
             
)}
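In the hunk above, highlighted markup is only swapped in when `html` is truthy; otherwise the raw `codeString` is printed. A sketch of that grammar-gated decision, assuming the `highlight`/`languages` exports from '@/components/emcn' follow Prism's `highlight(code, grammar, language)` shape; only the import path appears in this diff, so the signature is an assumption:

```typescript
// Hedged sketch of the highlight-or-fallback step used by the code renderer
// above. Prism-style signatures are assumed, not confirmed by this diff.
import { highlight, languages } from '@/components/emcn'

function renderCodeHtml(codeString: string, langRaw: string): string | null {
  const grammar = languages[langRaw]
  // No grammar registered for this fence language: render plain text instead.
  if (!grammar) return null
  try {
    return highlight(codeString.trimEnd(), grammar, langRaw)
  } catch {
    // A grammar bug should degrade to plain text, never break the preview.
    return null
  }
}
```

The `null` branch corresponds to the `codeString.trimEnd()` fallback rendered in the JSX above.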
@@ -634,7 +640,7 @@ const STATIC_MARKDOWN_COMPONENTS = { {children} ), th: ({ children }: { children?: React.ReactNode }) => ( - + {children} ), @@ -684,20 +690,36 @@ function LiRenderer({ const isTaskItem = typeof className === 'string' && className.includes('task-list-item') if (isTaskItem) { + const [checkboxChild, ...contentChildren] = Children.toArray(children) + const content = {contentChildren} + if (ctx) { const offset = node?.position?.start?.offset if (offset === undefined) { - return
  • {children}
  • + return ( +
  • + {checkboxChild} + {content} +
  • + ) } const before = ctx.contentRef.current.slice(0, offset) const prior = before.match(/^(\s*(?:[-*+]|\d+[.)]) +)\[([ xX])\]/gm) return ( -
  • {children}
  • +
  • + {checkboxChild} + {content} +
  • ) } - return
  • {children}
  • + return ( +
  • + {checkboxChild} + {content} +
  • + ) } return
  • {children}
  • diff --git a/apps/sim/app/workspace/[workspaceId]/files/files.tsx b/apps/sim/app/workspace/[workspaceId]/files/files.tsx index e259a7dad2a..45e7fb9dd37 100644 --- a/apps/sim/app/workspace/[workspaceId]/files/files.tsx +++ b/apps/sim/app/workspace/[workspaceId]/files/files.tsx @@ -622,7 +622,7 @@ export function Files() { const mimeType = getMimeTypeFromExtension('md') const blob = new Blob([''], { type: mimeType }) const file = new File([blob], name, { type: mimeType }) - const result = await uploadFile.mutateAsync({ workspaceId, file }) + const result = await uploadFile.mutateAsync({ workspaceId, file, skipToast: true }) const fileId = result.file?.id if (fileId) { justCreatedFileIdRef.current = fileId diff --git a/apps/sim/app/workspace/[workspaceId]/home/components/message-content/components/agent-group/agent-group.tsx b/apps/sim/app/workspace/[workspaceId]/home/components/message-content/components/agent-group/agent-group.tsx index b7a073dab5e..516e4a39964 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/components/message-content/components/agent-group/agent-group.tsx +++ b/apps/sim/app/workspace/[workspaceId]/home/components/message-content/components/agent-group/agent-group.tsx @@ -42,6 +42,7 @@ export function AgentGroup({ }: AgentGroupProps) { const AgentIcon = getAgentIcon(agentName) const hasItems = items.length > 0 + const isSubagent = agentName !== 'mothership' const toolItems = items.filter( (item): item is Extract => item.type === 'tool' ) @@ -112,7 +113,7 @@ export function AgentGroup({ -
    +
    {items.map((item, idx) => { if (item.type === 'tool') { return ( @@ -128,7 +129,7 @@ export function AgentGroup({ return ( {item.content.trim()} diff --git a/apps/sim/app/workspace/[workspaceId]/home/components/mothership-view/components/resource-content/resource-content.tsx b/apps/sim/app/workspace/[workspaceId]/home/components/mothership-view/components/resource-content/resource-content.tsx index e793991957c..1adfb0f3445 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/components/mothership-view/components/resource-content/resource-content.tsx +++ b/apps/sim/app/workspace/[workspaceId]/home/components/mothership-view/components/resource-content/resource-content.tsx @@ -32,6 +32,7 @@ import { RESOURCE_TAB_ICON_BUTTON_CLASS, RESOURCE_TAB_ICON_CLASS, } from '@/app/workspace/[workspaceId]/home/components/mothership-view/components/resource-tabs/resource-tab-controls' +import { hasRenderableFilePreviewContent } from '@/app/workspace/[workspaceId]/home/hooks/use-file-preview-sessions' import type { GenericResourceData, MothershipResource, @@ -116,13 +117,19 @@ export const ResourceContent = memo(function ResourceContent({ const disableStreamingAutoScroll = previewSession?.operation === 'patch' const rawPreviewText = previewSession?.previewText const streamingPreviewText = - typeof rawPreviewText === 'string' && rawPreviewText.length > 0 ? rawPreviewText : undefined + previewSession && + typeof rawPreviewText === 'string' && + hasRenderableFilePreviewContent(previewSession) + ? rawPreviewText + : undefined const pendingOrStreamingFilePreviewText = - previewSession?.fileId === resource.id && typeof rawPreviewText === 'string' + previewSession?.fileId === resource.id && + typeof rawPreviewText === 'string' && + hasRenderableFilePreviewContent(previewSession) ? rawPreviewText : undefined - if (previewSession && resource.id === 'streaming-file') { + if (resource.id === 'streaming-file') { return (
    {streamingPreviewText !== undefined ? ( diff --git a/apps/sim/app/workspace/[workspaceId]/home/components/mothership-view/mothership-view.tsx b/apps/sim/app/workspace/[workspaceId]/home/components/mothership-view/mothership-view.tsx index 4eb7227c850..be06ee8481a 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/components/mothership-view/mothership-view.tsx +++ b/apps/sim/app/workspace/[workspaceId]/home/components/mothership-view/mothership-view.tsx @@ -6,6 +6,7 @@ import { cn } from '@/lib/core/utils/cn' import { getFileExtension } from '@/lib/uploads/utils/file-utils' import type { PreviewMode } from '@/app/workspace/[workspaceId]/files/components/file-viewer' import { RICH_PREVIEWABLE_EXTENSIONS } from '@/app/workspace/[workspaceId]/files/components/file-viewer' +import { hasRenderableFilePreviewContent } from '@/app/workspace/[workspaceId]/home/hooks/use-file-preview-sessions' import type { GenericResourceData, MothershipResource, @@ -23,7 +24,7 @@ const PREVIEW_CYCLE: Record = { /** * Whether the active resource should show the in-progress file stream. * The synthetic `streaming-file` tab always shows it; a real file tab only shows it - * when the streamed fileId matches that exact resource. + * after a preview content event has arrived for that exact resource. */ function shouldShowStreamingFilePanel( previewSession: FilePreviewSession | null | undefined, @@ -32,7 +33,9 @@ function shouldShowStreamingFilePanel( if (!previewSession || previewSession.status === 'complete' || !active) return false if (active.id === 'streaming-file') return true if (active.type !== 'file') return false - if (active.id && previewSession.fileId === active.id) return true + if (active.id && previewSession.fileId === active.id) { + return hasRenderableFilePreviewContent(previewSession) + } return false } diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index f320d44d46a..8e6d5bc49d2 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -104,8 +104,10 @@ import { invalidateResourceQueries } from '@/app/workspace/[workspaceId]/home/co import { buildCompletedPreviewSessions, type FilePreviewSessionsState, + hasRenderableFilePreviewContent, INITIAL_FILE_PREVIEW_SESSIONS_STATE, reduceFilePreviewSessions, + shouldReplaceSession, useFilePreviewSessions, } from '@/app/workspace/[workspaceId]/home/hooks/use-file-preview-sessions' import { deploymentKeys } from '@/hooks/queries/deployments' @@ -1385,6 +1387,52 @@ export function useChat( const activeResourceIdRef = useRef(effectiveActiveResourceId) activeResourceIdRef.current = effectiveActiveResourceId + const previewActivationOwnerRef = useRef>(new Map()) + const completedPreviewResourceHandoffRef = useRef< + Map + >(new Map()) + + const rememberPreviewActivationOwner = useCallback((session: FilePreviewSession) => { + if (!session.fileId || previewActivationOwnerRef.current.has(session.id)) { + return + } + previewActivationOwnerRef.current.set(session.id, activeResourceIdRef.current) + }, []) + + const shouldAutoActivatePreviewSession = useCallback((session: FilePreviewSession) => { + if (!session.fileId) { + return false + } + const currentActiveResourceId = activeResourceIdRef.current + const activationOwnerId = previewActivationOwnerRef.current.get(session.id) + return ( + currentActiveResourceId === null || + currentActiveResourceId === session.fileId || + currentActiveResourceId 
diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts
index f320d44d46a..8e6d5bc49d2 100644
--- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts
+++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts
@@ -104,8 +104,10 @@ import { invalidateResourceQueries } from '@/app/workspace/[workspaceId]/home/co
 import {
   buildCompletedPreviewSessions,
   type FilePreviewSessionsState,
+  hasRenderableFilePreviewContent,
   INITIAL_FILE_PREVIEW_SESSIONS_STATE,
   reduceFilePreviewSessions,
+  shouldReplaceSession,
   useFilePreviewSessions,
 } from '@/app/workspace/[workspaceId]/home/hooks/use-file-preview-sessions'
 import { deploymentKeys } from '@/hooks/queries/deployments'
@@ -1385,6 +1387,52 @@ export function useChat(
   const activeResourceIdRef = useRef(effectiveActiveResourceId)
   activeResourceIdRef.current = effectiveActiveResourceId
 
+  const previewActivationOwnerRef = useRef<Map<string, string | null>>(new Map())
+  const completedPreviewResourceHandoffRef = useRef<
+    Map<string, { sessionId: string; suppressActivation: boolean }>
+  >(new Map())
+
+  const rememberPreviewActivationOwner = useCallback((session: FilePreviewSession) => {
+    if (!session.fileId || previewActivationOwnerRef.current.has(session.id)) {
+      return
+    }
+    previewActivationOwnerRef.current.set(session.id, activeResourceIdRef.current)
+  }, [])
+
+  const shouldAutoActivatePreviewSession = useCallback((session: FilePreviewSession) => {
+    if (!session.fileId) {
+      return false
+    }
+    const currentActiveResourceId = activeResourceIdRef.current
+    const activationOwnerId = previewActivationOwnerRef.current.get(session.id)
+    return (
+      currentActiveResourceId === null ||
+      currentActiveResourceId === session.fileId ||
+      currentActiveResourceId === 'streaming-file' ||
+      currentActiveResourceId === activationOwnerId
+    )
+  }, [])
+
+  const seedCompletedPreviewContentCache = useCallback(
+    (fileId: string, previewText: string) => {
+      queryClient.setQueriesData(
+        { queryKey: workspaceFilesKeys.content(workspaceId, fileId, 'text') },
+        previewText
+      )
+
+      const activeFiles = queryClient.getQueryData<Array<{ id: string; key: string }>>(
+        workspaceFilesKeys.list(workspaceId, 'active')
+      )
+      const fileKey = activeFiles?.find((file) => file.id === fileId)?.key
+      if (fileKey) {
+        queryClient.setQueryData(
+          [...workspaceFilesKeys.content(workspaceId, fileId, 'text'), fileKey],
+          previewText
+        )
+      }
+    },
+    [queryClient, workspaceId]
+  )
 
   const upsertTaskChatHistory = useCallback(
     (chatId: string, updater: (current: TaskChatHistory) => TaskChatHistory) => {
@@ -1541,6 +1589,8 @@
   const resetEphemeralPreviewState = useCallback(
     (options?: { removeStreamingResource?: boolean }) => {
+      previewActivationOwnerRef.current.clear()
+      completedPreviewResourceHandoffRef.current.clear()
       syncPreviewSessionRefs(INITIAL_FILE_PREVIEW_SESSIONS_STATE)
       resetPreviewSessions()
       if (options?.removeStreamingResource) {
@@ -1550,35 +1600,40 @@
     [resetPreviewSessions, syncPreviewSessionRefs]
   )
 
-  const syncPreviewResourceChrome = useCallback((session: FilePreviewSession) => {
-    if (session.targetKind === 'new_file') {
-      setResources((current) => {
-        const existing = current.find((resource) => resource.id === 'streaming-file')
-        if (existing) {
-          return current.map((resource) =>
-            resource.id === 'streaming-file'
-              ? { ...resource, title: session.fileName || 'Writing file...' }
-              : resource
-          )
-        }
-        return [
-          ...current,
-          {
-            type: 'file',
-            id: 'streaming-file',
-            title: session.fileName || 'Writing file...',
-          },
-        ]
-      })
-      setActiveResourceId('streaming-file')
-      return
-    }
+  const syncPreviewResourceChrome = useCallback(
+    (session: FilePreviewSession, options?: { activate?: boolean }) => {
+      if (session.targetKind === 'new_file') {
+        setResources((current) => {
+          const existing = current.find((resource) => resource.id === 'streaming-file')
+          if (existing) {
+            return current.map((resource) =>
+              resource.id === 'streaming-file'
+                ? { ...resource, title: session.fileName || 'Writing file...' }
+                : resource
+            )
+          }
+          return [
+            ...current,
+            {
+              type: 'file',
+              id: 'streaming-file',
+              title: session.fileName || 'Writing file...',
+            },
+          ]
+        })
+        setActiveResourceId('streaming-file')
+        return
+      }
 
-    if (session.fileId) {
-      setResources((current) => current.filter((resource) => resource.id !== 'streaming-file'))
-      setActiveResourceId(session.fileId)
-    }
-  }, [])
+      if (session.fileId && hasRenderableFilePreviewContent(session)) {
+        setResources((current) => current.filter((resource) => resource.id !== 'streaming-file'))
+        if (options?.activate !== false) {
+          setActiveResourceId(session.fileId)
+        }
+      }
+    },
+    []
+  )
 
   const seedPreviewSessions = useCallback(
     (sessions: FilePreviewSession[]) => {
@@ -1597,10 +1652,17 @@
         ? (nextState.sessions[nextState.activeSessionId] ?? null)
         : null
       if (active) {
-        syncPreviewResourceChrome(active)
+        syncPreviewResourceChrome(active, {
+          activate: active.targetKind === 'new_file' || shouldAutoActivatePreviewSession(active),
+        })
       }
     },
-    [hydratePreviewSessions, syncPreviewResourceChrome, syncPreviewSessionRefs]
+    [
+      hydratePreviewSessions,
+      shouldAutoActivatePreviewSession,
+      syncPreviewResourceChrome,
+      syncPreviewSessionRefs,
+    ]
   )
 
   const abortControllerRef = useRef<AbortController | null>(null)
@@ -1965,7 +2027,8 @@
 
   void recoverPendingClientWorkflowTools(mappedMessages)
 
-  if (chatHistory.resources.some((r) => r.id === 'streaming-file')) {
+  const hasPersistedStreamingFile = chatHistory.resources.some((r) => r.id === 'streaming-file')
+  if (hasPersistedStreamingFile) {
     // boundary-raw-fetch: fire-and-forget cleanup during chat-history hydration; failures are silently swallowed to keep hydration non-blocking
     fetch('/api/mothership/chat/resources', {
       method: 'DELETE',
@@ -1980,18 +2043,21 @@
   const persistedResources = chatHistory.resources.filter((r) => r.id !== 'streaming-file')
 
   if (persistedResources.length > 0) {
-    setResources(persistedResources)
-    setActiveResourceId((prev) =>
-      prev && persistedResources.some((r) => r.id === prev)
-        ? prev
+    const hydratedActiveResourceId =
+      activeResourceIdRef.current &&
+      persistedResources.some((resource) => resource.id === activeResourceIdRef.current)
+        ? activeResourceIdRef.current
         : persistedResources[persistedResources.length - 1].id
-    )
+    activeResourceIdRef.current = hydratedActiveResourceId
+    setResources(persistedResources)
+    setActiveResourceId(hydratedActiveResourceId)
     for (const resource of persistedResources) {
       if (resource.type !== 'workflow') continue
       ensureWorkflowInRegistry(resource.id, resource.title, workspaceId)
     }
-  } else if (chatHistory.resources.some((r) => r.id === 'streaming-file')) {
+  } else if (hasPersistedStreamingFile) {
+    activeResourceIdRef.current = null
     setResources([])
     setActiveResourceId(null)
   }
@@ -2558,9 +2624,6 @@
           status: 'pending',
           updatedAt: new Date().toISOString(),
         }
-        if (nextSession.fileId) {
-          setActiveResourceId(nextSession.fileId)
-        }
         applyPreviewSessionUpdate(nextSession)
         break
       }
@@ -2570,13 +2633,18 @@
           ...baseSession,
           updatedAt: new Date().toISOString(),
         }
+        rememberPreviewActivationOwner(nextSession)
         const nextState = applyPreviewSessionUpdate(nextSession)
         const activePreview =
           nextState.activeSessionId !== null
             ? (nextState.sessions[nextState.activeSessionId] ?? null)
             : null
         if (activePreview?.id === nextSession.id) {
-          syncPreviewResourceChrome(activePreview)
+          syncPreviewResourceChrome(activePreview, {
+            activate:
+              activePreview.targetKind === 'new_file' ||
+              shouldAutoActivatePreviewSession(activePreview),
+          })
         }
         break
       }
@@ -2604,6 +2672,13 @@
           updatedAt: new Date().toISOString(),
         }
         applyPreviewSessionUpdate(nextSession)
+        if (!prevSession || !hasRenderableFilePreviewContent(prevSession)) {
+          syncPreviewResourceChrome(nextSession, {
+            activate:
+              nextSession.targetKind === 'new_file' ||
+              shouldAutoActivatePreviewSession(nextSession),
+          })
+        }
         const previewToolIdx = toolMap.get(id)
         if (previewToolIdx !== undefined && blocks[previewToolIdx].toolCall) {
           blocks[previewToolIdx].toolCall!.status = 'executing'
@@ -2613,7 +2688,10 @@
 
       if (payload.previewPhase === 'file_preview_complete') {
         const resultData = asPayloadRecord(payload.output)
+        const outputData = asPayloadRecord(resultData?.data) ?? resultData
         const completedAt = new Date().toISOString()
+        const wasRenderableBeforeComplete =
+          prevSession !== undefined && hasRenderableFilePreviewContent(prevSession)
         const nextSession: FilePreviewSession = {
           ...baseSession,
           status: 'complete',
@@ -2623,8 +2701,8 @@
         }
         const nextState = applyCompletedPreviewSession(nextSession)
 
-        if (fileId && resultData?.id) {
-          const fileName = (resultData.name as string) ?? nextSession.fileName ?? 'File'
+        if (fileId && resultData?.success === true && outputData?.id === fileId) {
+          const fileName = (outputData.name as string) ?? nextSession.fileName ?? 'File'
           const fileResource = { type: 'file' as const, id: fileId, title: fileName }
           setResources((rs) => {
             const without = rs.filter((r) => r.id !== 'streaming-file')
            if (without.some((r) => r.id === fileId)) {
              return without.map((r) => (r.id === fileId ? fileResource : r))
            }
             return [...without, fileResource]
           })
-          setActiveResourceId(fileId)
-          if (nextSession.previewText) {
-            queryClient.setQueryData(
-              workspaceFilesKeys.content(workspaceId, fileId, 'text'),
-              nextSession.previewText
-            )
+          const shouldActivateOnComplete =
+            !wasRenderableBeforeComplete &&
+            hasRenderableFilePreviewContent(nextSession) &&
+            shouldAutoActivatePreviewSession(nextSession)
+          if (shouldActivateOnComplete) {
+            setActiveResourceId(fileId)
+          }
+          completedPreviewResourceHandoffRef.current.set(fileId, {
+            sessionId: nextSession.id,
+            suppressActivation: !shouldActivateOnComplete,
+          })
+          if (hasRenderableFilePreviewContent(nextSession)) {
+            seedCompletedPreviewContentCache(fileId, nextSession.previewText)
           }
           invalidateResourceQueries(queryClient, workspaceId, 'file', fileId)
         } else {
@@ -2647,7 +2732,11 @@
             ? (nextState.sessions[nextState.activeSessionId] ?? null)
             : null
           if (activePreview) {
-            syncPreviewResourceChrome(activePreview)
+            syncPreviewResourceChrome(activePreview, {
+              activate:
+                activePreview.targetKind === 'new_file' ||
+                shouldAutoActivatePreviewSession(activePreview),
+            })
           }
         }
         break
@@ -2893,7 +2982,59 @@
           id: resource.id,
           title: typeof resource.title === 'string' ? resource.title : resource.id,
         }
-        const wasAdded = addResource(nextResource)
+        const completedPreviewHandoff =
+          nextResource.type === 'file'
+            ? completedPreviewResourceHandoffRef.current.get(nextResource.id)
+            : undefined
+        const matchingPreviewSessions =
+          nextResource.type === 'file'
+            ? Object.values(previewSessionsRef.current).filter(
+                (session) => session.fileId === nextResource.id
+              )
+            : []
+        const latestPreviewForResource = (
+          sessions: FilePreviewSession[]
+        ): FilePreviewSession | undefined =>
+          sessions.reduce<FilePreviewSession | undefined>(
+            (latest, session) => (shouldReplaceSession(latest, session) ? session : latest),
+            undefined
+          )
+        const latestActivePreviewForResource = latestPreviewForResource(
+          matchingPreviewSessions.filter((session) => session.status !== 'complete')
+        )
+        const previewForResource =
+          latestActivePreviewForResource ?? latestPreviewForResource(matchingPreviewSessions)
+        const isCompletedPreviewHandoffCurrent =
+          completedPreviewHandoff !== undefined &&
+          (!latestActivePreviewForResource ||
+            latestActivePreviewForResource.id === completedPreviewHandoff.sessionId)
+        if (completedPreviewHandoff && !isCompletedPreviewHandoffCurrent) {
+          completedPreviewResourceHandoffRef.current.delete(nextResource.id)
+          previewActivationOwnerRef.current.delete(completedPreviewHandoff.sessionId)
+        }
+        const shouldSuppressFileResourceActivation =
+          (isCompletedPreviewHandoffCurrent &&
+            completedPreviewHandoff?.suppressActivation === true) ||
+          (previewForResource !== undefined &&
+            previewForResource.status !== 'complete' &&
+            (!hasRenderableFilePreviewContent(previewForResource) ||
+              !shouldAutoActivatePreviewSession(previewForResource)))
+        const wasAdded = shouldSuppressFileResourceActivation
+          ? !resourcesRef.current.some(
+              (r) => r.type === nextResource.type && r.id === nextResource.id
+            )
+          : addResource(nextResource)
+        if (shouldSuppressFileResourceActivation && wasAdded) {
+          setResources((current) =>
+            current.some((r) => r.type === nextResource.type && r.id === nextResource.id)
+              ? current
+              : [...current, nextResource]
+          )
+        }
+        if (completedPreviewHandoff && isCompletedPreviewHandoffCurrent) {
+          completedPreviewResourceHandoffRef.current.delete(nextResource.id)
+          previewActivationOwnerRef.current.delete(completedPreviewHandoff.sessionId)
+        }
         invalidateResourceQueries(
           queryClient,
           workspaceId,
@@ -2901,7 +3042,11 @@
           nextResource.id
         )
 
-        if (!wasAdded && activeResourceIdRef.current !== nextResource.id) {
+        if (
+          !shouldSuppressFileResourceActivation &&
+          !wasAdded &&
+          activeResourceIdRef.current !== nextResource.id
+        ) {
           setActiveResourceId(nextResource.id)
         }
         onResourceEventRef.current?.()
@@ -4596,7 +4741,7 @@
           }).catch(() => {})
         }
 
-        consoleStore.cancelRunningEntries(workflowId)
+        consoleStore.cancelRunningEntries(workflowId, executionId ?? undefined)
         const now = new Date()
         consoleStore.addConsole({
           input: {},
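
// A minimal sketch (not part of the diff) of the focus rule use-chat.ts now applies
// before auto-activating a preview's file tab. The four allowed cases mirror
// shouldAutoActivatePreviewSession; anything else means the user has navigated to an
// unrelated tab since the preview started, so activating would steal focus.
function mayStealFocus(
  activeResourceId: string | null, // tab the user is currently on
  sessionFileId: string, // file the preview is writing to
  activationOwnerId: string | null | undefined // tab recorded when the preview started
): boolean {
  return (
    activeResourceId === null || // nothing focused yet
    activeResourceId === sessionFileId || // already looking at the file
    activeResourceId === 'streaming-file' || // on the synthetic streaming tab
    activeResourceId === activationOwnerId // still on the tab that owned the preview
  )
}

console.log(mayStealFocus('some-other-doc', 'file-1', 'chat-tab')) // false: user moved away
console.log(mayStealFocus('chat-tab', 'file-1', 'chat-tab')) // true: owner tab unchanged
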
diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-file-preview-sessions.test.tsx b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-file-preview-sessions.test.tsx
index 863df51596d..8a4887b846a 100644
--- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-file-preview-sessions.test.tsx
+++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-file-preview-sessions.test.tsx
@@ -5,8 +5,10 @@ import { describe, expect, it } from 'vitest'
 import type { FilePreviewSession } from '@/lib/copilot/request/session'
 import {
   buildCompletedPreviewSessions,
+  hasRenderableFilePreviewContent,
   INITIAL_FILE_PREVIEW_SESSIONS_STATE,
   reduceFilePreviewSessions,
+  shouldReplaceSession,
 } from '@/app/workspace/[workspaceId]/home/hooks/use-file-preview-sessions'
 
 function createSession(
@@ -31,6 +33,56 @@
 }
 
 describe('reduceFilePreviewSessions', () => {
+  it('does not treat a pending empty preview as renderable content', () => {
+    expect(
+      hasRenderableFilePreviewContent(
+        createSession({
+          id: 'preview-1',
+          toolCallId: 'preview-1',
+          status: 'pending',
+          previewText: '',
+          previewVersion: 0,
+        })
+      )
+    ).toBe(false)
+  })
+
+  it('treats emitted preview snapshots as renderable even when empty', () => {
+    expect(
+      hasRenderableFilePreviewContent(
+        createSession({
+          id: 'preview-1',
+          toolCallId: 'preview-1',
+          status: 'streaming',
+          previewText: '',
+          previewVersion: 1,
+        })
+      )
+    ).toBe(true)
+  })
+
+  it('does not replace a completed session with same-version replayed streaming events', () => {
+    const completed = createSession({
+      id: 'preview-1',
+      toolCallId: 'preview-1',
+      status: 'complete',
+      previewText: 'final',
+      previewVersion: 2,
+      updatedAt: '2026-04-10T00:00:02.000Z',
+      completedAt: '2026-04-10T00:00:02.000Z',
+    })
+    const replayedStreaming = createSession({
+      id: 'preview-1',
+      toolCallId: 'preview-1',
+      status: 'streaming',
+      previewText: 'final',
+      previewVersion: 2,
+      updatedAt: '2026-04-10T00:00:03.000Z',
+    })
+
+    expect(shouldReplaceSession(completed, replayedStreaming)).toBe(false)
+  })
+
   it('builds complete sessions for terminal stream reconciliation', () => {
     const completedAt = '2026-04-10T00:00:10.000Z'
     const nextSessions = buildCompletedPreviewSessions(
diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-file-preview-sessions.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-file-preview-sessions.ts
index 6782585bbbb..28ca3ad4416 100644
--- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-file-preview-sessions.ts
+++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-file-preview-sessions.ts
@@ -18,11 +18,22 @@
   sessions: {},
 }
 
+export function hasRenderableFilePreviewContent(session: FilePreviewSession): boolean {
+  return session.previewText.length > 0 || session.previewVersion > 0
+}
+
 export function shouldReplaceSession(
   current: FilePreviewSession | undefined,
   next: FilePreviewSession
 ): boolean {
   if (!current) return true
+  if (
+    current.status === 'complete' &&
+    next.status !== 'complete' &&
+    next.previewVersion <= current.previewVersion
+  ) {
+    return false
+  }
   if (next.previewVersion !== current.previewVersion) {
     return next.previewVersion > current.previewVersion
   }
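
// A minimal sketch (not part of the diff) showing why the new shouldReplaceSession guard
// matters: when a stream is replayed after a reconnect, stale 'streaming' events for an
// already-completed session must not clobber the terminal snapshot, even though their
// updatedAt timestamps are newer. Shapes are trimmed to the fields the guard inspects.
type S = { status: 'streaming' | 'complete'; previewVersion: number }

// Mirrors the guard added in use-file-preview-sessions.ts: true means the completed
// session is kept and the incoming event is dropped.
function keepsCompleted(current: S, next: S): boolean {
  return (
    current.status === 'complete' &&
    next.status !== 'complete' &&
    next.previewVersion <= current.previewVersion
  )
}

const completed: S = { status: 'complete', previewVersion: 2 }
console.log(keepsCompleted(completed, { status: 'streaming', previewVersion: 2 })) // true: replay dropped
console.log(keepsCompleted(completed, { status: 'streaming', previewVersion: 3 })) // false: genuinely newer
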
diff --git a/apps/sim/app/workspace/[workspaceId]/knowledge/[id]/components/add-connector-modal/add-connector-modal.tsx b/apps/sim/app/workspace/[workspaceId]/knowledge/[id]/components/add-connector-modal/add-connector-modal.tsx
index bb46a1fd7ad..849b4820a9d 100644
--- a/apps/sim/app/workspace/[workspaceId]/knowledge/[id]/components/add-connector-modal/add-connector-modal.tsx
+++ b/apps/sim/app/workspace/[workspaceId]/knowledge/[id]/components/add-connector-modal/add-connector-modal.tsx
@@ -157,10 +157,13 @@
     for (const [key, value] of Object.entries(resolveSourceConfig())) {
       if (value) resolvedConfig[key] = value
     }
-    const finalSourceConfig =
-      disabledTagIds.size > 0
-        ? { ...resolvedConfig, disabledTagIds: Array.from(disabledTagIds) }
-        : resolvedConfig
+    if (disabledTagIds.size > 0) {
+      resolvedConfig.disabledTagIds = Array.from(disabledTagIds)
+    }
+    if (Object.keys(canonicalModes).length > 0) {
+      resolvedConfig._canonicalModes = canonicalModes
+    }
+    const finalSourceConfig = resolvedConfig
 
     createConnector(
       {
diff --git a/apps/sim/app/workspace/[workspaceId]/knowledge/[id]/components/edit-connector-modal/edit-connector-modal.tsx b/apps/sim/app/workspace/[workspaceId]/knowledge/[id]/components/edit-connector-modal/edit-connector-modal.tsx
index e54846e6fbc..3b6c85d806d 100644
--- a/apps/sim/app/workspace/[workspaceId]/knowledge/[id]/components/edit-connector-modal/edit-connector-modal.tsx
+++ b/apps/sim/app/workspace/[workspaceId]/knowledge/[id]/components/edit-connector-modal/edit-connector-modal.tsx
@@ -43,8 +43,33 @@
 
 const logger = createLogger('EditConnectorModal')
 
-/** Keys injected by the sync engine — not user-editable */
-const INTERNAL_CONFIG_KEYS = new Set(['tagSlotMapping', 'disabledTagIds'])
+/** Keys injected by the sync engine or modal state — not user-editable */
+const INTERNAL_CONFIG_KEYS = new Set(['tagSlotMapping', 'disabledTagIds', '_canonicalModes'])
+
+const CANONICAL_MODES_KEY = '_canonicalModes'
+
+function readPersistedCanonicalModes(
+  sourceConfig: Record<string, unknown>
+): Record<string, 'basic' | 'advanced'> {
+  const raw = sourceConfig[CANONICAL_MODES_KEY]
+  if (!raw || typeof raw !== 'object') return {}
+  const result: Record<string, 'basic' | 'advanced'> = {}
+  for (const [key, value] of Object.entries(raw as Record<string, unknown>)) {
+    if (value === 'basic' || value === 'advanced') result[key] = value
+  }
+  return result
+}
+
+function didCanonicalModesChange(
+  current: Record<string, 'basic' | 'advanced'>,
+  persisted: Record<string, 'basic' | 'advanced'>
+): boolean {
+  const keys = new Set([...Object.keys(persisted), ...Object.keys(current)])
+  for (const key of keys) {
+    if ((current[key] ?? 'basic') !== (persisted[key] ?? 'basic')) return true
+  }
+  return false
+}
 
 interface EditConnectorModalProps {
   open: boolean
@@ -87,6 +112,10 @@
     return config
   })
 
+  const [initialCanonicalModes] = useState<Record<string, 'basic' | 'advanced'>>(() =>
+    readPersistedCanonicalModes(connector.sourceConfig)
+  )
+
   const {
     sourceConfig,
     canonicalModes,
@@ -95,7 +124,11 @@
     handleFieldChange,
     toggleCanonicalMode,
     resolveSourceConfig,
-  } = useConnectorConfigFields({ connectorConfig, initialSourceConfig })
+  } = useConnectorConfigFields({
+    connectorConfig,
+    initialSourceConfig,
+    initialCanonicalModes,
+  })
 
   const { mutate: updateConnector, isPending: isSaving } = useUpdateConnector()
 
@@ -103,14 +136,27 @@
   const subscriptionAccess = getSubscriptionAccessState(subscriptionResponse?.data)
   const hasMaxAccess = !isBillingEnabled || subscriptionAccess.hasUsableMaxAccess
 
+  const persistedCanonicalModes = useMemo(
+    () => readPersistedCanonicalModes(connector.sourceConfig),
+    [connector.sourceConfig]
+  )
+
   const hasChanges = useMemo(() => {
     if (syncInterval !== connector.syncIntervalMinutes) return true
+    if (didCanonicalModesChange(canonicalModes, persistedCanonicalModes)) return true
     const resolved = resolveSourceConfig()
     for (const [key, value] of Object.entries(resolved)) {
       if (String(connector.sourceConfig[key] ?? '') !== value) return true
     }
     return false
-  }, [resolveSourceConfig, syncInterval, connector.syncIntervalMinutes, connector.sourceConfig])
+  }, [
+    resolveSourceConfig,
+    syncInterval,
+    connector.syncIntervalMinutes,
+    connector.sourceConfig,
+    canonicalModes,
+    persistedCanonicalModes,
+  ])
 
   const handleSave = () => {
     setError(null)
@@ -126,8 +172,17 @@
     for (const [key, value] of Object.entries(resolved)) {
       if (String(connector.sourceConfig[key] ?? '') !== value) changedEntries[key] = value
     }
-    if (Object.keys(changedEntries).length > 0) {
-      updates.sourceConfig = { ...connector.sourceConfig, ...changedEntries }
+
+    const modesChanged = didCanonicalModesChange(canonicalModes, persistedCanonicalModes)
+
+    if (Object.keys(changedEntries).length > 0 || modesChanged) {
+      const next: Record<string, unknown> = { ...connector.sourceConfig, ...changedEntries }
+      if (Object.keys(canonicalModes).length > 0) {
+        next[CANONICAL_MODES_KEY] = canonicalModes
+      } else {
+        delete next[CANONICAL_MODES_KEY]
+      }
+      updates.sourceConfig = next
     }
 
     if (Object.keys(updates).length === 0) {
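
// A minimal sketch (not part of the diff) of what a connector's persisted sourceConfig
// looks like once the modals store canonical modes under the internal _canonicalModes
// key. The field names other than _canonicalModes/disabledTagIds are hypothetical.
const exampleSourceConfig: Record<string, unknown> = {
  spaceKey: 'ENG', // hypothetical user-editable field
  disabledTagIds: ['tag-1'], // injected by the sync engine
  _canonicalModes: { url: 'advanced', folder: 'basic' }, // injected by modal state
}

// readPersistedCanonicalModes tolerates junk: non-object values and unknown mode strings
// are dropped rather than propagated into component state, so a malformed
// { _canonicalModes: { url: 'expert', folder: 'basic' } } reads back as { folder: 'basic' }.
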
diff --git a/apps/sim/app/workspace/[workspaceId]/knowledge/[id]/hooks/use-connector-config-fields.ts b/apps/sim/app/workspace/[workspaceId]/knowledge/[id]/hooks/use-connector-config-fields.ts
index 54ff7c16906..8419b749602 100644
--- a/apps/sim/app/workspace/[workspaceId]/knowledge/[id]/hooks/use-connector-config-fields.ts
+++ b/apps/sim/app/workspace/[workspaceId]/knowledge/[id]/hooks/use-connector-config-fields.ts
@@ -7,6 +7,7 @@ import type { ConnectorConfig, ConnectorConfigField } from '@/connectors/types'
 export interface UseConnectorConfigFieldsOptions {
   connectorConfig: ConnectorConfig | null
   initialSourceConfig?: Record<string, string>
+  initialCanonicalModes?: Record<string, 'basic' | 'advanced'>
 }
 
 export interface UseConnectorConfigFieldsResult {
@@ -34,11 +35,14 @@
 export function useConnectorConfigFields({
   connectorConfig,
   initialSourceConfig,
+  initialCanonicalModes,
 }: UseConnectorConfigFieldsOptions): UseConnectorConfigFieldsResult {
   const [sourceConfig, setSourceConfig] = useState<Record<string, string>>(
     () => initialSourceConfig ?? {}
   )
-  const [canonicalModes, setCanonicalModes] = useState<Record<string, 'basic' | 'advanced'>>({})
+  const [canonicalModes, setCanonicalModes] = useState<Record<string, 'basic' | 'advanced'>>(
+    () => initialCanonicalModes ?? {}
+  )
 
   const canonicalGroups = useMemo(() => {
     const groups = new Map()
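
// A minimal sketch (not part of the diff) of the seeding contract the new
// initialCanonicalModes option establishes: lazy useState initializers run once, so
// whatever the edit modal reads from the persisted config becomes the hook's starting
// state instead of an empty object. Standalone stand-in below; the real code uses
// React's useState.
type Mode = 'basic' | 'advanced'

function initialModes(persisted?: Record<string, Mode>): Record<string, Mode> {
  // Mirrors `useState<Record<string, 'basic' | 'advanced'>>(() => initialCanonicalModes ?? {})`.
  return persisted ?? {}
}

console.log(initialModes({ url: 'advanced' })) // { url: 'advanced' } — survives a modal reopen
console.log(initialModes(undefined)) // {} — the prior behavior, still used by the add flow
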
diff --git a/apps/sim/app/workspace/[workspaceId]/settings/[section]/settings.tsx b/apps/sim/app/workspace/[workspaceId]/settings/[section]/settings.tsx
index b4fd13563dd..cb697082685 100644
--- a/apps/sim/app/workspace/[workspaceId]/settings/[section]/settings.tsx
+++ b/apps/sim/app/workspace/[workspaceId]/settings/[section]/settings.tsx
@@ -29,6 +29,7 @@
   isCredentialSetsEnabled,
 } from '@/app/workspace/[workspaceId]/settings/navigation'
 import { AuditLogsSkeleton } from '@/ee/audit-logs/components/audit-logs-skeleton'
+import { DataDrainsSkeleton } from '@/ee/data-drains/components/data-drains-skeleton'
 import { DataRetentionSkeleton } from '@/ee/data-retention/components/data-retention-skeleton'
 
 /**
@@ -177,6 +178,11 @@ const DataRetentionSettings = dynamic(
   () =>
     import('@/ee/data-retention/components/data-retention-settings').then(
       (m) => m.DataRetentionSettings
     ),
   { loading: () => <DataRetentionSkeleton /> }
 )
+const DataDrainsSettings = dynamic(
+  () =>
+    import('@/ee/data-drains/components/data-drains-settings').then((m) => m.DataDrainsSettings),
+  { loading: () => <DataDrainsSkeleton /> }
+)
 const WhitelabelingSettings = dynamic(
   () =>
     import('@/ee/whitelabeling/components/whitelabeling-settings').then(
@@ -235,6 +241,7 @@ export function SettingsPage({ section }: SettingsPageProps) {
       {isBillingEnabled && effectiveSection === 'organization' && }
       {effectiveSection === 'sso' && }
       {effectiveSection === 'data-retention' && <DataRetentionSettings />}
+      {effectiveSection === 'data-drains' && <DataDrainsSettings />}
       {effectiveSection === 'whitelabeling' && <WhitelabelingSettings />}
       {effectiveSection === 'byok' && }
       {effectiveSection === 'copilot' && }
diff --git a/apps/sim/app/workspace/[workspaceId]/settings/components/integrations/atlassian-service-account-form.tsx b/apps/sim/app/workspace/[workspaceId]/settings/components/integrations/atlassian-service-account-form.tsx
new file mode 100644
index 00000000000..7ed77fe71e0
--- /dev/null
+++ b/apps/sim/app/workspace/[workspaceId]/settings/components/integrations/atlassian-service-account-form.tsx
@@ -0,0 +1,246 @@
+'use client'
+
+import { createElement, useState } from 'react'
+import {
+  Badge,
+  Button,
+  Input,
+  Label,
+  ModalBody,
+  ModalFooter,
+  ModalHeader,
+  Textarea,
+  toast,
+} from '@/components/emcn'
+import { isApiClientError } from '@/lib/api/client/errors'
+import type { OAuthServiceConfig } from '@/lib/oauth'
+import { ATLASSIAN_SERVICE_ACCOUNT_PROVIDER_ID } from '@/lib/oauth/types'
+
+interface AtlassianServiceAccountFormProps {
+  service: OAuthServiceConfig | null
+  serviceLabel: string
+  workspaceId: string
+  onBack: () => void
+  onCreate: (input: {
+    workspaceId: string
+    type: 'service_account'
+    providerId: typeof ATLASSIAN_SERVICE_ACCOUNT_PROVIDER_ID
+    apiToken: string
+    domain: string
+    displayName?: string
+    description?: string
+  }) => Promise<void>
+  onCreated: () => void
+}
+
+const DOMAIN_HINT_REGEX = /^[a-z0-9-]+\.atlassian\.net$/i
+
+const ERROR_MESSAGES: Record<string, string> = {
+  invalid_credentials:
+    "We couldn't authenticate with that API token. Double-check the token and that the service account has access to this site.",
+  site_not_found:
+    "We couldn't find an Atlassian site at that domain. Check the spelling — it should look like your-team.atlassian.net.",
+  duplicate_display_name: 'A credential with that name already exists in this workspace.',
+  atlassian_unavailable:
+    "We couldn't reach Atlassian to verify these credentials. Try again in a moment.",
+}
+
+const FALLBACK_ERROR_MESSAGE = "We couldn't add this service account. Try again in a moment."
+
+function normalizeDomain(raw: string): string {
+  return raw
+    .trim()
+    .replace(/^https?:\/\//i, '')
+    .replace(/\/+$/, '')
+}
+
+function messageForError(err: unknown): string {
+  if (isApiClientError(err) && err.code && ERROR_MESSAGES[err.code]) {
+    return ERROR_MESSAGES[err.code]
+  }
+  return FALLBACK_ERROR_MESSAGE
+}
+
+export function AtlassianServiceAccountForm({
+  service,
+  serviceLabel,
+  workspaceId,
+  onBack,
+  onCreate,
+  onCreated,
+}: AtlassianServiceAccountFormProps) {
+  const [apiToken, setApiToken] = useState('')
+  const [domain, setDomain] = useState('')
+  const [displayName, setDisplayName] = useState('')
+  const [description, setDescription] = useState('')
+  const [error, setError] = useState<string | null>(null)
+  const [isSubmitting, setIsSubmitting] = useState(false)
+
+  const trimmedToken = apiToken.trim()
+  const normalizedDomain = normalizeDomain(domain)
+
+  const canSubmit = trimmedToken.length > 0 && normalizedDomain.length > 0 && !isSubmitting
+  const showDomainHint = normalizedDomain.length > 0 && !DOMAIN_HINT_REGEX.test(normalizedDomain)
+
+  const handleSubmit = async () => {
+    setError(null)
+    if (!trimmedToken || !normalizedDomain) return
+
+    setIsSubmitting(true)
+    try {
+      await onCreate({
+        workspaceId,
+        type: 'service_account',
+        providerId: ATLASSIAN_SERVICE_ACCOUNT_PROVIDER_ID,
+        apiToken: trimmedToken,
+        domain: normalizedDomain,
+        displayName: displayName.trim() || undefined,
+        description: description.trim() || undefined,
+      })
+      toast.success('Service account connected')
+      onCreated()
+    } catch (err) {
+      setError(messageForError(err))
+    } finally {
+      setIsSubmitting(false)
+    }
+  }
+
+  return (
+    <>
    + + Add {serviceLabel} +
    +
    + + {error && ( +
    + + {error} + +
    + )} +
    +
    +
    + {service && createElement(service.icon, { className: 'h-[18px] w-[18px]' })} +
    +
    +

    + Add {service?.name || 'Atlassian service account'} +

    +

    + {service?.description || + 'Use a scoped API token from a service account in admin.atlassian.com.'} +

    + + View setup guide + +
    +
    + +
+ 
+        <Input
+          value={apiToken}
+          onChange={(event) => {
+            setApiToken(event.target.value)
+            setError(null)
+          }}
+          placeholder='Paste API token'
+          autoComplete='off'
+          data-lpignore='true'
+          className='mt-1.5'
+        />

    + Issued from the service account's profile in admin.atlassian.com. Stored encrypted. +

    +
    + +
+ 
+        <Input
+          value={domain}
+          onChange={(event) => {
+            setDomain(event.target.value)
+            setError(null)
+          }}
+          placeholder='your-team.atlassian.net'
+          autoComplete='off'
+          data-lpignore='true'
+          className='mt-1.5'
+        />
+        {showDomainHint && (

    + Atlassian sites usually look like your-team.atlassian.net. We'll strip + any leading https://. +

    + )} +
    + +
+ 
+        <Input
+          value={displayName}
+          onChange={(event) => setDisplayName(event.target.value)}
+          placeholder="Defaults to the account's Atlassian display name"
+          autoComplete='off'
+          data-lpignore='true'
+          className='mt-1.5'
+        />
    + +
    + +