Skip to content

Commit b29d927

Browse files
rluvatonruyadorno
authored andcommitted
stream: improve readable webstream pipeTo
PR-URL: #49690 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Yagiz Nizipli <[email protected]> Reviewed-By: Moshe Atlow <[email protected]>
1 parent a304d1e commit b29d927

File tree

1 file changed

+34
-17
lines changed

1 file changed

+34
-17
lines changed

‎lib/internal/webstreams/readablestream.js‎

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ const{
1414
ObjectCreate,
1515
ObjectDefineProperties,
1616
ObjectSetPrototypeOf,
17-
Promise,
1817
PromisePrototypeThen,
1918
PromiseResolve,
2019
PromiseReject,
@@ -1354,7 +1353,9 @@ function readableStreamPipeTo(
13541353

13551354
constpromise=createDeferredPromise();
13561355

1357-
letcurrentWrite=PromiseResolve();
1356+
conststate={
1357+
currentWrite: PromiseResolve(),
1358+
};
13581359

13591360
// The error here can be undefined. The rejected arg
13601361
// tells us that the promise must be rejected even
@@ -1371,9 +1372,9 @@ function readableStreamPipeTo(
13711372
}
13721373

13731374
asyncfunctionwaitForCurrentWrite(){
1374-
constwrite=currentWrite;
1375+
constwrite=state.currentWrite;
13751376
awaitwrite;
1376-
if(write!==currentWrite)
1377+
if(write!==state.currentWrite)
13771378
awaitwaitForCurrentWrite();
13781379
}
13791380

@@ -1464,20 +1465,14 @@ function readableStreamPipeTo(
14641465
asyncfunctionstep(){
14651466
if(shuttingDown)
14661467
returntrue;
1468+
14671469
awaitwriter[kState].ready.promise;
1468-
returnnewPromise((resolve,reject)=>{
1469-
readableStreamDefaultReaderRead(
1470-
reader,
1471-
{
1472-
[kChunk](chunk){
1473-
currentWrite=writableStreamDefaultWriterWrite(writer,chunk);
1474-
setPromiseHandled(currentWrite);
1475-
resolve(false);
1476-
},
1477-
[kClose]: ()=>resolve(true),
1478-
[kError]: reject,
1479-
});
1480-
});
1470+
1471+
constpromise=createDeferredPromise();
1472+
// eslint-disable-next-line no-use-before-define
1473+
readableStreamDefaultReaderRead(reader,newPipeToReadableStreamReadRequest(writer,state,promise));
1474+
1475+
returnpromise.promise;
14811476
}
14821477

14831478
asyncfunctionrun(){
@@ -1539,6 +1534,28 @@ function readableStreamPipeTo(
15391534
returnpromise.promise;
15401535
}
15411536

1537+
classPipeToReadableStreamReadRequest{
1538+
constructor(writer,state,promise){
1539+
this.writer=writer;
1540+
this.state=state;
1541+
this.promise=promise;
1542+
}
1543+
1544+
[kChunk](chunk){
1545+
this.state.currentWrite=writableStreamDefaultWriterWrite(this.writer,chunk);
1546+
setPromiseHandled(this.state.currentWrite);
1547+
this.promise.resolve(false);
1548+
}
1549+
1550+
[kClose](){
1551+
this.promise.resolve(true);
1552+
}
1553+
1554+
[kError](error){
1555+
this.promise.reject(error);
1556+
}
1557+
}
1558+
15421559
functionreadableStreamTee(stream,cloneForBranch2){
15431560
if(isReadableByteStreamController(stream[kState].controller)){
15441561
returnreadableByteStreamTee(stream);

0 commit comments

Comments
(0)