Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,13 @@ import type {
TokenStatus,
} from '@/lib/core/rate-limiter/storage'
import { HostedKeyRateLimiter } from './hosted-key-rate-limiter'
import type { HostedKeyQueue } from './queue'
import type { CustomRateLimit, PerRequestRateLimit } from './types'

/**
 * Retry delay (in milliseconds) reported by mocked storage results when a test
 * wants the wait to be abandoned on its first iteration: six minutes, which is
 * longer than the 5-minute MAX_QUEUE_WAIT_MS cap.
 */
const RETRY_PAST_CAP_MS = 6 * 60_000

interface MockAdapter {
consumeTokens: Mock
getTokenStatus: Mock
Expand All @@ -19,10 +24,30 @@ const createMockAdapter = (): MockAdapter => ({
resetBucket: vi.fn(),
})

/** Shape of the stub queue passed to the rate limiter in this spec; every member is a `Mock`
 * so individual tests can override its resolved value and assert on its calls. */
interface MockQueue {
// Stubbed by createMockQueue to resolve `{ position: 0, enabled: true }`.
enqueue: Mock
// Stubbed by createMockQueue to resolve 'head'; tests also feed it 'waiting' and 'missing'.
checkHead: Mock
// Stubbed by createMockQueue to resolve `undefined`.
refreshHeartbeat: Mock
// Stubbed by createMockQueue to resolve `undefined`.
dequeue: Mock
}

/**
 * Builds a fresh stub queue whose defaults mean "already at the head, nothing to wait for":
 * `enqueue` resolves position 0, `checkHead` resolves 'head', and both `refreshHeartbeat`
 * and `dequeue` resolve `undefined`. That mirrors an empty queue (or Redis being unavailable);
 * individual tests override `checkHead` per call to simulate ordering.
 */
const createMockQueue = (): MockQueue => ({
  enqueue: vi.fn().mockResolvedValue({ position: 0, enabled: true }),
  checkHead: vi.fn().mockResolvedValue('head'),
  refreshHeartbeat: vi.fn().mockResolvedValue(undefined),
  dequeue: vi.fn().mockResolvedValue(undefined),
})

describe('HostedKeyRateLimiter', () => {
const testProvider = 'exa'
const envKeyPrefix = 'EXA_API_KEY'
let mockAdapter: MockAdapter
let mockQueue: MockQueue
let rateLimiter: HostedKeyRateLimiter
let originalEnv: NodeJS.ProcessEnv

Expand All @@ -34,7 +59,11 @@ describe('HostedKeyRateLimiter', () => {
beforeEach(() => {
vi.clearAllMocks()
mockAdapter = createMockAdapter()
rateLimiter = new HostedKeyRateLimiter(mockAdapter as RateLimitStorageAdapter)
mockQueue = createMockQueue()
rateLimiter = new HostedKeyRateLimiter(
mockAdapter as RateLimitStorageAdapter,
mockQueue as unknown as HostedKeyQueue
)

originalEnv = { ...process.env }
process.env.EXA_API_KEY_COUNT = '3'
Expand Down Expand Up @@ -72,11 +101,12 @@ describe('HostedKeyRateLimiter', () => {
expect(result.error).toContain('No hosted keys configured')
})

it('should rate limit billing actor when they exceed their limit', async () => {
it('should rate limit billing actor when wait exceeds the queue cap', async () => {
// resetAt past the 5-minute cap forces the wait loop to bail immediately.
const rateLimitedResult: ConsumeResult = {
allowed: false,
tokensRemaining: 0,
resetAt: new Date(Date.now() + 30000),
resetAt: new Date(Date.now() + RETRY_PAST_CAP_MS),
}
mockAdapter.consumeTokens.mockResolvedValue(rateLimitedResult)

Expand All @@ -93,6 +123,33 @@ describe('HostedKeyRateLimiter', () => {
expect(result.error).toContain('Rate limit exceeded')
})

it('should wait for capacity then succeed when bucket refills within the cap', async () => {
  // Storage answers twice: first an empty bucket that resets in 100ms (far below the cap),
  // then a bucket with room, so the second consume attempt goes through.
  const emptyBucket: ConsumeResult = {
    allowed: false,
    tokensRemaining: 0,
    resetAt: new Date(Date.now() + 100),
  }
  const refilledBucket: ConsumeResult = {
    allowed: true,
    tokensRemaining: 9,
    resetAt: new Date(Date.now() + 60000),
  }
  mockAdapter.consumeTokens
    .mockResolvedValueOnce(emptyBucket)
    .mockResolvedValueOnce(refilledBucket)

  const { success, key } = await rateLimiter.acquireKey(
    testProvider,
    envKeyPrefix,
    perRequestRateLimit,
    'workspace-wait'
  )

  // One rejected attempt plus one accepted attempt.
  expect(mockAdapter.consumeTokens).toHaveBeenCalledTimes(2)
  expect(success).toBe(true)
  expect(key).toBe('test-key-1')
})

it('should allow billing actor within their rate limit', async () => {
const allowedResult: ConsumeResult = {
allowed: true,
Expand Down Expand Up @@ -184,6 +241,135 @@ describe('HostedKeyRateLimiter', () => {
})
})

describe('FIFO queue ordering', () => {
  /** Workspace id used by every case in this suite. */
  const workspaceId = 'workspace-1'

  /** Bucket response that admits the request on the first consume attempt. */
  const allowed: ConsumeResult = {
    allowed: true,
    tokensRemaining: 9,
    resetAt: new Date(Date.now() + 60000),
  }

  /** Performs one acquisition for the suite's workspace using the shared per-request config. */
  const acquire = () =>
    rateLimiter.acquireKey(testProvider, envKeyPrefix, perRequestRateLimit, workspaceId)

  /** Runs `body` with fake timers installed and restores real timers even if it throws. */
  const withFakeTimers = async (body: () => Promise<void>): Promise<void> => {
    vi.useFakeTimers()
    try {
      await body()
    } finally {
      vi.useRealTimers()
    }
  }

  it('enqueues every call onto the per-workspace+provider queue', async () => {
    mockAdapter.consumeTokens.mockResolvedValue(allowed)

    await acquire()

    // Third argument is an opaque string; only its type is asserted.
    expect(mockQueue.enqueue).toHaveBeenCalledWith(testProvider, workspaceId, expect.any(String))
  })

  it('always dequeues at the end of a successful acquisition', async () => {
    mockAdapter.consumeTokens.mockResolvedValue(allowed)

    await acquire()

    expect(mockQueue.dequeue).toHaveBeenCalledWith(testProvider, workspaceId, expect.any(String))
  })

  it('always dequeues even when the call fails (no keys configured)', async () => {
    mockAdapter.consumeTokens.mockResolvedValue(allowed)
    // A key count of zero makes the acquisition fail after the bucket check passes.
    process.env.EXA_API_KEY_COUNT = '0'

    await acquire()

    expect(mockQueue.dequeue).toHaveBeenCalled()
  })

  it('waits at the head of the queue before consuming from the bucket', async () => {
    mockAdapter.consumeTokens.mockResolvedValue(allowed)
    // Two polls report 'waiting' before the third reports 'head'.
    mockQueue.checkHead
      .mockResolvedValueOnce('waiting')
      .mockResolvedValueOnce('waiting')
      .mockResolvedValueOnce('head')

    const outcome = await acquire()

    expect(outcome.success).toBe(true)
    expect(mockQueue.checkHead).toHaveBeenCalledTimes(3)
    // The bucket is touched exactly once, i.e. only after reaching the head.
    expect(mockAdapter.consumeTokens).toHaveBeenCalledTimes(1)
  })

  it('refreshes the heartbeat while waiting at the head of the queue', async () => {
    mockAdapter.consumeTokens.mockResolvedValue(allowed)

    // The wait loop has to run long enough for HEARTBEAT_REFRESH_INTERVAL_MS to elapse;
    // fake timers let the test jump the clock instead of sleeping.
    await withFakeTimers(async () => {
      mockQueue.checkHead.mockImplementation(async () => {
        // Each poll moves the clock 15s forward; the second poll onwards reports 'head'.
        vi.advanceTimersByTime(15_000)
        const pollCount = mockQueue.checkHead.mock.calls.length
        return pollCount >= 2 ? 'head' : 'waiting'
      })

      const pending = acquire()
      // Flush scheduled timers so the limiter's sleep between polls resolves.
      await vi.runAllTimersAsync()
      await pending

      expect(mockQueue.refreshHeartbeat).toHaveBeenCalled()
    })
  })

  it('returns 429 when the queue wait exceeds the cap', async () => {
    mockAdapter.consumeTokens.mockResolvedValue(allowed)
    // The queue never reports 'head', so only the wait cap can end the call.
    mockQueue.checkHead.mockResolvedValue('waiting')

    await withFakeTimers(async () => {
      const pending = acquire()
      // Six minutes of fake time, i.e. beyond the 5-minute cap.
      await vi.advanceTimersByTimeAsync(6 * 60 * 1000)
      const outcome = await pending

      expect(outcome.success).toBe(false)
      expect(outcome.billingActorRateLimited).toBe(true)
    })
  })

  it('treats "missing" status as proceed (queue evicted, fall through to bucket race)', async () => {
    mockAdapter.consumeTokens.mockResolvedValue(allowed)
    mockQueue.checkHead.mockResolvedValueOnce('missing')

    const outcome = await acquire()

    expect(outcome.success).toBe(true)
  })
})

describe('acquireKey with custom rate limit', () => {
const customRateLimit: CustomRateLimit = {
mode: 'custom',
Expand All @@ -197,11 +383,11 @@ describe('HostedKeyRateLimiter', () => {
],
}

it('should enforce requestsPerMinute for custom mode', async () => {
it('should enforce requestsPerMinute for custom mode when wait exceeds the cap', async () => {
const rateLimitedResult: ConsumeResult = {
allowed: false,
tokensRemaining: 0,
resetAt: new Date(Date.now() + 30000),
resetAt: new Date(Date.now() + RETRY_PAST_CAP_MS),
}
mockAdapter.consumeTokens.mockResolvedValue(rateLimitedResult)

Expand Down Expand Up @@ -246,7 +432,7 @@ describe('HostedKeyRateLimiter', () => {
expect(mockAdapter.getTokenStatus).toHaveBeenCalledTimes(1)
})

it('should block request when a dimension is depleted', async () => {
it('should block request when a dimension wait exceeds the cap', async () => {
const allowedConsume: ConsumeResult = {
allowed: true,
tokensRemaining: 4,
Expand All @@ -258,7 +444,7 @@ describe('HostedKeyRateLimiter', () => {
tokensAvailable: 0,
maxTokens: 2000,
lastRefillAt: new Date(),
nextRefillAt: new Date(Date.now() + 45000),
nextRefillAt: new Date(Date.now() + RETRY_PAST_CAP_MS),
}
mockAdapter.getTokenStatus.mockResolvedValue(depleted)

Expand All @@ -274,6 +460,39 @@ describe('HostedKeyRateLimiter', () => {
expect(result.error).toContain('tokens')
})

it('should wait for dimension capacity then succeed when budget refills', async () => {
  // The request bucket always admits the call in this test.
  const requestBucketOpen: ConsumeResult = {
    allowed: true,
    tokensRemaining: 4,
    resetAt: new Date(Date.now() + 60000),
  }
  mockAdapter.consumeTokens.mockResolvedValue(requestBucketOpen)

  // Dimension status is read twice: empty with a refill 100ms away, then replenished.
  const exhaustedBudget: TokenStatus = {
    tokensAvailable: 0,
    maxTokens: 2000,
    lastRefillAt: new Date(),
    nextRefillAt: new Date(Date.now() + 100),
  }
  const replenishedBudget: TokenStatus = {
    tokensAvailable: 500,
    maxTokens: 2000,
    lastRefillAt: new Date(),
    nextRefillAt: new Date(Date.now() + 60000),
  }
  mockAdapter.getTokenStatus
    .mockResolvedValueOnce(exhaustedBudget)
    .mockResolvedValueOnce(replenishedBudget)

  const { success } = await rateLimiter.acquireKey(
    testProvider,
    envKeyPrefix,
    customRateLimit,
    'workspace-dim-wait'
  )

  // One depleted read followed by one refilled read.
  expect(mockAdapter.getTokenStatus).toHaveBeenCalledTimes(2)
  expect(success).toBe(true)
})

it('should pre-check all dimensions and block on first depleted one', async () => {
const multiDimensionConfig: CustomRateLimit = {
mode: 'custom',
Expand Down Expand Up @@ -309,7 +528,7 @@ describe('HostedKeyRateLimiter', () => {
tokensAvailable: 0,
maxTokens: 100,
lastRefillAt: new Date(),
nextRefillAt: new Date(Date.now() + 30000),
nextRefillAt: new Date(Date.now() + RETRY_PAST_CAP_MS),
}
mockAdapter.getTokenStatus
.mockResolvedValueOnce(tokensBudget)
Expand Down
Loading
Loading