Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import { generateRequestId } from '@/lib/core/utils/request'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { getBaseUrl } from '@/lib/core/utils/urls'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
import { setExecutionMeta } from '@/lib/execution/event-buffer'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
import { createStreamingResponse } from '@/lib/workflows/streaming/streaming'
Expand Down Expand Up @@ -157,12 +156,6 @@ export const POST = withRouteHandler(
})
}

await setExecutionMeta(enqueueResult.resumeExecutionId, {
status: 'active',
userId,
workflowId,
})

const resumeArgs = {
resumeEntryId: enqueueResult.resumeEntryId,
resumeExecutionId: enqueueResult.resumeExecutionId,
Expand Down Expand Up @@ -249,6 +242,14 @@ export const POST = withRouteHandler(
error: toError(dispatchError).message,
resumeExecutionId: enqueueResult.resumeExecutionId,
})
await PauseResumeManager.markResumeAttemptFailed({
resumeEntryId: enqueueResult.resumeEntryId,
pausedExecutionId: enqueueResult.pausedExecution.id,
parentExecutionId: executionId,
contextId: enqueueResult.contextId,
failureReason: 'Failed to queue async resume execution',
})
await PauseResumeManager.processQueuedResumes(executionId)
return NextResponse.json(
{ error: 'Failed to queue resume execution. Please try again.' },
{ status: 503 }
Expand Down
231 changes: 146 additions & 85 deletions apps/sim/app/api/workflows/[id]/execute/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ import {
SIM_VIA_HEADER,
validateCallChain,
} from '@/lib/execution/call-chain'
import { createExecutionEventWriter, setExecutionMeta } from '@/lib/execution/event-buffer'
import {
createExecutionEventWriter,
flushExecutionStreamReplayBuffer,
initializeExecutionStreamMeta,
type TerminalExecutionStreamStatus,
} from '@/lib/execution/event-buffer'
import { processInputFileFields } from '@/lib/execution/files'
import {
registerManualExecutionAborter,
Expand Down Expand Up @@ -868,11 +873,17 @@ async function handleExecutePost(
let isManualAbortRegistered = false

const eventWriter = createExecutionEventWriter(executionId)
setExecutionMeta(executionId, {
status: 'active',
const metaInitialized = await initializeExecutionStreamMeta(executionId, {
userId: actorUserId,
workflowId,
}).catch(() => {})
})
if (!metaInitialized) {
timeoutController.cleanup()
return NextResponse.json(
{ error: 'Run buffer temporarily unavailable' },
{ status: 503, headers: { 'X-Execution-Id': executionId } }
)
}

const stream = new ReadableStream<Uint8Array>({
async start(controller) {
Expand All @@ -881,12 +892,18 @@ async function handleExecutePost(
registerManualExecutionAborter(executionId, timeoutController.abort)
isManualAbortRegistered = true

let localEventSeq = 0
const sendEvent = (event: ExecutionEvent) => {
let terminalEventPublished = false
const sendEvent = async (
event: ExecutionEvent,
terminalStatus?: TerminalExecutionStreamStatus
) => {
const isBuffered = event.type !== 'stream:chunk' && event.type !== 'stream:done'
if (isBuffered) {
localEventSeq++
event.eventId = localEventSeq
const entry = terminalStatus
? await eventWriter.writeTerminal(event, terminalStatus)
: await eventWriter.write(event)
event.eventId = entry.eventId
terminalEventPublished ||= Boolean(terminalStatus)
}
if (!isStreamClosed) {
try {
Expand All @@ -895,15 +912,12 @@ async function handleExecutePost(
isStreamClosed = true
}
}
if (isBuffered) {
eventWriter.write(event).catch(() => {})
}
}

try {
const startTime = new Date()

sendEvent({
await sendEvent({
type: 'execution:started',
timestamp: startTime.toISOString(),
executionId,
Expand All @@ -922,7 +936,7 @@ async function handleExecutePost(
childWorkflowContext?: ChildWorkflowContext
) => {
reqLogger.info('onBlockStart called', { blockId, blockName, blockType })
sendEvent({
await sendEvent({
type: 'block:started',
timestamp: new Date().toISOString(),
executionId,
Expand Down Expand Up @@ -976,7 +990,7 @@ async function handleExecutePost(
blockType,
error: callbackData.output.error,
})
sendEvent({
await sendEvent({
type: 'block:error',
timestamp: new Date().toISOString(),
executionId,
Expand Down Expand Up @@ -1010,7 +1024,7 @@ async function handleExecutePost(
blockName,
blockType,
})
sendEvent({
await sendEvent({
type: 'block:completed',
timestamp: new Date().toISOString(),
executionId,
Expand Down Expand Up @@ -1053,7 +1067,7 @@ async function handleExecutePost(
if (done) break

const chunk = decoder.decode(value, { stream: true })
sendEvent({
await sendEvent({
type: 'stream:chunk',
timestamp: new Date().toISOString(),
executionId,
Expand All @@ -1062,7 +1076,7 @@ async function handleExecutePost(
})
}

sendEvent({
await sendEvent({
type: 'stream:done',
timestamp: new Date().toISOString(),
executionId,
Expand Down Expand Up @@ -1107,13 +1121,14 @@ async function handleExecutePost(
selectedOutputs
)

const onChildWorkflowInstanceReady = (
const onChildWorkflowInstanceReady = async (
blockId: string,
childWorkflowInstanceId: string,
iterationContext?: IterationContext,
executionOrder?: number
executionOrder?: number,
childWorkflowContext?: ChildWorkflowContext
) => {
sendEvent({
await sendEvent({
type: 'block:childWorkflowStarted',
timestamp: new Date().toISOString(),
executionId,
Expand All @@ -1123,7 +1138,16 @@ async function handleExecutePost(
childWorkflowInstanceId,
...(iterationContext && {
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
iterationType: iterationContext.iterationType,
iterationContainerId: iterationContext.iterationContainerId,
...(iterationContext.parentIterations?.length && {
parentIterations: iterationContext.parentIterations,
}),
}),
...(childWorkflowContext && {
childWorkflowBlockId: childWorkflowContext.parentBlockId,
childWorkflowName: childWorkflowContext.workflowName,
}),
...(executionOrder !== undefined && { executionOrder }),
},
Expand Down Expand Up @@ -1157,32 +1181,38 @@ async function handleExecutePost(

await loggingSession.markAsFailed(timeoutErrorMessage)

sendEvent({
type: 'execution:error',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
error: timeoutErrorMessage,
duration: result.metadata?.duration || 0,
finalBlockLogs: result.logs,
},
})
finalMetaStatus = 'error'
await sendEvent(
{
type: 'execution:error',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
error: timeoutErrorMessage,
duration: result.metadata?.duration || 0,
finalBlockLogs: result.logs,
},
},
'error'
)
} else {
reqLogger.info('Workflow execution was cancelled')

sendEvent({
type: 'execution:cancelled',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
duration: result.metadata?.duration || 0,
finalBlockLogs: result.logs,
},
})
finalMetaStatus = 'cancelled'
await sendEvent(
{
type: 'execution:cancelled',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
duration: result.metadata?.duration || 0,
finalBlockLogs: result.logs,
},
},
'cancelled'
)
}
return
}
Expand All @@ -1196,35 +1226,43 @@ async function handleExecutePost(
: result.output

if (result.status === 'paused') {
sendEvent({
type: 'execution:paused',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
output: sseOutput,
duration: result.metadata?.duration || 0,
startTime: result.metadata?.startTime || startTime.toISOString(),
endTime: result.metadata?.endTime || new Date().toISOString(),
finalMetaStatus = 'complete'
await sendEvent(
{
type: 'execution:paused',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
output: sseOutput,
duration: result.metadata?.duration || 0,
startTime: result.metadata?.startTime || startTime.toISOString(),
endTime: result.metadata?.endTime || new Date().toISOString(),
finalBlockLogs: result.logs,
},
},
})
'complete'
)
} else {
sendEvent({
type: 'execution:completed',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
success: result.success,
output: sseOutput,
duration: result.metadata?.duration || 0,
startTime: result.metadata?.startTime || startTime.toISOString(),
endTime: result.metadata?.endTime || new Date().toISOString(),
finalBlockLogs: result.logs,
finalMetaStatus = 'complete'
await sendEvent(
{
type: 'execution:completed',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
success: result.success,
output: sseOutput,
duration: result.metadata?.duration || 0,
startTime: result.metadata?.startTime || startTime.toISOString(),
endTime: result.metadata?.endTime || new Date().toISOString(),
finalBlockLogs: result.logs,
},
},
})
'complete'
)
}
finalMetaStatus = 'complete'
} catch (error: unknown) {
const isTimeout = isTimeoutError(error) || timeoutController.isTimedOut()
const errorMessage = isTimeout
Expand All @@ -1237,32 +1275,55 @@ async function handleExecutePost(

const executionResult = hasExecutionResult(error) ? error.executionResult : undefined

sendEvent({
type: 'execution:error',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
error: executionResult?.error || errorMessage,
duration: executionResult?.metadata?.duration || 0,
finalBlockLogs: executionResult?.logs,
},
})
finalMetaStatus = 'error'
await sendEvent(
{
type: 'execution:error',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
error: executionResult?.error || errorMessage,
duration: executionResult?.metadata?.duration || 0,
finalBlockLogs: executionResult?.logs,
},
},
'error'
)
} finally {
if (isManualAbortRegistered) {
unregisterManualExecutionAborter(executionId)
isManualAbortRegistered = false
}
try {
await eventWriter.close()
} catch (closeError) {
reqLogger.warn('Failed to close event writer', {
error: toError(closeError).message,
if (finalMetaStatus && !terminalEventPublished) {
const replayBufferFlushed = await flushExecutionStreamReplayBuffer(
executionId,
eventWriter
)
reqLogger.error('Failed to publish terminal execution event durably', {
executionId,
status: finalMetaStatus,
replayBufferFlushed,
})
}
if (finalMetaStatus) {
setExecutionMeta(executionId, { status: finalMetaStatus }).catch(() => {})
if (!isStreamClosed) {
controller.error(new Error('Run buffer terminal event publish failed'))
isStreamClosed = true
}
} else if (terminalEventPublished) {
await eventWriter.close().catch((closeError) => {
reqLogger.warn('Failed to close execution event writer after terminal publish', {
executionId,
error: closeError instanceof Error ? closeError.message : String(closeError),
})
})
} else {
try {
await eventWriter.close()
} catch (closeError) {
reqLogger.warn('Failed to close event writer', {
error: toError(closeError).message,
})
}
}
timeoutController.cleanup()
if (executionId) {
Expand Down
Loading
Loading