From 0b2845335b0e32a060b910a121915348f73519dd Mon Sep 17 00:00:00 2001 From: Renegade334 Date: Tue, 5 May 2026 18:53:12 +0200 Subject: [PATCH] stream: minor stream/iter implementation edits Signed-off-by: Renegade334 --- lib/internal/streams/iter/classic.js | 11 ++++--- lib/internal/streams/iter/consumers.js | 3 +- lib/internal/streams/iter/from.js | 21 +++++++----- lib/internal/streams/iter/pull.js | 30 ++++++++--------- lib/internal/streams/iter/utils.js | 45 +++++++++++++++----------- 5 files changed, 63 insertions(+), 47 deletions(-) diff --git a/lib/internal/streams/iter/classic.js b/lib/internal/streams/iter/classic.js index 17aa43f0969202..18d1733d6ad648 100644 --- a/lib/internal/streams/iter/classic.js +++ b/lib/internal/streams/iter/classic.js @@ -13,6 +13,7 @@ const { ArrayIsArray, + ArrayPrototypePush, MathMax, NumberMAX_SAFE_INTEGER, Promise, @@ -107,14 +108,14 @@ async function normalizeBatch(raw) { for (let i = 0; i < raw.length; i++) { const value = raw[i]; if (isUint8Array(value)) { - batch.push(value); + ArrayPrototypePush(batch, value); } else { // normalizeAsyncValue may await for async protocols (e.g. // toAsyncStreamable on yielded objects). Stream events during // the suspension are queued, not lost -- errors will surface // on the next loop iteration after this yield completes. for await (const normalized of normalizeAsyncValue(value)) { - batch.push(normalized); + ArrayPrototypePush(batch, normalized); } } } @@ -163,7 +164,7 @@ async function* createBatchedAsyncIterator(stream, normalize) { stream._readableState?.length > 0) { const c = stream.read(); if (c === null) break; - batch.push(c); + ArrayPrototypePush(batch, c); } if (normalize !== null) { const result = await normalize(batch); @@ -495,7 +496,7 @@ function fromWritable(writable, options = kNullPrototype) { function waitForDrain() { const { promise, resolve, reject } = PromiseWithResolvers(); - waiters.push({ __proto__: null, resolve, reject }); + ArrayPrototypePush(waiters, { __proto__: null, resolve, reject }); installListeners(); return promise; } @@ -686,7 +687,7 @@ function fromWritable(writable, options = kNullPrototype) { return PromiseResolve(true); } const { promise, resolve } = PromiseWithResolvers(); - waiters.push({ + ArrayPrototypePush(waiters, { __proto__: null, resolve() { resolve(true); }, reject() { resolve(false); }, diff --git a/lib/internal/streams/iter/consumers.js b/lib/internal/streams/iter/consumers.js index 6e47d3cf9638fd..442fe95b8e1b85 100644 --- a/lib/internal/streams/iter/consumers.js +++ b/lib/internal/streams/iter/consumers.js @@ -12,6 +12,7 @@ const { ArrayBufferPrototypeSlice, ArrayPrototypeMap, ArrayPrototypePush, + ArrayPrototypeShift, ArrayPrototypeSlice, Promise, PromisePrototypeThen, @@ -477,7 +478,7 @@ function merge(...args) { // Drain ready queue synchronously while (ready.length > 0) { - const item = ready.shift(); + const item = ArrayPrototypeShift(ready); if (item?.error) { throw item.error; } diff --git a/lib/internal/streams/iter/from.js b/lib/internal/streams/iter/from.js index 0533f0e3810398..5ac802a00f2d08 100644 --- a/lib/internal/streams/iter/from.js +++ b/lib/internal/streams/iter/from.js @@ -31,8 +31,8 @@ const { const { isAnyArrayBuffer, - isDataView, isPromise, + isTypedArray, isUint8Array, } = require('internal/util/types'); @@ -106,17 +106,21 @@ function primitiveToUint8Array(chunk) { return chunk; } // Other ArrayBufferView types (Int8Array, DataView, etc.) - if (isDataView(chunk)) { + return arrayBufferViewToUint8Array(chunk); +} + +function arrayBufferViewToUint8Array(chunk) { + if (isTypedArray(chunk)) { return new Uint8Array( - DataViewPrototypeGetBuffer(chunk), - DataViewPrototypeGetByteOffset(chunk), - DataViewPrototypeGetByteLength(chunk), + TypedArrayPrototypeGetBuffer(chunk), + TypedArrayPrototypeGetByteOffset(chunk), + TypedArrayPrototypeGetByteLength(chunk), ); } return new Uint8Array( - TypedArrayPrototypeGetBuffer(chunk), - TypedArrayPrototypeGetByteOffset(chunk), - TypedArrayPrototypeGetByteLength(chunk), + DataViewPrototypeGetBuffer(chunk), + DataViewPrototypeGetByteOffset(chunk), + DataViewPrototypeGetByteLength(chunk), ); } @@ -580,6 +584,7 @@ function from(input) { // ============================================================================= module.exports = { + arrayBufferViewToUint8Array, from, fromSync, isAsyncIterable, diff --git a/lib/internal/streams/iter/pull.js b/lib/internal/streams/iter/pull.js index a228f052a8e433..b4a7678237f465 100644 --- a/lib/internal/streams/iter/pull.js +++ b/lib/internal/streams/iter/pull.js @@ -33,9 +33,9 @@ const { const { AbortController } = require('internal/abort_controller'); const { + arrayBufferViewToUint8Array, from, fromSync, - primitiveToUint8Array, isSyncIterable, isAsyncIterable, isUint8ArrayBatch, @@ -136,7 +136,7 @@ function* flattenTransformYieldSync(value) { return; } if (ArrayBufferIsView(value)) { - yield primitiveToUint8Array(value); + yield arrayBufferViewToUint8Array(value); return; } // Must be Iterable @@ -170,7 +170,7 @@ async function* flattenTransformYieldAsync(value) { return; } if (ArrayBufferIsView(value)) { - yield primitiveToUint8Array(value); + yield arrayBufferViewToUint8Array(value); return; } // Check for async iterable first @@ -180,10 +180,10 @@ async function* flattenTransformYieldAsync(value) { } return; } - // Must be sync Iterable + // Must be sync Iterable, no nested async iterables if (isSyncIterable(value)) { for (const item of value) { - yield* flattenTransformYieldAsync(item); + yield* flattenTransformYieldSync(item); } return; } @@ -218,7 +218,7 @@ function* processTransformResultSync(result) { return; } if (ArrayBufferIsView(result)) { - yield [primitiveToUint8Array(result)]; + yield [arrayBufferViewToUint8Array(result)]; return; } // Uint8Array[] batch @@ -278,7 +278,7 @@ async function* processTransformResultAsync(result) { return; } if (ArrayBufferIsView(result)) { - yield [primitiveToUint8Array(result)]; + yield [arrayBufferViewToUint8Array(result)]; return; } // Uint8Array[] batch @@ -313,7 +313,9 @@ async function* processTransformResultAsync(result) { ArrayPrototypePush(batch, item); continue; } - for await (const chunk of flattenTransformYieldAsync(item)) { + // Note: This iteration is synchronous, since async iterables + // may not be nested within sync iterables. + for (const chunk of flattenTransformYieldSync(item)) { ArrayPrototypePush(batch, chunk); } } @@ -366,7 +368,7 @@ function* applyFusedStatelessSyncTransforms(source, run) { } else if (isAnyArrayBuffer(current)) { yield [new Uint8Array(current)]; } else if (ArrayBufferIsView(current)) { - yield [primitiveToUint8Array(current)]; + yield [arrayBufferViewToUint8Array(current)]; } else { yield* processTransformResultSync(current); } @@ -428,7 +430,7 @@ function* createSyncPipeline(source, transforms) { } current = applyStatefulSyncTransform(current, transform.transform); } else { - statelessRun.push(transform); + ArrayPrototypePush(statelessRun, transform); } } if (statelessRun.length > 0) { @@ -490,7 +492,7 @@ async function* applyFusedStatelessAsyncTransforms(source, run, signal) { } else if (isAnyArrayBuffer(current)) { yield [new Uint8Array(current)]; } else if (ArrayBufferIsView(current)) { - yield [primitiveToUint8Array(current)]; + yield [arrayBufferViewToUint8Array(current)]; } else { yield* processTransformResultAsync(current); } @@ -531,9 +533,7 @@ async function* applyFusedStatelessAsyncTransforms(source, run, signal) { * @yields {Uint8Array[]} */ async function* withFlushAsync(source) { - for await (const batch of source) { - yield batch; - } + yield* source; yield null; } @@ -647,7 +647,7 @@ async function* createAsyncPipeline(source, transforms, signal) { current, transform.transform, opts); } } else { - statelessRun.push(transform); + ArrayPrototypePush(statelessRun, transform); } } // Flush remaining stateless run diff --git a/lib/internal/streams/iter/utils.js b/lib/internal/streams/iter/utils.js index 2a156d2828a2e0..0520630b09c4b8 100644 --- a/lib/internal/streams/iter/utils.js +++ b/lib/internal/streams/iter/utils.js @@ -12,6 +12,7 @@ const { TypedArrayPrototypeGetBuffer, TypedArrayPrototypeGetByteLength, TypedArrayPrototypeGetByteOffset, + TypedArrayPrototypeSet, Uint8Array, } = primordials; @@ -24,8 +25,6 @@ const { } = require('internal/errors'); const { isError } = require('internal/util'); -const { Buffer } = require('buffer'); - const { isSharedArrayBuffer, isUint8Array } = require('internal/util/types'); const { validateOneOf } = require('internal/validators'); @@ -127,26 +126,36 @@ function concatBytes(chunks) { if (chunks.length === 0) { return new Uint8Array(0); } - // Single chunk: return directly if it covers the entire backing buffer + // Single chunk: return directly if it covers the entire backing buffer, + // otherwise return a copy if (chunks.length === 1) { const chunk = chunks[0]; - const buf = TypedArrayPrototypeGetBuffer(chunk); - // SharedArrayBuffer is not available in primordials, so use - // direct property access for its byteLength. - const bufByteLength = isSharedArrayBuffer(buf) ? - buf.byteLength : - ArrayBufferPrototypeGetByteLength(buf); - if (TypedArrayPrototypeGetByteOffset(chunk) === 0 && - TypedArrayPrototypeGetByteLength(chunk) === bufByteLength) { - return chunk; + // If non-zero offset, skip the remaining buffer checks. + if (TypedArrayPrototypeGetByteOffset(chunk) === 0) { + const buf = TypedArrayPrototypeGetBuffer(chunk); + // SharedArrayBuffer is not available in primordials, so use + // direct property access for its byteLength. + const bufByteLength = isSharedArrayBuffer(buf) ? + buf.byteLength : + ArrayBufferPrototypeGetByteLength(buf); + if (TypedArrayPrototypeGetByteLength(chunk) === bufByteLength) { + return chunk; + } } + return new Uint8Array(chunk); + } + // Multiple chunks: concatenate + let totalByteLength = 0; + for (let i = 0; i < chunks.length; i++) { + totalByteLength += TypedArrayPrototypeGetByteLength(chunks[i]); + } + const concatenated = new Uint8Array(totalByteLength); + let offset = 0; + for (let i = 0; i < chunks.length; i++) { + TypedArrayPrototypeSet(concatenated, chunks[i], offset); + offset += TypedArrayPrototypeGetByteLength(chunks[i]); } - // Multiple chunks or shared buffer: concatenate - const buf = Buffer.concat(chunks); - return new Uint8Array( - TypedArrayPrototypeGetBuffer(buf), - TypedArrayPrototypeGetByteOffset(buf), - TypedArrayPrototypeGetByteLength(buf)); + return concatenated; } /**