From d7a6339068db59a782182127582433103d3de080 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Sat, 2 May 2026 18:29:21 -0700 Subject: [PATCH 1/2] Add queueing for hosted keys --- .../hosted-key-rate-limiter.test.ts | 79 +++++- .../hosted-key/hosted-key-rate-limiter.ts | 247 +++++++++++++++--- apps/sim/lib/core/telemetry.ts | 41 +++ apps/sim/tools/index.ts | 84 +++++- 4 files changed, 404 insertions(+), 47 deletions(-) diff --git a/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.test.ts b/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.test.ts index 50cf346222d..ace2e9569f4 100644 --- a/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.test.ts +++ b/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.test.ts @@ -7,6 +7,10 @@ import type { import { HostedKeyRateLimiter } from './hosted-key-rate-limiter' import type { CustomRateLimit, PerRequestRateLimit } from './types' +/** Force the queue wait to give up on the first iteration by reporting a retry time + * larger than the 5-minute MAX_QUEUE_WAIT_MS cap. */ +const RETRY_PAST_CAP_MS = 6 * 60 * 1000 + interface MockAdapter { consumeTokens: Mock getTokenStatus: Mock @@ -72,11 +76,12 @@ describe('HostedKeyRateLimiter', () => { expect(result.error).toContain('No hosted keys configured') }) - it('should rate limit billing actor when they exceed their limit', async () => { + it('should rate limit billing actor when wait exceeds the queue cap', async () => { + // resetAt past the 5-minute cap forces the wait loop to bail immediately. const rateLimitedResult: ConsumeResult = { allowed: false, tokensRemaining: 0, - resetAt: new Date(Date.now() + 30000), + resetAt: new Date(Date.now() + RETRY_PAST_CAP_MS), } mockAdapter.consumeTokens.mockResolvedValue(rateLimitedResult) @@ -93,6 +98,33 @@ describe('HostedKeyRateLimiter', () => { expect(result.error).toContain('Rate limit exceeded') }) + it('should wait for capacity then succeed when bucket refills within the cap', async () => { + // First call: bucket empty, refills in 100ms (well under cap). + // Second call: bucket has capacity, consumed. 
+ const blocked: ConsumeResult = { + allowed: false, + tokensRemaining: 0, + resetAt: new Date(Date.now() + 100), + } + const allowed: ConsumeResult = { + allowed: true, + tokensRemaining: 9, + resetAt: new Date(Date.now() + 60000), + } + mockAdapter.consumeTokens.mockResolvedValueOnce(blocked).mockResolvedValueOnce(allowed) + + const result = await rateLimiter.acquireKey( + testProvider, + envKeyPrefix, + perRequestRateLimit, + 'workspace-wait' + ) + + expect(result.success).toBe(true) + expect(result.key).toBe('test-key-1') + expect(mockAdapter.consumeTokens).toHaveBeenCalledTimes(2) + }) + it('should allow billing actor within their rate limit', async () => { const allowedResult: ConsumeResult = { allowed: true, @@ -197,11 +229,11 @@ describe('HostedKeyRateLimiter', () => { ], } - it('should enforce requestsPerMinute for custom mode', async () => { + it('should enforce requestsPerMinute for custom mode when wait exceeds the cap', async () => { const rateLimitedResult: ConsumeResult = { allowed: false, tokensRemaining: 0, - resetAt: new Date(Date.now() + 30000), + resetAt: new Date(Date.now() + RETRY_PAST_CAP_MS), } mockAdapter.consumeTokens.mockResolvedValue(rateLimitedResult) @@ -246,7 +278,7 @@ describe('HostedKeyRateLimiter', () => { expect(mockAdapter.getTokenStatus).toHaveBeenCalledTimes(1) }) - it('should block request when a dimension is depleted', async () => { + it('should block request when a dimension wait exceeds the cap', async () => { const allowedConsume: ConsumeResult = { allowed: true, tokensRemaining: 4, @@ -258,7 +290,7 @@ describe('HostedKeyRateLimiter', () => { tokensAvailable: 0, maxTokens: 2000, lastRefillAt: new Date(), - nextRefillAt: new Date(Date.now() + 45000), + nextRefillAt: new Date(Date.now() + RETRY_PAST_CAP_MS), } mockAdapter.getTokenStatus.mockResolvedValue(depleted) @@ -274,6 +306,39 @@ describe('HostedKeyRateLimiter', () => { expect(result.error).toContain('tokens') }) + it('should wait for dimension capacity then succeed when budget refills', async () => { + const allowedConsume: ConsumeResult = { + allowed: true, + tokensRemaining: 4, + resetAt: new Date(Date.now() + 60000), + } + mockAdapter.consumeTokens.mockResolvedValue(allowedConsume) + + const depleted: TokenStatus = { + tokensAvailable: 0, + maxTokens: 2000, + lastRefillAt: new Date(), + nextRefillAt: new Date(Date.now() + 100), + } + const refilled: TokenStatus = { + tokensAvailable: 500, + maxTokens: 2000, + lastRefillAt: new Date(), + nextRefillAt: new Date(Date.now() + 60000), + } + mockAdapter.getTokenStatus.mockResolvedValueOnce(depleted).mockResolvedValueOnce(refilled) + + const result = await rateLimiter.acquireKey( + testProvider, + envKeyPrefix, + customRateLimit, + 'workspace-dim-wait' + ) + + expect(result.success).toBe(true) + expect(mockAdapter.getTokenStatus).toHaveBeenCalledTimes(2) + }) + it('should pre-check all dimensions and block on first depleted one', async () => { const multiDimensionConfig: CustomRateLimit = { mode: 'custom', @@ -309,7 +374,7 @@ describe('HostedKeyRateLimiter', () => { tokensAvailable: 0, maxTokens: 100, lastRefillAt: new Date(), - nextRefillAt: new Date(Date.now() + 30000), + nextRefillAt: new Date(Date.now() + RETRY_PAST_CAP_MS), } mockAdapter.getTokenStatus .mockResolvedValueOnce(tokensBudget) diff --git a/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.ts b/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.ts index a20cf8413f3..d07efd055f8 100644 --- 
a/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.ts +++ b/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.ts @@ -1,9 +1,13 @@ import { createLogger } from '@sim/logger' +import { sleep } from '@sim/utils/helpers' +import { generateShortId } from '@sim/utils/id' +import { acquireLock, releaseLock } from '@/lib/core/config/redis' import { createStorageAdapter, type RateLimitStorageAdapter, type TokenBucketConfig, } from '@/lib/core/rate-limiter/storage' +import { PlatformEvents } from '@/lib/core/telemetry' import { type AcquireKeyResult, type CustomRateLimit, @@ -16,6 +20,24 @@ import { const logger = createLogger('HostedKeyRateLimiter') +/** + * Maximum time a hosted-key acquisition will wait for the per-workspace bucket + * to refill before falling back to a 429. Sized comfortably under the 90-min + * Trigger.dev container ceiling so a queued call still has time to actually + * execute after acquisition. + */ +const MAX_QUEUE_WAIT_MS = 5 * 60 * 1000 + +/** + * Floor on per-iteration sleep when the bucket reports `retryAfterMs <= 0`, + * which can happen due to clock skew or sub-millisecond resets. Prevents a + * tight retry loop hammering the storage adapter. + */ +const MIN_QUEUE_RETRY_DELAY_MS = 50 + +/** TTL slack on the FIFO lock — a crashed worker can't permanently block its workspace. */ +const QUEUE_LOCK_TTL_SECONDS = Math.ceil(MAX_QUEUE_WAIT_MS / 1000) + 30 + /** * Resolves env var names for a numbered key prefix using a `{PREFIX}_COUNT` env var. * E.g. with `EXA_API_KEY_COUNT=5`, returns `['EXA_API_KEY_1', ..., 'EXA_API_KEY_5']`. @@ -179,11 +201,16 @@ export class HostedKeyRateLimiter { * Acquire an available key via round-robin selection. * * For both modes: - * 1. Per-billing-actor request rate limiting (enforced): blocks actors who exceed their request limit + * 1. Per-billing-actor request rate limiting (enforced): when the actor is over their + * limit, the call blocks (waits for refill) up to `MAX_QUEUE_WAIT_MS`. A Redis + * FIFO lock keyed on `{provider, billingActorId}` keeps callers in the same + * workspace serialized so the bucket drains predictably. * 2. Round-robin key selection: cycles through available keys for even distribution * * For `custom` mode additionally: - * 3. Pre-checks dimension budgets: blocks if any dimension is already depleted + * 3. Pre-checks dimension budgets: same wait-for-refill behavior if a dimension is depleted + * + * If the wait exceeds the cap, the call falls back to today's 429 result. * * @param envKeyPrefix - Env var prefix (e.g. 'EXA_API_KEY'). Keys resolved via `{prefix}_COUNT`. * @param billingActorId - The billing actor (typically workspace ID) to rate limit against @@ -194,56 +221,198 @@ export class HostedKeyRateLimiter { config: HostedKeyRateLimitConfig, billingActorId: string ): Promise { - if (config.requestsPerMinute) { - const rateLimitResult = await this.checkActorRateLimit(provider, billingActorId, config) - if (rateLimitResult) { - return { - success: false, - billingActorRateLimited: true, - retryAfterMs: rateLimitResult.retryAfterMs, - error: `Rate limit exceeded. Please wait ${Math.ceil(rateLimitResult.retryAfterMs / 1000)} seconds. 
If you're getting throttled frequently, consider adding your own API key under Settings > BYOK to avoid shared rate limits.`, + const lockKey = `hosted-queue:${provider}:${billingActorId}` + const lockValue = generateShortId() + const lockHeld = await this.acquireFifoLock(lockKey, lockValue) + + try { + if (config.requestsPerMinute) { + const rateLimitResult = await this.waitForActorCapacity(provider, billingActorId, config) + if (rateLimitResult.rateLimited) { + return { + success: false, + billingActorRateLimited: true, + retryAfterMs: rateLimitResult.retryAfterMs, + error: `Rate limit exceeded. Please wait ${Math.ceil(rateLimitResult.retryAfterMs / 1000)} seconds. If you're getting throttled frequently, consider adding your own API key under Settings > BYOK to avoid shared rate limits.`, + } } } - } - if (config.mode === 'custom' && config.dimensions.length > 0) { - const dimensionResult = await this.preCheckDimensions(provider, billingActorId, config) - if (dimensionResult) { + if (config.mode === 'custom' && config.dimensions.length > 0) { + const dimensionResult = await this.waitForDimensionCapacity( + provider, + billingActorId, + config + ) + if (dimensionResult.rateLimited) { + return { + success: false, + billingActorRateLimited: true, + retryAfterMs: dimensionResult.retryAfterMs, + error: `Rate limit exceeded for ${dimensionResult.dimension}. Please wait ${Math.ceil(dimensionResult.retryAfterMs / 1000)} seconds. If you're getting throttled frequently, consider adding your own API key under Settings > BYOK to avoid shared rate limits.`, + } + } + } + + const envKeys = resolveEnvKeys(envKeyPrefix) + const availableKeys = this.getAvailableKeys(envKeys) + + if (availableKeys.length === 0) { + logger.warn(`No hosted keys configured for provider ${provider}`) return { success: false, - billingActorRateLimited: true, - retryAfterMs: dimensionResult.retryAfterMs, - error: `Rate limit exceeded for ${dimensionResult.dimension}. Please wait ${Math.ceil(dimensionResult.retryAfterMs / 1000)} seconds. If you're getting throttled frequently, consider adding your own API key under Settings > BYOK to avoid shared rate limits.`, + error: `No hosted keys configured for ${provider}`, } } - } - const envKeys = resolveEnvKeys(envKeyPrefix) - const availableKeys = this.getAvailableKeys(envKeys) + const counter = this.roundRobinCounters.get(provider) ?? 0 + const selected = availableKeys[counter % availableKeys.length] + this.roundRobinCounters.set(provider, counter + 1) + + logger.debug(`Selected hosted key for ${provider}`, { + provider, + keyIndex: selected.keyIndex, + envVarName: selected.envVarName, + }) - if (availableKeys.length === 0) { - logger.warn(`No hosted keys configured for provider ${provider}`) return { - success: false, - error: `No hosted keys configured for ${provider}`, + success: true, + key: selected.key, + keyIndex: selected.keyIndex, + envVarName: selected.envVarName, + } + } finally { + if (lockHeld) { + await this.releaseFifoLock(lockKey, lockValue) } } + } + + /** + * Acquire the per-workspace+provider FIFO lock that serializes queue waits. + * Returns true if the lock was held by this caller (or Redis is unavailable, in which + * case the lock is a no-op and we proceed without fairness). Returns false if the lock + * is already held by another caller and we should still proceed without waiting on it + * (correctness is preserved by the token bucket; we just lose fairness). 
+ */ + private async acquireFifoLock(lockKey: string, lockValue: string): Promise { + try { + return await acquireLock(lockKey, lockValue, QUEUE_LOCK_TTL_SECONDS) + } catch (error) { + logger.warn(`Failed to acquire hosted-queue FIFO lock ${lockKey}`, { error }) + return false + } + } + + /** + * Release the per-workspace+provider FIFO lock. Best-effort; logs but does not throw. + */ + private async releaseFifoLock(lockKey: string, lockValue: string): Promise { + try { + await releaseLock(lockKey, lockValue) + } catch (error) { + logger.warn(`Failed to release hosted-queue FIFO lock ${lockKey}`, { error }) + } + } + + /** + * Wait for actor request-rate capacity. Re-checks the bucket after each refill window + * up to `MAX_QUEUE_WAIT_MS`. Returns `{ rateLimited: false }` once a token has been + * consumed (the underlying check is consume-on-success, matching the original behavior). + */ + private async waitForActorCapacity( + provider: string, + billingActorId: string, + config: HostedKeyRateLimitConfig + ): Promise<{ rateLimited: false } | { rateLimited: true; retryAfterMs: number }> { + const startedAt = Date.now() + let attempts = 0 + + while (true) { + const result = await this.checkActorRateLimit(provider, billingActorId, config) + attempts++ + + if (!result) { + if (attempts > 1) { + PlatformEvents.hostedKeyQueueWaited({ + provider, + workspaceId: billingActorId, + waitedMs: Date.now() - startedAt, + attempts, + reason: 'actor_requests', + }) + } + return { rateLimited: false } + } + + const elapsed = Date.now() - startedAt + const remaining = MAX_QUEUE_WAIT_MS - elapsed + if (remaining <= 0 || result.retryAfterMs > remaining) { + PlatformEvents.hostedKeyQueueWaitExceeded({ + provider, + workspaceId: billingActorId, + waitedMs: elapsed, + reason: 'actor_requests', + }) + return { rateLimited: true, retryAfterMs: result.retryAfterMs } + } + + const sleepMs = Math.max(MIN_QUEUE_RETRY_DELAY_MS, result.retryAfterMs) + await sleep(sleepMs) + } + } + + /** + * Wait for custom-mode dimension capacity. `preCheckDimensions` is read-only — it does + * not consume — so re-running it after a sleep is safe and does not double-charge. + * Post-execution `reportUsage` performs the actual consumption. + */ + private async waitForDimensionCapacity( + provider: string, + billingActorId: string, + config: CustomRateLimit + ): Promise< + { rateLimited: false } | { rateLimited: true; retryAfterMs: number; dimension: string } + > { + const startedAt = Date.now() + let attempts = 0 + + while (true) { + const result = await this.preCheckDimensions(provider, billingActorId, config) + attempts++ + + if (!result) { + if (attempts > 1) { + PlatformEvents.hostedKeyQueueWaited({ + provider, + workspaceId: billingActorId, + waitedMs: Date.now() - startedAt, + attempts, + reason: 'dimension', + }) + } + return { rateLimited: false } + } + + const elapsed = Date.now() - startedAt + const remaining = MAX_QUEUE_WAIT_MS - elapsed + if (remaining <= 0 || result.retryAfterMs > remaining) { + PlatformEvents.hostedKeyQueueWaitExceeded({ + provider, + workspaceId: billingActorId, + waitedMs: elapsed, + reason: 'dimension', + dimension: result.dimension, + }) + return { + rateLimited: true, + retryAfterMs: result.retryAfterMs, + dimension: result.dimension, + } + } - const counter = this.roundRobinCounters.get(provider) ?? 
0 - const selected = availableKeys[counter % availableKeys.length] - this.roundRobinCounters.set(provider, counter + 1) - - logger.debug(`Selected hosted key for ${provider}`, { - provider, - keyIndex: selected.keyIndex, - envVarName: selected.envVarName, - }) - - return { - success: true, - key: selected.key, - keyIndex: selected.keyIndex, - envVarName: selected.envVarName, + const sleepMs = Math.max(MIN_QUEUE_RETRY_DELAY_MS, result.retryAfterMs) + await sleep(sleepMs) } } diff --git a/apps/sim/lib/core/telemetry.ts b/apps/sim/lib/core/telemetry.ts index 34af72809f1..ce97a94e8a4 100644 --- a/apps/sim/lib/core/telemetry.ts +++ b/apps/sim/lib/core/telemetry.ts @@ -1002,6 +1002,47 @@ export const PlatformEvents = { }) }, + /** + * Track a successful hosted-key acquisition that had to wait for capacity. + * Fires after the actor or dimension bucket refilled enough for the call to proceed. + */ + hostedKeyQueueWaited: (attrs: { + provider: string + workspaceId: string + waitedMs: number + attempts: number + reason: 'actor_requests' | 'dimension' + dimension?: string + }) => { + trackPlatformEvent('platform.hosted_key.queue_waited', { + 'provider.id': attrs.provider, + 'workspace.id': attrs.workspaceId, + 'queue.waited_ms': attrs.waitedMs, + 'queue.attempts': attrs.attempts, + 'queue.reason': attrs.reason, + ...(attrs.dimension && { 'queue.dimension': attrs.dimension }), + }) + }, + + /** + * Track a hosted-key acquisition that exceeded the queue wait cap and fell back to a 429. + */ + hostedKeyQueueWaitExceeded: (attrs: { + provider: string + workspaceId: string + waitedMs: number + reason: 'actor_requests' | 'dimension' + dimension?: string + }) => { + trackPlatformEvent('platform.hosted_key.queue_wait_exceeded', { + 'provider.id': attrs.provider, + 'workspace.id': attrs.workspaceId, + 'queue.waited_ms': attrs.waitedMs, + 'queue.reason': attrs.reason, + ...(attrs.dimension && { 'queue.dimension': attrs.dimension }), + }) + }, + /** * Track chat deployed (workflow deployed as chat interface) */ diff --git a/apps/sim/tools/index.ts b/apps/sim/tools/index.ts index 3bf69a56101..a90ab0cee24 100644 --- a/apps/sim/tools/index.ts +++ b/apps/sim/tools/index.ts @@ -302,6 +302,48 @@ async function injectHostedKeyIfNeeded( } } +/** + * Re-acquire a hosted key after upstream-429 retries have been exhausted. Calls + * `acquireKey` (which now blocks on the per-workspace bucket) and re-injects the + * fresh key into `params`. Returns false if no key could be obtained — caller + * should re-throw the original upstream 429. + * + * Does not consult BYOK. We only enter this path from inside the hosted-key + * branch of `executeTool`, so BYOK has already been ruled out for this call. + */ +async function reacquireHostedKey( + tool: ToolConfig, + params: Record, + executionContext: ExecutionContext | undefined, + requestId: string +): Promise { + if (!tool.hosting) return false + const { envKeyPrefix, apiKeyParam, byokProviderId, rateLimit } = tool.hosting + const { workspaceId } = resolveToolScope(params, executionContext) + if (!workspaceId) return false + + const provider = byokProviderId || tool.id + const acquireResult = await getHostedKeyRateLimiter().acquireKey( + provider, + envKeyPrefix, + rateLimit, + workspaceId + ) + + if (!acquireResult.success || !acquireResult.key) { + logger.warn( + `[${requestId}] Re-acquire of hosted key for ${tool.id} failed: ${acquireResult.error ?? 
'unknown'}` + ) + return false + } + + params[apiKeyParam] = acquireResult.key + logger.info( + `[${requestId}] Re-acquired hosted key for ${tool.id} (${acquireResult.envVarName}) after upstream throttling` + ) + return true +} + /** * Check if an error is a rate limit (throttling) or quota exhaustion error. * Some providers (e.g. Perplexity) return 401/403 with "insufficient_quota" @@ -328,11 +370,23 @@ interface RetryContext { toolId: string envVarName: string executionContext?: ExecutionContext + /** + * Optional callback invoked after the local exponential backoff has been exhausted by + * upstream 429s. Should re-enter the per-workspace hosted-key queue (which now blocks + * on the bucket) and return a fresh execution thunk bound to the newly acquired key. + * If the callback returns null, we give up and re-throw the last error. + */ + reacquireAfterRetriesExhausted?: () => Promise<(() => Promise) | null> } /** * Execute a function with exponential backoff retry for rate limiting errors. * Only used for hosted key requests. Tracks rate limit events via telemetry. + * + * On terminal upstream 429, optionally re-enters the hosted-key queue (which waits for + * the per-workspace bucket to refill) and retries once with a freshly acquired key. + * This handles the case where the upstream provider's limit is tighter than ours — we + * re-queue the call instead of surfacing the error. */ async function executeWithRetry( fn: () => Promise, @@ -340,7 +394,8 @@ async function executeWithRetry( maxRetries = 3, baseDelayMs = 1000 ): Promise { - const { requestId, toolId, envVarName, executionContext } = context + const { requestId, toolId, envVarName, executionContext, reacquireAfterRetriesExhausted } = + context let lastError: unknown for (let attempt = 0; attempt <= maxRetries; attempt++) { @@ -351,6 +406,23 @@ async function executeWithRetry( if (!isRateLimitError(error) || attempt === maxRetries) { if (isRateLimitError(error) && attempt === maxRetries) { + if (reacquireAfterRetriesExhausted) { + try { + const requeued = await reacquireAfterRetriesExhausted() + if (requeued) { + logger.warn( + `[${requestId}] Upstream retries exhausted for ${toolId} (${envVarName}); re-queued and retrying once with fresh key` + ) + return (await requeued()) as T + } + } catch (requeueError) { + logger.error( + `[${requestId}] Re-queue after exhausted upstream retries failed for ${toolId}`, + { error: toError(requeueError).message } + ) + } + } + PlatformEvents.hostedKeyUserThrottled({ toolId, reason: 'upstream_retries_exhausted', @@ -984,6 +1056,16 @@ export async function executeTool( toolId, envVarName: hostedKeyInfo.envVarName!, executionContext, + reacquireAfterRetriesExhausted: async () => { + const reacquired = await reacquireHostedKey( + tool, + contextParams, + executionContext, + requestId + ) + if (!reacquired) return null + return () => executeToolRequest(toolId, tool, contextParams) + }, }) : await executeToolRequest(toolId, tool, contextParams) From 0b80ed34732d27c974ac943a0c41354ebb5708e7 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Mon, 4 May 2026 17:30:16 -0700 Subject: [PATCH 2/2] feat(rate-limiter): FIFO queue for hosted-key per-workspace fairness Replace the per-call distributed lock with a Redis-backed FIFO queue so callers within a workspace get strict ordering instead of racing the bucket. Adds heartbeat-based crash recovery and dead-head reaping in a single Lua script. Bumps Exa search hosted RPM from 5 to 60. 
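The acquisition lifecycle, roughly (a simplified sketch, not the verbatim
implementation: the real loop in hosted-key-rate-limiter.ts also caps the
total wait at MAX_QUEUE_WAIT_MS, refreshes the heartbeat on a 10s cadence
rather than on every poll, and treats a 'missing' ticket as head):

    const ticketId = generateShortId()
    // Claim a FIFO slot; degrades to a no-op (position 0) when Redis is down.
    await queue.enqueue(provider, workspaceId, ticketId)
    try {
      // Poll until our ticket reaches the head; a dead head is reaped
      // atomically inside the checkHead Lua script.
      while ((await queue.checkHead(provider, workspaceId, ticketId)) === 'waiting') {
        await queue.refreshHeartbeat(provider, workspaceId, ticketId)
        await sleep(QUEUE_HEAD_POLL_MS)
      }
      // At the head: wait on the token bucket, then round-robin a key.
    } finally {
      // Always release our slot so the next caller can advance.
      await queue.dequeue(provider, workspaceId, ticketId)
    }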
--- .../hosted-key-rate-limiter.test.ts | 156 +++++++++++- .../hosted-key/hosted-key-rate-limiter.ts | 199 +++++++++------ .../rate-limiter/hosted-key/queue.test.ts | 226 ++++++++++++++++++ .../lib/core/rate-limiter/hosted-key/queue.ts | 203 ++++++++++++++++ apps/sim/lib/core/telemetry.ts | 11 +- apps/sim/tools/exa/search.ts | 2 +- 6 files changed, 715 insertions(+), 82 deletions(-) create mode 100644 apps/sim/lib/core/rate-limiter/hosted-key/queue.test.ts create mode 100644 apps/sim/lib/core/rate-limiter/hosted-key/queue.ts diff --git a/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.test.ts b/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.test.ts index ace2e9569f4..f6783254566 100644 --- a/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.test.ts +++ b/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.test.ts @@ -5,6 +5,7 @@ import type { TokenStatus, } from '@/lib/core/rate-limiter/storage' import { HostedKeyRateLimiter } from './hosted-key-rate-limiter' +import type { HostedKeyQueue } from './queue' import type { CustomRateLimit, PerRequestRateLimit } from './types' /** Force the queue wait to give up on the first iteration by reporting a retry time @@ -23,10 +24,30 @@ const createMockAdapter = (): MockAdapter => ({ resetBucket: vi.fn(), }) +interface MockQueue { + enqueue: Mock + checkHead: Mock + refreshHeartbeat: Mock + dequeue: Mock +} + +/** Stub queue that defaults to "you're at the head, no waiting" — i.e. acts as if the + * queue is empty or Redis is unavailable. Tests override per-call to simulate ordering. */ +const createMockQueue = (): MockQueue => { + const queue: MockQueue = { + enqueue: vi.fn().mockResolvedValue({ position: 0, enabled: true }), + checkHead: vi.fn().mockResolvedValue('head'), + refreshHeartbeat: vi.fn().mockResolvedValue(undefined), + dequeue: vi.fn().mockResolvedValue(undefined), + } + return queue +} + describe('HostedKeyRateLimiter', () => { const testProvider = 'exa' const envKeyPrefix = 'EXA_API_KEY' let mockAdapter: MockAdapter + let mockQueue: MockQueue let rateLimiter: HostedKeyRateLimiter let originalEnv: NodeJS.ProcessEnv @@ -38,7 +59,11 @@ describe('HostedKeyRateLimiter', () => { beforeEach(() => { vi.clearAllMocks() mockAdapter = createMockAdapter() - rateLimiter = new HostedKeyRateLimiter(mockAdapter as RateLimitStorageAdapter) + mockQueue = createMockQueue() + rateLimiter = new HostedKeyRateLimiter( + mockAdapter as RateLimitStorageAdapter, + mockQueue as unknown as HostedKeyQueue + ) originalEnv = { ...process.env } process.env.EXA_API_KEY_COUNT = '3' @@ -216,6 +241,135 @@ describe('HostedKeyRateLimiter', () => { }) }) + describe('FIFO queue ordering', () => { + const allowed: ConsumeResult = { + allowed: true, + tokensRemaining: 9, + resetAt: new Date(Date.now() + 60000), + } + + it('enqueues every call onto the per-workspace+provider queue', async () => { + mockAdapter.consumeTokens.mockResolvedValue(allowed) + + await rateLimiter.acquireKey(testProvider, envKeyPrefix, perRequestRateLimit, 'workspace-1') + + expect(mockQueue.enqueue).toHaveBeenCalledWith( + testProvider, + 'workspace-1', + expect.any(String) + ) + }) + + it('always dequeues at the end of a successful acquisition', async () => { + mockAdapter.consumeTokens.mockResolvedValue(allowed) + + await rateLimiter.acquireKey(testProvider, envKeyPrefix, perRequestRateLimit, 'workspace-1') + + expect(mockQueue.dequeue).toHaveBeenCalledWith( + testProvider, + 'workspace-1', + expect.any(String) + ) + }) + + it('always 
dequeues even when the call fails (no keys configured)', async () => { + mockAdapter.consumeTokens.mockResolvedValue(allowed) + process.env.EXA_API_KEY_COUNT = '0' + + await rateLimiter.acquireKey(testProvider, envKeyPrefix, perRequestRateLimit, 'workspace-1') + + expect(mockQueue.dequeue).toHaveBeenCalled() + }) + + it('waits at the head of the queue before consuming from the bucket', async () => { + mockAdapter.consumeTokens.mockResolvedValue(allowed) + // First two checkHead calls say we're waiting; third says we're up. + mockQueue.checkHead + .mockResolvedValueOnce('waiting') + .mockResolvedValueOnce('waiting') + .mockResolvedValueOnce('head') + + const result = await rateLimiter.acquireKey( + testProvider, + envKeyPrefix, + perRequestRateLimit, + 'workspace-1' + ) + + expect(result.success).toBe(true) + expect(mockQueue.checkHead).toHaveBeenCalledTimes(3) + // Bucket is only consumed once we reach the head. + expect(mockAdapter.consumeTokens).toHaveBeenCalledTimes(1) + }) + + it('refreshes the heartbeat while waiting at the head of the queue', async () => { + mockAdapter.consumeTokens.mockResolvedValue(allowed) + + // We need the wait loop to iterate long enough for HEARTBEAT_REFRESH_INTERVAL_MS + // to elapse. Use fake timers so we don't actually sleep. + vi.useFakeTimers() + try { + // Queue says we're waiting forever — except after some time we're at head. + mockQueue.checkHead.mockImplementation(async () => { + // Advance past the heartbeat interval each time we poll, then say we're up. + vi.advanceTimersByTime(15_000) + return mockQueue.checkHead.mock.calls.length >= 2 ? 'head' : 'waiting' + }) + + const promise = rateLimiter.acquireKey( + testProvider, + envKeyPrefix, + perRequestRateLimit, + 'workspace-1' + ) + // Drain pending timers so the sleep() resolves. + await vi.runAllTimersAsync() + await promise + + expect(mockQueue.refreshHeartbeat).toHaveBeenCalled() + } finally { + vi.useRealTimers() + } + }) + + it('returns 429 when the queue wait exceeds the cap', async () => { + mockAdapter.consumeTokens.mockResolvedValue(allowed) + mockQueue.checkHead.mockResolvedValue('waiting') + + vi.useFakeTimers() + try { + const promise = rateLimiter.acquireKey( + testProvider, + envKeyPrefix, + perRequestRateLimit, + 'workspace-1' + ) + // Burn past the 5-minute cap. 
+ await vi.advanceTimersByTimeAsync(6 * 60 * 1000) + const result = await promise + + expect(result.success).toBe(false) + expect(result.billingActorRateLimited).toBe(true) + } finally { + vi.useRealTimers() + } + }) + + it('treats "missing" status as proceed (queue evicted, fall through to bucket race)', async () => { + mockAdapter.consumeTokens.mockResolvedValue(allowed) + mockQueue.checkHead.mockResolvedValueOnce('missing') + + const result = await rateLimiter.acquireKey( + testProvider, + envKeyPrefix, + perRequestRateLimit, + 'workspace-1' + ) + + expect(result.success).toBe(true) + }) + }) + describe('acquireKey with custom rate limit', () => { const customRateLimit: CustomRateLimit = { mode: 'custom', diff --git a/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.ts b/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.ts index d07efd055f8..55b21aeaf79 100644 --- a/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.ts +++ b/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.ts @@ -1,13 +1,13 @@ import { createLogger } from '@sim/logger' import { sleep } from '@sim/utils/helpers' import { generateShortId } from '@sim/utils/id' -import { acquireLock, releaseLock } from '@/lib/core/config/redis' import { createStorageAdapter, type RateLimitStorageAdapter, type TokenBucketConfig, } from '@/lib/core/rate-limiter/storage' import { PlatformEvents } from '@/lib/core/telemetry' +import { getHostedKeyQueue, HEARTBEAT_REFRESH_INTERVAL_MS, type HostedKeyQueue } from './queue' import { type AcquireKeyResult, type CustomRateLimit, @@ -35,8 +35,13 @@ const MAX_QUEUE_WAIT_MS = 5 * 60 * 1000 */ const MIN_QUEUE_RETRY_DELAY_MS = 50 -/** TTL slack on the FIFO lock — a crashed worker can't permanently block its workspace. */ -const QUEUE_LOCK_TTL_SECONDS = Math.ceil(MAX_QUEUE_WAIT_MS / 1000) + 30 +/** + * Poll interval while waiting to reach the head of the FIFO queue. 200ms balances + * acquisition latency (worst-case wait for advancement is one poll period) against + * Redis load — at this cadence, N waiters generate N×5 EVAL/sec, which is fine for + * the typical low-tens contention. Revisit if telemetry shows hot Redis under load. + */ +const QUEUE_HEAD_POLL_MS = 200 /** * Resolves env var names for a numbered key prefix using a `{PREFIX}_COUNT` env var. @@ -74,11 +79,13 @@ interface AvailableKey { */ export class HostedKeyRateLimiter { private storage: RateLimitStorageAdapter + private queue: HostedKeyQueue /** Round-robin counter per provider for even key distribution */ private roundRobinCounters = new Map() - constructor(storage?: RateLimitStorageAdapter) { + constructor(storage?: RateLimitStorageAdapter, queue?: HostedKeyQueue) { this.storage = storage ?? createStorageAdapter() + this.queue = queue ?? getHostedKeyQueue() } private buildActorStorageKey(provider: string, billingActorId: string): string { @@ -201,16 +208,20 @@ export class HostedKeyRateLimiter { * Acquire an available key via round-robin selection. * * For both modes: - * 1. Per-billing-actor request rate limiting (enforced): when the actor is over their - * limit, the call blocks (waits for refill) up to `MAX_QUEUE_WAIT_MS`. A Redis - * FIFO lock keyed on `{provider, billingActorId}` keeps callers in the same - * workspace serialized so the bucket drains predictably. + * 1. Per-billing-actor request rate limiting (enforced): the call enqueues itself + * onto a per-workspace+provider FIFO queue. 
Only the head of the queue attempts + * to consume from the token bucket, guaranteeing strict ordering across callers + * within a workspace. Different workspaces have independent queues and don't + * block each other. * 2. Round-robin key selection: cycles through available keys for even distribution * * For `custom` mode additionally: - * 3. Pre-checks dimension budgets: same wait-for-refill behavior if a dimension is depleted + * 3. Pre-checks dimension budgets: head waits on dimension refill the same way it + * waits on actor request capacity. * - * If the wait exceeds the cap, the call falls back to today's 429 result. + * If the total wait (queue position + bucket refill) exceeds `MAX_QUEUE_WAIT_MS`, the + * call falls back to today's 429 result. The ticket is removed from the queue on exit + * regardless of success or failure. * * @param envKeyPrefix - Env var prefix (e.g. 'EXA_API_KEY'). Keys resolved via `{prefix}_COUNT`. * @param billingActorId - The billing actor (typically workspace ID) to rate limit against @@ -221,13 +232,36 @@ export class HostedKeyRateLimiter { config: HostedKeyRateLimitConfig, billingActorId: string ): Promise { - const lockKey = `hosted-queue:${provider}:${billingActorId}` - const lockValue = generateShortId() - const lockHeld = await this.acquireFifoLock(lockKey, lockValue) + const ticketId = generateShortId() + const startedAt = Date.now() + const enqueueResult = await this.queue.enqueue(provider, billingActorId, ticketId) try { + // Wait for our turn at the head of the queue (no-op when Redis unavailable). + const headStatus = await this.waitForQueueHead(provider, billingActorId, ticketId, startedAt) + if (headStatus.timedOut) { + PlatformEvents.hostedKeyQueueWaitExceeded({ + provider, + workspaceId: billingActorId, + waitedMs: Date.now() - startedAt, + reason: 'queue_position', + }) + return { + success: false, + billingActorRateLimited: true, + retryAfterMs: MAX_QUEUE_WAIT_MS, + error: `Rate limit exceeded — request waited too long in the queue. If you're getting throttled frequently, consider adding your own API key under Settings > BYOK to avoid shared rate limits.`, + } + } + if (config.requestsPerMinute) { - const rateLimitResult = await this.waitForActorCapacity(provider, billingActorId, config) + const rateLimitResult = await this.waitForActorCapacity( + provider, + billingActorId, + ticketId, + config, + startedAt + ) if (rateLimitResult.rateLimited) { return { success: false, @@ -242,7 +276,9 @@ export class HostedKeyRateLimiter { const dimensionResult = await this.waitForDimensionCapacity( provider, billingActorId, - config + ticketId, + config, + startedAt ) if (dimensionResult.rateLimited) { return { @@ -254,6 +290,18 @@ export class HostedKeyRateLimiter { } } + const totalWaitedMs = Date.now() - startedAt + if (enqueueResult.enabled && (enqueueResult.position > 0 || totalWaitedMs > 100)) { + PlatformEvents.hostedKeyQueueWaited({ + provider, + workspaceId: billingActorId, + waitedMs: totalWaitedMs, + attempts: 1, + reason: 'queue_position', + queuePosition: enqueueResult.position, + }) + } + const envKeys = resolveEnvKeys(envKeyPrefix) const availableKeys = this.getAvailableKeys(envKeys) @@ -282,68 +330,69 @@ export class HostedKeyRateLimiter { envVarName: selected.envVarName, } } finally { - if (lockHeld) { - await this.releaseFifoLock(lockKey, lockValue) - } + // Always remove our ticket so the next caller can advance, regardless of whether + // we succeeded, hit the cap, or threw. Best-effort; safe to call multiple times. 
+ await this.queue.dequeue(provider, billingActorId, ticketId) } } /** - * Acquire the per-workspace+provider FIFO lock that serializes queue waits. - * Returns true if the lock was held by this caller (or Redis is unavailable, in which - * case the lock is a no-op and we proceed without fairness). Returns false if the lock - * is already held by another caller and we should still proceed without waiting on it - * (correctness is preserved by the token bucket; we just lose fairness). + * Block until our ticket reaches the head of the queue. Refreshes the heartbeat on a + * regular cadence so we don't get reaped as dead. Returns `timedOut: true` if we exceed + * `MAX_QUEUE_WAIT_MS` before reaching the head. + * + * No-op when Redis is unavailable (queue.enqueue returns enabled=false and checkHead + * always returns 'head'). */ - private async acquireFifoLock(lockKey: string, lockValue: string): Promise { - try { - return await acquireLock(lockKey, lockValue, QUEUE_LOCK_TTL_SECONDS) - } catch (error) { - logger.warn(`Failed to acquire hosted-queue FIFO lock ${lockKey}`, { error }) - return false - } - } + private async waitForQueueHead( + provider: string, + billingActorId: string, + ticketId: string, + startedAt: number + ): Promise<{ timedOut: boolean }> { + let lastHeartbeatAt = Date.now() - /** - * Release the per-workspace+provider FIFO lock. Best-effort; logs but does not throw. - */ - private async releaseFifoLock(lockKey: string, lockValue: string): Promise { - try { - await releaseLock(lockKey, lockValue) - } catch (error) { - logger.warn(`Failed to release hosted-queue FIFO lock ${lockKey}`, { error }) + while (true) { + const status = await this.queue.checkHead(provider, billingActorId, ticketId) + if (status === 'head') return { timedOut: false } + + // 'missing' shouldn't normally happen — queue list TTL is 10min and our cap is 5min — + // but if it does (e.g. Redis flushed mid-wait), treat as "you're up" so the caller + // proceeds to the bucket race rather than hanging forever. + if (status === 'missing') return { timedOut: false } + + const elapsed = Date.now() - startedAt + if (elapsed >= MAX_QUEUE_WAIT_MS) { + return { timedOut: true } + } + + if (Date.now() - lastHeartbeatAt >= HEARTBEAT_REFRESH_INTERVAL_MS) { + await this.queue.refreshHeartbeat(provider, billingActorId, ticketId) + lastHeartbeatAt = Date.now() + } + + await sleep(QUEUE_HEAD_POLL_MS) } } /** - * Wait for actor request-rate capacity. Re-checks the bucket after each refill window - * up to `MAX_QUEUE_WAIT_MS`. Returns `{ rateLimited: false }` once a token has been - * consumed (the underlying check is consume-on-success, matching the original behavior). + * Wait for actor request-rate capacity. Called once we're at the head of the FIFO + * queue, so other callers can't race us for the next token — they're blocked behind us + * at queue level. Re-checks the bucket up to the remaining `MAX_QUEUE_WAIT_MS` budget + * (accounting for time already spent waiting in the queue). 
*/ private async waitForActorCapacity( provider: string, billingActorId: string, - config: HostedKeyRateLimitConfig + ticketId: string, + config: HostedKeyRateLimitConfig, + startedAt: number ): Promise<{ rateLimited: false } | { rateLimited: true; retryAfterMs: number }> { - const startedAt = Date.now() - let attempts = 0 + let lastHeartbeatAt = Date.now() while (true) { const result = await this.checkActorRateLimit(provider, billingActorId, config) - attempts++ - - if (!result) { - if (attempts > 1) { - PlatformEvents.hostedKeyQueueWaited({ - provider, - workspaceId: billingActorId, - waitedMs: Date.now() - startedAt, - attempts, - reason: 'actor_requests', - }) - } - return { rateLimited: false } - } + if (!result) return { rateLimited: false } const elapsed = Date.now() - startedAt const remaining = MAX_QUEUE_WAIT_MS - elapsed @@ -357,6 +406,11 @@ export class HostedKeyRateLimiter { return { rateLimited: true, retryAfterMs: result.retryAfterMs } } + if (Date.now() - lastHeartbeatAt >= HEARTBEAT_REFRESH_INTERVAL_MS) { + await this.queue.refreshHeartbeat(provider, billingActorId, ticketId) + lastHeartbeatAt = Date.now() + } + const sleepMs = Math.max(MIN_QUEUE_RETRY_DELAY_MS, result.retryAfterMs) await sleep(sleepMs) } @@ -370,29 +424,17 @@ export class HostedKeyRateLimiter { private async waitForDimensionCapacity( provider: string, billingActorId: string, - config: CustomRateLimit + ticketId: string, + config: CustomRateLimit, + startedAt: number ): Promise< { rateLimited: false } | { rateLimited: true; retryAfterMs: number; dimension: string } > { - const startedAt = Date.now() - let attempts = 0 + let lastHeartbeatAt = Date.now() while (true) { const result = await this.preCheckDimensions(provider, billingActorId, config) - attempts++ - - if (!result) { - if (attempts > 1) { - PlatformEvents.hostedKeyQueueWaited({ - provider, - workspaceId: billingActorId, - waitedMs: Date.now() - startedAt, - attempts, - reason: 'dimension', - }) - } - return { rateLimited: false } - } + if (!result) return { rateLimited: false } const elapsed = Date.now() - startedAt const remaining = MAX_QUEUE_WAIT_MS - elapsed @@ -411,6 +453,11 @@ export class HostedKeyRateLimiter { } } + if (Date.now() - lastHeartbeatAt >= HEARTBEAT_REFRESH_INTERVAL_MS) { + await this.queue.refreshHeartbeat(provider, billingActorId, ticketId) + lastHeartbeatAt = Date.now() + } + const sleepMs = Math.max(MIN_QUEUE_RETRY_DELAY_MS, result.retryAfterMs) await sleep(sleepMs) } diff --git a/apps/sim/lib/core/rate-limiter/hosted-key/queue.test.ts b/apps/sim/lib/core/rate-limiter/hosted-key/queue.test.ts new file mode 100644 index 00000000000..7405c1c5e61 --- /dev/null +++ b/apps/sim/lib/core/rate-limiter/hosted-key/queue.test.ts @@ -0,0 +1,226 @@ +import { redisConfigMock, redisConfigMockFns } from '@sim/testing' +import { beforeEach, describe, expect, it, type Mock, vi } from 'vitest' +import { HostedKeyQueue } from './queue' + +vi.mock('@/lib/core/config/redis', () => redisConfigMock) + +interface MockPipeline { + rpush: Mock + expire: Mock + set: Mock + lrem: Mock + del: Mock + exec: Mock +} + +interface MockRedis { + multi: Mock + set: Mock + eval: Mock + pipeline: MockPipeline +} + +function createFakeRedis(): MockRedis { + const pipeline: MockPipeline = { + rpush: vi.fn(), + expire: vi.fn(), + set: vi.fn(), + lrem: vi.fn(), + del: vi.fn(), + exec: vi.fn(), + } + // Pipeline methods return the pipeline for chaining. 
+ pipeline.rpush.mockReturnValue(pipeline) + pipeline.expire.mockReturnValue(pipeline) + pipeline.set.mockReturnValue(pipeline) + pipeline.lrem.mockReturnValue(pipeline) + pipeline.del.mockReturnValue(pipeline) + + return { + multi: vi.fn(() => pipeline), + set: vi.fn(), + eval: vi.fn(), + pipeline, + } +} + +const provider = 'exa' +const workspaceId = 'workspace-1' +const ticketId = 'ticket-1' + +describe('HostedKeyQueue', () => { + let queue: HostedKeyQueue + let mockRedis: MockRedis + + beforeEach(() => { + vi.clearAllMocks() + mockRedis = createFakeRedis() + redisConfigMockFns.mockGetRedisClient.mockReturnValue(mockRedis) + queue = new HostedKeyQueue() + }) + + describe('enqueue', () => { + it('returns position 0 when first in line', async () => { + // RPUSH returns new list length; first push -> 1. + mockRedis.pipeline.exec.mockResolvedValueOnce([ + [null, 1], + [null, 1], + [null, 'OK'], + ]) + + const result = await queue.enqueue(provider, workspaceId, ticketId) + + expect(result).toEqual({ position: 0, enabled: true }) + expect(mockRedis.pipeline.rpush).toHaveBeenCalledWith( + 'hosted-queue:exa:workspace-1', + ticketId + ) + expect(mockRedis.pipeline.set).toHaveBeenCalledWith( + 'hosted-queue-tkt:exa:workspace-1:ticket-1', + '1', + 'EX', + expect.any(Number) + ) + }) + + it('returns higher position when others are ahead', async () => { + // Length 5 after push -> position 4. + mockRedis.pipeline.exec.mockResolvedValueOnce([ + [null, 5], + [null, 1], + [null, 'OK'], + ]) + + const result = await queue.enqueue(provider, workspaceId, ticketId) + + expect(result.position).toBe(4) + }) + + it('falls back to enabled=false when Redis is unavailable', async () => { + redisConfigMockFns.mockGetRedisClient.mockReturnValueOnce(null) + + const result = await queue.enqueue(provider, workspaceId, ticketId) + + expect(result).toEqual({ position: 0, enabled: false }) + }) + + it('falls back to enabled=false on Redis error', async () => { + mockRedis.pipeline.exec.mockRejectedValueOnce(new Error('connection lost')) + + const result = await queue.enqueue(provider, workspaceId, ticketId) + + expect(result.enabled).toBe(false) + }) + }) + + describe('checkHead', () => { + it('returns "head" when our ticket is at the head', async () => { + mockRedis.eval.mockResolvedValueOnce('head') + + const status = await queue.checkHead(provider, workspaceId, ticketId) + + expect(status).toBe('head') + }) + + it('returns "waiting" when someone else is the head', async () => { + mockRedis.eval.mockResolvedValueOnce('waiting') + + const status = await queue.checkHead(provider, workspaceId, ticketId) + + expect(status).toBe('waiting') + }) + + it('returns "missing" when our ticket is not in the queue', async () => { + mockRedis.eval.mockResolvedValueOnce('missing') + + const status = await queue.checkHead(provider, workspaceId, ticketId) + + expect(status).toBe('missing') + }) + + it('passes queue list key, heartbeat prefix, and ticketId to the Lua script', async () => { + mockRedis.eval.mockResolvedValueOnce('head') + + await queue.checkHead(provider, workspaceId, ticketId) + + expect(mockRedis.eval).toHaveBeenCalledWith( + expect.stringContaining('lindex'), + 1, + 'hosted-queue:exa:workspace-1', + 'hosted-queue-tkt:exa:workspace-1:', + ticketId + ) + }) + + it('fails open to "head" on Redis error so callers do not hang', async () => { + mockRedis.eval.mockRejectedValueOnce(new Error('boom')) + + const status = await queue.checkHead(provider, workspaceId, ticketId) + + expect(status).toBe('head') + }) + + it('returns 
"head" no-op when Redis is unavailable', async () => { + redisConfigMockFns.mockGetRedisClient.mockReturnValueOnce(null) + + const status = await queue.checkHead(provider, workspaceId, ticketId) + + expect(status).toBe('head') + }) + }) + + describe('refreshHeartbeat', () => { + it('writes the heartbeat key with TTL', async () => { + mockRedis.set.mockResolvedValueOnce('OK') + + await queue.refreshHeartbeat(provider, workspaceId, ticketId) + + expect(mockRedis.set).toHaveBeenCalledWith( + 'hosted-queue-tkt:exa:workspace-1:ticket-1', + '1', + 'EX', + expect.any(Number) + ) + }) + + it('is a no-op when Redis is unavailable', async () => { + redisConfigMockFns.mockGetRedisClient.mockReturnValueOnce(null) + + await expect(queue.refreshHeartbeat(provider, workspaceId, ticketId)).resolves.toBeUndefined() + expect(mockRedis.set).not.toHaveBeenCalled() + }) + }) + + describe('dequeue', () => { + it('removes the ticket from the list and deletes the heartbeat', async () => { + mockRedis.pipeline.exec.mockResolvedValueOnce([ + [null, 1], + [null, 1], + ]) + + await queue.dequeue(provider, workspaceId, ticketId) + + expect(mockRedis.pipeline.lrem).toHaveBeenCalledWith( + 'hosted-queue:exa:workspace-1', + 1, + ticketId + ) + expect(mockRedis.pipeline.del).toHaveBeenCalledWith( + 'hosted-queue-tkt:exa:workspace-1:ticket-1' + ) + }) + + it('is a no-op when Redis is unavailable', async () => { + redisConfigMockFns.mockGetRedisClient.mockReturnValueOnce(null) + + await expect(queue.dequeue(provider, workspaceId, ticketId)).resolves.toBeUndefined() + expect(mockRedis.multi).not.toHaveBeenCalled() + }) + + it('swallows errors so callers do not throw on cleanup', async () => { + mockRedis.pipeline.exec.mockRejectedValueOnce(new Error('connection lost')) + + await expect(queue.dequeue(provider, workspaceId, ticketId)).resolves.toBeUndefined() + }) + }) +}) diff --git a/apps/sim/lib/core/rate-limiter/hosted-key/queue.ts b/apps/sim/lib/core/rate-limiter/hosted-key/queue.ts new file mode 100644 index 00000000000..4a7ecd5aed1 --- /dev/null +++ b/apps/sim/lib/core/rate-limiter/hosted-key/queue.ts @@ -0,0 +1,203 @@ +import { createLogger } from '@sim/logger' +import { toError } from '@sim/utils/errors' +import { getRedisClient } from '@/lib/core/config/redis' + +const logger = createLogger('HostedKeyQueue') + +/** + * Per-ticket heartbeat TTL. Refreshed by the head while it's actively waiting + * on the bucket. If the holder crashes, the heartbeat key expires, and the next + * caller sees the head as dead and removes it (lazy cleanup). + */ +const TICKET_HEARTBEAT_TTL_SECONDS = 30 + +/** How often the head should refresh its heartbeat while waiting. */ +export const HEARTBEAT_REFRESH_INTERVAL_MS = 10_000 + +/** + * TTL on the queue list itself. Set on every enqueue. Prevents abandoned queues + * (whole workspace went silent) from sticking around forever in Redis. + */ +const QUEUE_LIST_TTL_SECONDS = 600 + +const queueListKey = (provider: string, billingActorId: string): string => + `hosted-queue:${provider}:${billingActorId}` + +const heartbeatKey = (provider: string, billingActorId: string, ticketId: string): string => + `hosted-queue-tkt:${provider}:${billingActorId}:${ticketId}` + +/** + * Atomically reap any dead head, then return our ticket's status. Combines what + * would otherwise be 3 round-trips (reap, LINDEX, LPOS) into one EVAL — meaningful + * because callers poll this every ~200ms while waiting in the queue. + * + * `KEYS[1]` = queue list key. `ARGV[1]` = heartbeat key prefix. `ARGV[2]` = our ticketId. 
+ * + * Reaping is bounded: at most one dead head is removed per call. If multiple dead + * tickets pile up at the head, subsequent polls will clean them one by one. This + * keeps the script O(1) rather than O(N) and is sufficient because queue depth + * is bounded by concurrent callers per workspace (typically tens). + * + * Returns one of: "head", "waiting", "missing". + */ +const CHECK_HEAD_SCRIPT = ` +local head = redis.call("lindex", KEYS[1], 0) +if head and redis.call("exists", ARGV[1] .. head) == 0 then + redis.call("lrem", KEYS[1], 1, head) + head = redis.call("lindex", KEYS[1], 0) +end +if not head then + return "missing" +end +if head == ARGV[2] then + return "head" +end +if redis.call("lpos", KEYS[1], ARGV[2]) == false then + return "missing" +end +return "waiting" +` + +export interface EnqueueResult { + /** Position at the moment of enqueue (0 = head, you go next). */ + position: number + /** Whether Redis was available — false means we're in no-op mode. */ + enabled: boolean +} + +/** + * Per-workspace+provider FIFO queue for hosted-key acquisitions. + * + * Callers `enqueue` to claim a position, then `waitForHead` until they're at + * the head, then attempt to consume from the token bucket. On success or cap + * exceeded, they `dequeue` to make room for the next caller. + * + * No-op when Redis is unavailable: every method returns "you're the head / + * empty / etc." so the rate limiter falls back to plain bucket racing. + */ +export class HostedKeyQueue { + /** + * Push a ticket onto the tail of the queue and write a heartbeat. Returns the + * position at enqueue time (0 = head, ready to proceed). + */ + async enqueue( + provider: string, + billingActorId: string, + ticketId: string + ): Promise { + const redis = getRedisClient() + if (!redis) { + return { position: 0, enabled: false } + } + + const listKey = queueListKey(provider, billingActorId) + const hbKey = heartbeatKey(provider, billingActorId, ticketId) + + try { + const pipeline = redis.multi() + pipeline.rpush(listKey, ticketId) + pipeline.expire(listKey, QUEUE_LIST_TTL_SECONDS) + pipeline.set(hbKey, '1', 'EX', TICKET_HEARTBEAT_TTL_SECONDS) + const results = await pipeline.exec() + // results[0] is the rpush response: [err, length] + const length = results?.[0] && typeof results[0][1] === 'number' ? results[0][1] : 1 + // Position is length - 1 (just-pushed at the tail). + return { position: length - 1, enabled: true } + } catch (error) { + logger.warn(`Queue enqueue failed for ${listKey}`, { error: toError(error).message }) + return { position: 0, enabled: false } + } + } + + /** + * Check whether `ticketId` is currently at the head of the queue. If the head + * is a different ticket but its heartbeat has expired (caller crashed), reap + * it and re-check on the next poll. + * + * Returns: + * - "head": you're at the head, proceed to consume from the bucket + * - "waiting": someone else is the head and they're alive + * - "missing": your ticket isn't in the queue at all (e.g. 
queue list TTL + * expired); caller should re-enqueue or treat as enabled=false + */ + async checkHead( + provider: string, + billingActorId: string, + ticketId: string + ): Promise<'head' | 'waiting' | 'missing'> { + const redis = getRedisClient() + if (!redis) { + return 'head' + } + + const listKey = queueListKey(provider, billingActorId) + const hbPrefix = `hosted-queue-tkt:${provider}:${billingActorId}:` + + try { + const result = (await redis.eval(CHECK_HEAD_SCRIPT, 1, listKey, hbPrefix, ticketId)) as + | 'head' + | 'waiting' + | 'missing' + return result + } catch (error) { + logger.warn(`Queue checkHead failed for ${listKey}`, { error: toError(error).message }) + // Fail-open: treat as head so the caller proceeds rather than hanging. + return 'head' + } + } + + /** + * Refresh the ticket's heartbeat. Called periodically by the head while it's + * waiting on the bucket so it doesn't get reaped as dead. + */ + async refreshHeartbeat( + provider: string, + billingActorId: string, + ticketId: string + ): Promise { + const redis = getRedisClient() + if (!redis) return + + const hbKey = heartbeatKey(provider, billingActorId, ticketId) + try { + await redis.set(hbKey, '1', 'EX', TICKET_HEARTBEAT_TTL_SECONDS) + } catch (error) { + logger.warn(`Queue heartbeat refresh failed for ${hbKey}`, { + error: toError(error).message, + }) + } + } + + /** + * Remove a ticket from the queue and its heartbeat key. Best-effort; safe to + * call multiple times. LREM count=1 removes at most one matching entry. + */ + async dequeue(provider: string, billingActorId: string, ticketId: string): Promise { + const redis = getRedisClient() + if (!redis) return + + const listKey = queueListKey(provider, billingActorId) + const hbKey = heartbeatKey(provider, billingActorId, ticketId) + try { + const pipeline = redis.multi() + pipeline.lrem(listKey, 1, ticketId) + pipeline.del(hbKey) + await pipeline.exec() + } catch (error) { + logger.warn(`Queue dequeue failed for ${listKey}`, { error: toError(error).message }) + } + } +} + +let cachedQueue: HostedKeyQueue | null = null + +export function getHostedKeyQueue(): HostedKeyQueue { + if (!cachedQueue) { + cachedQueue = new HostedKeyQueue() + } + return cachedQueue +} + +export function resetHostedKeyQueue(): void { + cachedQueue = null +} diff --git a/apps/sim/lib/core/telemetry.ts b/apps/sim/lib/core/telemetry.ts index ce97a94e8a4..d06293b18c1 100644 --- a/apps/sim/lib/core/telemetry.ts +++ b/apps/sim/lib/core/telemetry.ts @@ -1003,16 +1003,18 @@ export const PlatformEvents = { }, /** - * Track a successful hosted-key acquisition that had to wait for capacity. - * Fires after the actor or dimension bucket refilled enough for the call to proceed. + * Track a successful hosted-key acquisition that had to wait — either for a slot at + * the head of the FIFO queue, or for the actor/dimension bucket to refill once at the + * head. `queuePosition` is the position at the moment of enqueue (0 = ready to proceed). 
*/ hostedKeyQueueWaited: (attrs: { provider: string workspaceId: string waitedMs: number attempts: number - reason: 'actor_requests' | 'dimension' + reason: 'actor_requests' | 'dimension' | 'queue_position' dimension?: string + queuePosition?: number }) => { trackPlatformEvent('platform.hosted_key.queue_waited', { 'provider.id': attrs.provider, @@ -1021,6 +1023,7 @@ export const PlatformEvents = { 'queue.attempts': attrs.attempts, 'queue.reason': attrs.reason, ...(attrs.dimension && { 'queue.dimension': attrs.dimension }), + ...(attrs.queuePosition != null && { 'queue.position': attrs.queuePosition }), }) }, @@ -1031,7 +1034,7 @@ export const PlatformEvents = { provider: string workspaceId: string waitedMs: number - reason: 'actor_requests' | 'dimension' + reason: 'actor_requests' | 'dimension' | 'queue_position' dimension?: string }) => { trackPlatformEvent('platform.hosted_key.queue_wait_exceeded', { diff --git a/apps/sim/tools/exa/search.ts b/apps/sim/tools/exa/search.ts index c0dfe2baf2f..9d495332e43 100644 --- a/apps/sim/tools/exa/search.ts +++ b/apps/sim/tools/exa/search.ts @@ -102,7 +102,7 @@ export const searchTool: ToolConfig = { }, rateLimit: { mode: 'per_request', - requestsPerMinute: 5, + requestsPerMinute: 60, }, },
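Note on how the queue composes with upstream throttling (wired up in patch 1's
tools/index.ts changes): when a hosted call exhausts its local exponential
backoff on upstream 429s, executeWithRetry re-enters the per-workspace queue
once via the reacquireAfterRetriesExhausted callback and retries with a freshly
acquired key. A simplified sketch of that control flow; error classification
and telemetry are elided, and the backoff delay shown is illustrative since the
exact formula is outside this hunk:

    let lastError: unknown
    for (let attempt = 0; attempt <= maxRetries; attempt++) {
      try {
        return await fn()
      } catch (error) {
        lastError = error
        if (!isRateLimitError(error) || attempt === maxRetries) {
          if (isRateLimitError(error) && reacquireAfterRetriesExhausted) {
            // Re-enters acquireKey, which may now block up to MAX_QUEUE_WAIT_MS
            // waiting for the per-workspace bucket before handing back a key.
            const requeued = await reacquireAfterRetriesExhausted()
            if (requeued) return await requeued() // single retry, fresh key
          }
          throw lastError
        }
        await sleep(baseDelayMs * 2 ** attempt) // illustrative backoff only
      }
    }
    throw lastError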