Skip to content

Commit 2b9003b

Browse files
ronagnodejs-github-bot
authored andcommitted
stream: don't destroy on async iterator success
Destroying on async iterator completion ignores autoDestroy. PR-URL: #35122 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Anna Henningsen <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: James M Snell <[email protected]>
1 parent 5461794 commit 2b9003b

File tree

2 files changed

+163
-14
lines changed

2 files changed

+163
-14
lines changed

‎lib/internal/streams/readable.js‎

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1066,7 +1066,7 @@ Readable.prototype[SymbolAsyncIterator] = function(){
10661066
objectMode: true,
10671067
destroy(err,callback){
10681068
destroyImpl.destroyer(src,err);
1069-
callback();
1069+
callback(err);
10701070
}
10711071
}).wrap(src);
10721072
}
@@ -1088,24 +1088,39 @@ async function* createAsyncIterator(stream){
10881088
}
10891089
}
10901090

1091+
conststate=stream._readableState;
1092+
1093+
leterror=state.errored;
1094+
leterrorEmitted=state.errorEmitted;
1095+
letendEmitted=state.endEmitted;
1096+
letcloseEmitted=state.closeEmitted;
1097+
10911098
stream
10921099
.on('readable',next)
1093-
.on('error',next)
1094-
.on('end',next)
1095-
.on('close',next);
1100+
.on('error',function(err){
1101+
error=err;
1102+
errorEmitted=true;
1103+
next.call(this);
1104+
})
1105+
.on('end',function(){
1106+
endEmitted=true;
1107+
next.call(this);
1108+
})
1109+
.on('close',function(){
1110+
closeEmitted=true;
1111+
next.call(this);
1112+
});
10961113

10971114
try{
1098-
conststate=stream._readableState;
10991115
while(true){
11001116
constchunk=stream.read();
11011117
if(chunk!==null){
11021118
yieldchunk;
1103-
}elseif(state.errored){
1104-
throwstate.errored;
1105-
}elseif(state.ended){
1119+
}elseif(errorEmitted){
1120+
throwerror;
1121+
}elseif(endEmitted){
11061122
break;
1107-
}elseif(state.closed){
1108-
// TODO(ronag): ERR_PREMATURE_CLOSE?
1123+
}elseif(closeEmitted){
11091124
break;
11101125
}else{
11111126
awaitnewPromise(next);
@@ -1115,7 +1130,10 @@ async function* createAsyncIterator(stream){
11151130
destroyImpl.destroyer(stream,err);
11161131
throwerr;
11171132
}finally{
1118-
destroyImpl.destroyer(stream,null);
1133+
if(state.autoDestroy||!endEmitted){
1134+
// TODO(ronag): ERR_PREMATURE_CLOSE?
1135+
destroyImpl.destroyer(stream,null);
1136+
}
11191137
}
11201138
}
11211139

‎test/parallel/test-stream-readable-async-iterators.js‎

Lines changed: 134 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ const{
99
pipeline
1010
}=require('stream');
1111
constassert=require('assert');
12+
consthttp=require('http');
1213

1314
asyncfunctiontests(){
1415
{
@@ -44,9 +45,11 @@ async function tests(){
4445
constiter=Readable.prototype[Symbol.asyncIterator].call(stream);
4546
awaititer.next();
4647
awaititer.next();
47-
awaititer.next().catch(common.mustCall((err)=>{
48-
assert.strictEqual(err.message,'asd');
49-
}));
48+
awaititer.next()
49+
.then(common.mustNotCall())
50+
.catch(common.mustCall((err)=>{
51+
assert.strictEqual(err.message,'asd');
52+
}));
5053
}
5154

5255
{
@@ -581,6 +584,61 @@ async function tests(){
581584
assert.strictEqual(err,_err);
582585
}));
583586
}
587+
588+
{
589+
// Don't destroy if no auto destroy.
590+
// https://github.com/nodejs/node/issues/35116
591+
592+
constr=newReadable({
593+
autoDestroy: false,
594+
read(){
595+
this.push('asd');
596+
this.push(null);
597+
}
598+
});
599+
600+
forawait(constchunkofr){
601+
chunk;
602+
}
603+
assert.strictEqual(r.destroyed,false);
604+
}
605+
606+
{
607+
// Destroy if no auto destroy and premature break.
608+
// https://github.com/nodejs/node/pull/35122/files#r485678318
609+
610+
constr=newReadable({
611+
autoDestroy: false,
612+
read(){
613+
this.push('asd');
614+
}
615+
});
616+
617+
forawait(constchunkofr){
618+
chunk;
619+
break;
620+
}
621+
assert.strictEqual(r.destroyed,true);
622+
}
623+
624+
{
625+
// Don't destroy before 'end'.
626+
627+
constr=newReadable({
628+
read(){
629+
this.push('asd');
630+
this.push(null);
631+
}
632+
}).on('end',()=>{
633+
assert.strictEqual(r.destroyed,false);
634+
});
635+
636+
forawait(constchunkofr){
637+
chunk;
638+
}
639+
640+
assert.strictEqual(r.destroyed,true);
641+
}
584642
}
585643

586644
{
@@ -643,5 +701,78 @@ async function tests(){
643701
});
644702
}
645703

704+
{
705+
let_req;
706+
constserver=http.createServer((request,response)=>{
707+
response.statusCode=404;
708+
response.write('never ends');
709+
});
710+
711+
server.listen(()=>{
712+
_req=http.request(`http://localhost:${server.address().port}`)
713+
.on('response',common.mustCall(async(res)=>{
714+
setTimeout(()=>{
715+
_req.destroy(newError('something happened'));
716+
},100);
717+
718+
res.on('error',common.mustCall());
719+
720+
let_err;
721+
try{
722+
forawait(constchunkofres){
723+
chunk;
724+
}
725+
}catch(err){
726+
_err=err;
727+
}
728+
729+
assert.strictEqual(_err.code,'ECONNRESET');
730+
server.close();
731+
}))
732+
.on('error',common.mustCall())
733+
.end();
734+
});
735+
}
736+
737+
{
738+
asyncfunctiongetParsedBody(request){
739+
letbody='';
740+
741+
forawait(constdataofrequest){
742+
body+=data;
743+
}
744+
745+
try{
746+
returnJSON.parse(body);
747+
}catch{
748+
return{};
749+
}
750+
}
751+
752+
conststr=JSON.stringify({asd: true});
753+
constserver=http.createServer(async(request,response)=>{
754+
constbody=awaitgetParsedBody(request);
755+
response.statusCode=200;
756+
assert.strictEqual(JSON.stringify(body),str);
757+
response.end(JSON.stringify(body));
758+
}).listen(()=>{
759+
http
760+
.request({
761+
method: 'POST',
762+
hostname: 'localhost',
763+
port: server.address().port,
764+
})
765+
.end(str)
766+
.on('response',async(res)=>{
767+
letbody='';
768+
forawait(constchunkofres){
769+
body+=chunk;
770+
}
771+
assert.strictEqual(body,str);
772+
server.close();
773+
});
774+
});
775+
}
776+
646777
// To avoid missing some tests if a promise does not resolve
647778
tests().then(common.mustCall());

0 commit comments

Comments
(0)