Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions lib/internal/streams/iter/classic.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

const {
ArrayIsArray,
ArrayPrototypePush,
MathMax,
NumberMAX_SAFE_INTEGER,
Promise,
Expand Down Expand Up @@ -107,14 +108,14 @@ async function normalizeBatch(raw) {
for (let i = 0; i < raw.length; i++) {
const value = raw[i];
if (isUint8Array(value)) {
batch.push(value);
ArrayPrototypePush(batch, value);
} else {
// normalizeAsyncValue may await for async protocols (e.g.
// toAsyncStreamable on yielded objects). Stream events during
// the suspension are queued, not lost -- errors will surface
// on the next loop iteration after this yield completes.
for await (const normalized of normalizeAsyncValue(value)) {
batch.push(normalized);
ArrayPrototypePush(batch, normalized);
}
}
}
Expand Down Expand Up @@ -163,7 +164,7 @@ async function* createBatchedAsyncIterator(stream, normalize) {
stream._readableState?.length > 0) {
const c = stream.read();
if (c === null) break;
batch.push(c);
ArrayPrototypePush(batch, c);
}
if (normalize !== null) {
const result = await normalize(batch);
Expand Down Expand Up @@ -495,7 +496,7 @@ function fromWritable(writable, options = kNullPrototype) {

function waitForDrain() {
const { promise, resolve, reject } = PromiseWithResolvers();
waiters.push({ __proto__: null, resolve, reject });
ArrayPrototypePush(waiters, { __proto__: null, resolve, reject });
installListeners();
return promise;
}
Expand Down Expand Up @@ -686,7 +687,7 @@ function fromWritable(writable, options = kNullPrototype) {
return PromiseResolve(true);
}
const { promise, resolve } = PromiseWithResolvers();
waiters.push({
ArrayPrototypePush(waiters, {
__proto__: null,
resolve() { resolve(true); },
reject() { resolve(false); },
Expand Down
3 changes: 2 additions & 1 deletion lib/internal/streams/iter/consumers.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const {
ArrayBufferPrototypeSlice,
ArrayPrototypeMap,
ArrayPrototypePush,
ArrayPrototypeShift,
ArrayPrototypeSlice,
Promise,
PromisePrototypeThen,
Expand Down Expand Up @@ -477,7 +478,7 @@ function merge(...args) {

// Drain ready queue synchronously
while (ready.length > 0) {
const item = ready.shift();
const item = ArrayPrototypeShift(ready);
if (item?.error) {
throw item.error;
}
Expand Down
21 changes: 13 additions & 8 deletions lib/internal/streams/iter/from.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ const {

const {
isAnyArrayBuffer,
isDataView,
isPromise,
isTypedArray,
isUint8Array,
} = require('internal/util/types');

Expand Down Expand Up @@ -106,17 +106,21 @@ function primitiveToUint8Array(chunk) {
return chunk;
}
// Other ArrayBufferView types (Int8Array, DataView, etc.)
if (isDataView(chunk)) {
return arrayBufferViewToUint8Array(chunk);
}

function arrayBufferViewToUint8Array(chunk) {
if (isTypedArray(chunk)) {
return new Uint8Array(
DataViewPrototypeGetBuffer(chunk),
DataViewPrototypeGetByteOffset(chunk),
DataViewPrototypeGetByteLength(chunk),
TypedArrayPrototypeGetBuffer(chunk),
TypedArrayPrototypeGetByteOffset(chunk),
TypedArrayPrototypeGetByteLength(chunk),
);
}
return new Uint8Array(
TypedArrayPrototypeGetBuffer(chunk),
TypedArrayPrototypeGetByteOffset(chunk),
TypedArrayPrototypeGetByteLength(chunk),
DataViewPrototypeGetBuffer(chunk),
DataViewPrototypeGetByteOffset(chunk),
DataViewPrototypeGetByteLength(chunk),
);
}

Expand Down Expand Up @@ -580,6 +584,7 @@ function from(input) {
// =============================================================================

module.exports = {
arrayBufferViewToUint8Array,
from,
fromSync,
isAsyncIterable,
Expand Down
30 changes: 15 additions & 15 deletions lib/internal/streams/iter/pull.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ const {
const { AbortController } = require('internal/abort_controller');

const {
arrayBufferViewToUint8Array,
from,
fromSync,
primitiveToUint8Array,
isSyncIterable,
isAsyncIterable,
isUint8ArrayBatch,
Expand Down Expand Up @@ -136,7 +136,7 @@ function* flattenTransformYieldSync(value) {
return;
}
if (ArrayBufferIsView(value)) {
yield primitiveToUint8Array(value);
yield arrayBufferViewToUint8Array(value);
return;
}
// Must be Iterable<TransformYield>
Expand Down Expand Up @@ -170,7 +170,7 @@ async function* flattenTransformYieldAsync(value) {
return;
}
if (ArrayBufferIsView(value)) {
yield primitiveToUint8Array(value);
yield arrayBufferViewToUint8Array(value);
return;
}
// Check for async iterable first
Expand All @@ -180,10 +180,10 @@ async function* flattenTransformYieldAsync(value) {
}
return;
}
// Must be sync Iterable<TransformYield>
// Must be sync Iterable<TransformYield>, no nested async iterables
if (isSyncIterable(value)) {
for (const item of value) {
yield* flattenTransformYieldAsync(item);
yield* flattenTransformYieldSync(item);
}
return;
}
Expand Down Expand Up @@ -218,7 +218,7 @@ function* processTransformResultSync(result) {
return;
}
if (ArrayBufferIsView(result)) {
yield [primitiveToUint8Array(result)];
yield [arrayBufferViewToUint8Array(result)];
return;
}
// Uint8Array[] batch
Expand Down Expand Up @@ -278,7 +278,7 @@ async function* processTransformResultAsync(result) {
return;
}
if (ArrayBufferIsView(result)) {
yield [primitiveToUint8Array(result)];
yield [arrayBufferViewToUint8Array(result)];
return;
}
// Uint8Array[] batch
Expand Down Expand Up @@ -313,7 +313,9 @@ async function* processTransformResultAsync(result) {
ArrayPrototypePush(batch, item);
continue;
}
for await (const chunk of flattenTransformYieldAsync(item)) {
// Note: This iteration is synchronous, since async iterables
// may not be nested within sync iterables.
for (const chunk of flattenTransformYieldSync(item)) {
ArrayPrototypePush(batch, chunk);
}
}
Expand Down Expand Up @@ -366,7 +368,7 @@ function* applyFusedStatelessSyncTransforms(source, run) {
} else if (isAnyArrayBuffer(current)) {
yield [new Uint8Array(current)];
} else if (ArrayBufferIsView(current)) {
yield [primitiveToUint8Array(current)];
yield [arrayBufferViewToUint8Array(current)];
} else {
yield* processTransformResultSync(current);
}
Expand Down Expand Up @@ -428,7 +430,7 @@ function* createSyncPipeline(source, transforms) {
}
current = applyStatefulSyncTransform(current, transform.transform);
} else {
statelessRun.push(transform);
ArrayPrototypePush(statelessRun, transform);
}
}
if (statelessRun.length > 0) {
Expand Down Expand Up @@ -490,7 +492,7 @@ async function* applyFusedStatelessAsyncTransforms(source, run, signal) {
} else if (isAnyArrayBuffer(current)) {
yield [new Uint8Array(current)];
} else if (ArrayBufferIsView(current)) {
yield [primitiveToUint8Array(current)];
yield [arrayBufferViewToUint8Array(current)];
} else {
yield* processTransformResultAsync(current);
}
Expand Down Expand Up @@ -531,9 +533,7 @@ async function* applyFusedStatelessAsyncTransforms(source, run, signal) {
* @yields {Uint8Array[]}
*/
async function* withFlushAsync(source) {
for await (const batch of source) {
yield batch;
}
yield* source;
yield null;
}

Expand Down Expand Up @@ -647,7 +647,7 @@ async function* createAsyncPipeline(source, transforms, signal) {
current, transform.transform, opts);
}
} else {
statelessRun.push(transform);
ArrayPrototypePush(statelessRun, transform);
}
}
// Flush remaining stateless run
Expand Down
45 changes: 27 additions & 18 deletions lib/internal/streams/iter/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const {
TypedArrayPrototypeGetBuffer,
TypedArrayPrototypeGetByteLength,
TypedArrayPrototypeGetByteOffset,
TypedArrayPrototypeSet,
Uint8Array,
} = primordials;

Expand All @@ -24,8 +25,6 @@ const {
} = require('internal/errors');
const { isError } = require('internal/util');

const { Buffer } = require('buffer');

const { isSharedArrayBuffer, isUint8Array } = require('internal/util/types');

const { validateOneOf } = require('internal/validators');
Expand Down Expand Up @@ -127,26 +126,36 @@ function concatBytes(chunks) {
if (chunks.length === 0) {
return new Uint8Array(0);
}
// Single chunk: return directly if it covers the entire backing buffer
// Single chunk: return directly if it covers the entire backing buffer,
// otherwise return a copy
if (chunks.length === 1) {
const chunk = chunks[0];
const buf = TypedArrayPrototypeGetBuffer(chunk);
// SharedArrayBuffer is not available in primordials, so use
// direct property access for its byteLength.
const bufByteLength = isSharedArrayBuffer(buf) ?
buf.byteLength :
ArrayBufferPrototypeGetByteLength(buf);
if (TypedArrayPrototypeGetByteOffset(chunk) === 0 &&
TypedArrayPrototypeGetByteLength(chunk) === bufByteLength) {
return chunk;
// If non-zero offset, skip the remaining buffer checks.
if (TypedArrayPrototypeGetByteOffset(chunk) === 0) {
const buf = TypedArrayPrototypeGetBuffer(chunk);
// SharedArrayBuffer is not available in primordials, so use
// direct property access for its byteLength.
const bufByteLength = isSharedArrayBuffer(buf) ?
buf.byteLength :
ArrayBufferPrototypeGetByteLength(buf);
if (TypedArrayPrototypeGetByteLength(chunk) === bufByteLength) {
return chunk;
}
}
return new Uint8Array(chunk);
}
// Multiple chunks: concatenate
let totalByteLength = 0;
for (let i = 0; i < chunks.length; i++) {
totalByteLength += TypedArrayPrototypeGetByteLength(chunks[i]);
}
const concatenated = new Uint8Array(totalByteLength);
let offset = 0;
for (let i = 0; i < chunks.length; i++) {
TypedArrayPrototypeSet(concatenated, chunks[i], offset);
offset += TypedArrayPrototypeGetByteLength(chunks[i]);
}
// Multiple chunks or shared buffer: concatenate
const buf = Buffer.concat(chunks);
return new Uint8Array(
TypedArrayPrototypeGetBuffer(buf),
TypedArrayPrototypeGetByteOffset(buf),
TypedArrayPrototypeGetByteLength(buf));
return concatenated;
}

/**
Expand Down
Loading