Skip to content

Commit 4640ea2

Browse files
ronagMylesBorins
authored andcommitted
stream: don't destroy final readable stream in pipeline
If the last stream in a pipeline is still usable/readable don't destroy it to allow further composition. Fixes: #32105 Backport-PR-URL: #32111 PR-URL: #32110 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Luigi Pinca <[email protected]>
1 parent ddb8824 commit 4640ea2

File tree

2 files changed

+52
-3
lines changed

2 files changed

+52
-3
lines changed

‎lib/internal/streams/pipeline.js‎

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,18 @@ function destroyStream(stream, err){
3636
if(typeofstream.close==='function')returnstream.close();
3737
}
3838

39-
functiondestroyer(stream,reading,writing,callback){
39+
functiondestroyer(stream,reading,writing,final,callback){
4040
callback=once(callback);
4141
letdestroyed=false;
4242

4343
if(eos===undefined)eos=require('internal/streams/end-of-stream');
4444
eos(stream,{readable: reading,writable: writing},(err)=>{
4545
if(destroyed)return;
4646
destroyed=true;
47-
destroyStream(stream,err);
47+
constreadable=stream.readable||isRequest(stream);
48+
if(err||!final||!readable){
49+
destroyStream(stream,err);
50+
}
4851
callback(err);
4952
});
5053

@@ -176,7 +179,7 @@ function pipeline(...streams){
176179
}
177180

178181
functionwrap(stream,reading,writing,final){
179-
destroys.push(destroyer(stream,reading,writing,(err)=>{
182+
destroys.push(destroyer(stream,reading,writing,final,(err)=>{
180183
finish(err,final);
181184
}));
182185
}

‎test/parallel/test-stream-pipeline.js‎

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -918,6 +918,52 @@ const{promisify } = require('util');
918918
constdst=newPassThrough({autoDestroy: false});
919919
pipeline(src,dst,common.mustCall(()=>{
920920
assert.strictEqual(src.destroyed,true);
921+
assert.strictEqual(dst.destroyed,false);
922+
}));
923+
src.end();
924+
}
925+
926+
{
927+
constserver=http.createServer((req,res)=>{
928+
});
929+
930+
server.listen(0,()=>{
931+
constreq=http.request({
932+
port: server.address().port
933+
});
934+
935+
constbody=newPassThrough();
936+
pipeline(
937+
body,
938+
req,
939+
common.mustCall((err)=>{
940+
assert(!err);
941+
assert(!req.res);
942+
assert(!req.aborted);
943+
req.abort();
944+
server.close();
945+
})
946+
);
947+
body.end();
948+
});
949+
}
950+
951+
{
952+
constsrc=newPassThrough();
953+
constdst=newPassThrough();
954+
pipeline(src,dst,common.mustCall((err)=>{
955+
assert(!err);
956+
assert.strictEqual(dst.destroyed,false);
957+
}));
958+
src.end();
959+
}
960+
961+
{
962+
constsrc=newPassThrough();
963+
constdst=newPassThrough();
964+
dst.readable=false;
965+
pipeline(src,dst,common.mustCall((err)=>{
966+
assert(!err);
921967
assert.strictEqual(dst.destroyed,true);
922968
}));
923969
src.end();

0 commit comments

Comments
(0)