Skip to content

Commit 5a95fa4

Browse files
ronagcodebytere
authored andcommitted
stream: normalize async iterator stream destroy
PR-URL: #31316 Reviewed-By: Anna Henningsen <[email protected]> Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Rich Trott <[email protected]> Reviewed-By: Minwoo Jung <[email protected]>
1 parent 20d0a0e commit 5a95fa4

File tree

2 files changed

+30
-6
lines changed

2 files changed

+30
-6
lines changed

‎lib/internal/streams/async_iterator.js‎

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,15 @@ const kStream = Symbol('stream');
2222

2323
letReadable;
2424

25+
functiondestroy(stream,err){
26+
// request.destroy just do .end - .abort is what we want
27+
if(typeofstream.abort==='function')returnstream.abort();
28+
if(stream.req&&
29+
typeofstream.req.abort==='function')returnstream.req.abort();
30+
if(typeofstream.destroy==='function')returnstream.destroy(err);
31+
if(typeofstream.close==='function')returnstream.close();
32+
}
33+
2534
functioncreateIterResult(value,done){
2635
return{ value, done };
2736
}
@@ -141,7 +150,7 @@ const ReadableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({
141150
resolve(createIterResult(undefined,true));
142151
}
143152
});
144-
stream.destroy();
153+
destroy(stream);
145154
});
146155
},
147156
},AsyncIteratorPrototype);
@@ -156,11 +165,7 @@ const createReadableStreamAsyncIterator = (stream) =>{
156165

157166
constsrc=stream;
158167
stream=newReadable({objectMode: true}).wrap(src);
159-
finished(stream,(err)=>{
160-
if(typeofsrc.destroy==='function'){
161-
src.destroy(err);
162-
}
163-
});
168+
finished(stream,(err)=>destroy(src,err));
164169
}
165170

166171
constiterator=ObjectCreate(ReadableStreamAsyncIteratorPrototype,{

‎test/parallel/test-stream-readable-async-iterators.js‎

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,25 @@ async function tests(){
5656
}));
5757
}
5858

59+
{
60+
// Non standard stream cleanup
61+
62+
constreadable=newReadable({autoDestroy: false,read(){}});
63+
readable.push('asd');
64+
readable.push('asd');
65+
readable.destroy=null;
66+
readable.close=common.mustCall(()=>{
67+
readable.emit('close');
68+
});
69+
70+
await(async()=>{
71+
forawait(constdofreadable){
72+
d;
73+
return;
74+
}
75+
})();
76+
}
77+
5978
{
6079
constreadable=newReadable({objectMode: true,read(){}});
6180
readable.push(0);

0 commit comments

Comments
(0)