Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions lib/internal/streams/end-of-stream.js
Original file line numberDiff line numberDiff line change
Expand Up@@ -22,20 +22,23 @@ const{
validateBoolean
} = require('internal/validators');

const{Promise } = primordials;
const{Promise, PromisePrototypeThen } = primordials;

const{
isClosed,
isReadable,
isReadableNodeStream,
isReadableStream,
isReadableFinished,
isReadableErrored,
isWritable,
isWritableNodeStream,
isWritableStream,
isWritableFinished,
isWritableErrored,
isNodeStream,
willEmitClose: _willEmitClose,
kIsClosedPromise,
} = require('internal/streams/utils');

function isRequest(stream){
Expand All@@ -58,14 +61,17 @@ function eos(stream, options, callback){

callback = once(callback);

const readable = options.readable ?? isReadableNodeStream(stream);
const writable = options.writable ?? isWritableNodeStream(stream);
if (isReadableStream(stream) || isWritableStream(stream)){
return eosWeb(stream, options, callback);
}
Comment on lines +64 to +66
Copy link
Contributor

@kanongilkanongilApr 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These match non-node WebStreams – which I guess is fine, except that eosWeb() will throw a TypeError since the private property kIsClosedPromise will not exist on it. This will in turn mean that the callback is never called.

Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As in you want eosWeb to additionally have a check to throw TypeError ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know what node should do – probably still throw some kind of error.

Currently non-node WebStreams would just throw a TypeError from accessing the promise property on the undefinedkIsClosedPromise property here:

stream[kIsClosedPromise].promise,


if (!isNodeStream(stream)){
// TODO: Webstreams.
throw new ERR_INVALID_ARG_TYPE('stream', 'Stream', stream);
throw new ERR_INVALID_ARG_TYPE('stream', ['ReadableStream', 'WritableStream', 'Stream'], stream);
}

const readable = options.readable ?? isReadableNodeStream(stream);
const writable = options.writable ?? isWritableNodeStream(stream);

const wState = stream._writableState;
const rState = stream._readableState;

Expand DownExpand Up@@ -255,6 +261,15 @@ function eos(stream, options, callback){
return cleanup;
}

function eosWeb(stream, opts, callback){
PromisePrototypeThen(
stream[kIsClosedPromise].promise,
() => process.nextTick(() => callback.call(stream)),
(err) => process.nextTick(() => callback.call(stream, err)),
);
return nop;
}

function finished(stream, opts){
let autoCleanup = false;
if (opts === null){
Expand Down
25 changes: 25 additions & 0 deletions lib/internal/streams/utils.js
Original file line numberDiff line numberDiff line change
Expand Up@@ -4,13 +4,16 @@ const{
Symbol,
SymbolAsyncIterator,
SymbolIterator,
SymbolFor,
} = primordials;

const kDestroyed = Symbol('kDestroyed');
const kIsErrored = Symbol('kIsErrored');
const kIsReadable = Symbol('kIsReadable');
const kIsDisturbed = Symbol('kIsDisturbed');

const kIsClosedPromise = SymbolFor('nodejs.webstream.isClosedPromise');

function isReadableNodeStream(obj, strict = false){
return !!(
obj &&
Expand DownExpand Up@@ -55,6 +58,25 @@ function isNodeStream(obj){
);
}

function isReadableStream(obj){
return !!(
obj &&
!isNodeStream(obj) &&
typeof obj.pipeThrough === 'function' &&
typeof obj.getReader === 'function' &&
typeof obj.cancel === 'function'
);
}

function isWritableStream(obj){
return !!(
obj &&
!isNodeStream(obj) &&
typeof obj.getWriter === 'function' &&
typeof obj.abort === 'function'
);
}

function isIterable(obj, isAsync){
if (obj == null) return false;
if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function'
Expand DownExpand Up@@ -269,18 +291,21 @@ module.exports ={
kIsErrored,
isReadable,
kIsReadable,
kIsClosedPromise,
isClosed,
isDestroyed,
isDuplexNodeStream,
isFinished,
isIterable,
isReadableNodeStream,
isReadableStream,
isReadableEnded,
isReadableFinished,
isReadableErrored,
isNodeStream,
isWritable,
isWritableNodeStream,
isWritableStream,
isWritableEnded,
isWritableFinished,
isWritableErrored,
Expand Down
14 changes: 11 additions & 3 deletions lib/internal/webstreams/readablestream.js
Original file line numberDiff line numberDiff line change
Expand Up@@ -84,6 +84,7 @@ const{
kIsDisturbed,
kIsErrored,
kIsReadable,
kIsClosedPromise,
} = require('internal/streams/utils');

const{
Expand DownExpand Up@@ -231,9 +232,11 @@ class ReadableStream{
port1: undefined,
port2: undefined,
promise: undefined,
}
},
};

this[kIsClosedPromise] = createDeferredPromise();

// The spec requires handling of the strategy first
// here. Specifically, if getting the size and
// highWaterMark from the strategy fail, that has
Expand DownExpand Up@@ -625,8 +628,9 @@ function TransferredReadableStream(){
writable: undefined,
port: undefined,
promise: undefined,
}
},
};
this[kIsClosedPromise] = createDeferredPromise();
},
[], ReadableStream));
}
Expand DownExpand Up@@ -1195,8 +1199,9 @@ function createTeeReadableStream(start, pull, cancel){
writable: undefined,
port: undefined,
promise: undefined,
}
},
};
this[kIsClosedPromise] = createDeferredPromise();
setupReadableStreamDefaultControllerFromSource(
this,
ObjectCreate(null,{
Expand DownExpand Up@@ -1869,6 +1874,7 @@ function readableStreamCancel(stream, reason){
function readableStreamClose(stream){
assert(stream[kState].state === 'readable');
stream[kState].state = 'closed'
stream[kIsClosedPromise].resolve();

const{
reader,
Expand All@@ -1890,6 +1896,8 @@ function readableStreamError(stream, error){
assert(stream[kState].state === 'readable');
stream[kState].state = 'errored'
stream[kState].storedError = error;
stream[kIsClosedPromise].reject(error);
setPromiseHandled(stream[kIsClosedPromise].promise);

const{
reader
Expand Down
14 changes: 13 additions & 1 deletion lib/internal/webstreams/writablestream.js
Original file line numberDiff line numberDiff line change
Expand Up@@ -67,6 +67,10 @@ const{
kState,
} = require('internal/webstreams/util');

const{
kIsClosedPromise,
} = require('internal/streams/utils');

const{
AbortController,
} = require('internal/abort_controller');
Expand DownExpand Up@@ -175,9 +179,11 @@ class WritableStream{
port1: undefined,
port2: undefined,
promise: undefined,
}
},
};

this[kIsClosedPromise] = createDeferredPromise();

const size = extractSizeAlgorithm(strategy?.size);
const highWaterMark = extractHighWaterMark(strategy?.highWaterMark, 1);

Expand DownExpand Up@@ -347,6 +353,7 @@ function TransferredWritableStream(){
readable: undefined,
},
};
this[kIsClosedPromise] = createDeferredPromise();
},
[], WritableStream));
}
Expand DownExpand Up@@ -726,6 +733,10 @@ function writableStreamRejectCloseAndClosedPromiseIfNeeded(stream){
resolve: undefined,
};
}

stream[kIsClosedPromise].reject(stream[kState]?.storedError);
setPromiseHandled(stream[kIsClosedPromise].promise);

const{
writer,
} = stream[kState];
Expand DownExpand Up@@ -839,6 +850,7 @@ function writableStreamFinishInFlightClose(stream){
stream[kState].state = 'closed'
if (stream[kState].writer !== undefined)
stream[kState].writer[kState].close.resolve?.();
stream[kIsClosedPromise].resolve?.();
assert(stream[kState].pendingAbortRequest.abort.promise === undefined);
assert(stream[kState].storedError === undefined);
}
Expand Down
Loading