Skip to content

Commit 36a4f54

Browse files
vadzimBethGriggs
authored andcommitted
stream: close iterator in Readable.from
Call iterator.return() if not all of its values are consumed. Fixes: #32842 PR-URL: #32844 Reviewed-By: Robert Nagy <[email protected]> Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Zeyu Yang <[email protected]>
1 parent 7f49812 commit 36a4f54

File tree

2 files changed

+229
-1
lines changed

2 files changed

+229
-1
lines changed

‎lib/internal/streams/from.js‎

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,21 +34,51 @@ function from(Readable, iterable, opts){
3434
objectMode: true,
3535
...opts
3636
});
37+
3738
// Reading boolean to protect against _read
3839
// being called before last iteration completion.
3940
letreading=false;
41+
42+
// needToClose boolean if iterator needs to be explicitly closed
43+
letneedToClose=false;
44+
4045
readable._read=function(){
4146
if(!reading){
4247
reading=true;
4348
next();
4449
}
4550
};
51+
52+
readable._destroy=function(error,cb){
53+
if(needToClose){
54+
needToClose=false;
55+
close().then(
56+
()=>process.nextTick(cb,error),
57+
(e)=>process.nextTick(cb,error||e),
58+
);
59+
}else{
60+
cb(error);
61+
}
62+
};
63+
64+
asyncfunctionclose(){
65+
if(typeofiterator.return==='function'){
66+
const{ value }=awaititerator.return();
67+
awaitvalue;
68+
}
69+
}
70+
4671
asyncfunctionnext(){
4772
try{
73+
needToClose=false;
4874
const{ value, done }=awaititerator.next();
75+
needToClose=!done;
76+
constresolved=awaitvalue;
4977
if(done){
5078
readable.push(null);
51-
}elseif(readable.push(awaitvalue)){
79+
}elseif(readable.destroyed){
80+
awaitclose();
81+
}elseif(readable.push(resolved)){
5282
next();
5383
}else{
5484
reading=false;
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
'use strict';
2+
3+
const{ mustCall, mustNotCall }=require('../common');
4+
const{ Readable }=require('stream');
5+
const{ strictEqual }=require('assert');
6+
7+
asyncfunctionasyncSupport(){
8+
constfinallyMustCall=mustCall();
9+
constbodyMustCall=mustCall();
10+
11+
asyncfunction*infiniteGenerate(){
12+
try{
13+
while(true)yield'a';
14+
}finally{
15+
finallyMustCall();
16+
}
17+
}
18+
19+
conststream=Readable.from(infiniteGenerate());
20+
21+
forawait(constchunkofstream){
22+
bodyMustCall();
23+
strictEqual(chunk,'a');
24+
break;
25+
}
26+
}
27+
28+
asyncfunctionsyncSupport(){
29+
constfinallyMustCall=mustCall();
30+
constbodyMustCall=mustCall();
31+
32+
function*infiniteGenerate(){
33+
try{
34+
while(true)yield'a';
35+
}finally{
36+
finallyMustCall();
37+
}
38+
}
39+
40+
conststream=Readable.from(infiniteGenerate());
41+
42+
forawait(constchunkofstream){
43+
bodyMustCall();
44+
strictEqual(chunk,'a');
45+
break;
46+
}
47+
}
48+
49+
asyncfunctionsyncPromiseSupport(){
50+
constreturnMustBeAwaited=mustCall();
51+
constbodyMustCall=mustCall();
52+
53+
function*infiniteGenerate(){
54+
try{
55+
while(true)yieldPromise.resolve('a');
56+
}finally{
57+
// eslint-disable-next-line no-unsafe-finally
58+
return{then(cb){
59+
returnMustBeAwaited();
60+
cb();
61+
}};
62+
}
63+
}
64+
65+
conststream=Readable.from(infiniteGenerate());
66+
67+
forawait(constchunkofstream){
68+
bodyMustCall();
69+
strictEqual(chunk,'a');
70+
break;
71+
}
72+
}
73+
74+
asyncfunctionsyncRejectedSupport(){
75+
constreturnMustBeAwaited=mustCall();
76+
constbodyMustNotCall=mustNotCall();
77+
constcatchMustCall=mustCall();
78+
constsecondNextMustNotCall=mustNotCall();
79+
80+
function*generate(){
81+
try{
82+
yieldPromise.reject('a');
83+
secondNextMustNotCall();
84+
}finally{
85+
// eslint-disable-next-line no-unsafe-finally
86+
return{then(cb){
87+
returnMustBeAwaited();
88+
cb();
89+
}};
90+
}
91+
}
92+
93+
conststream=Readable.from(generate());
94+
95+
try{
96+
forawait(constchunkofstream){
97+
bodyMustNotCall(chunk);
98+
}
99+
}catch{
100+
catchMustCall();
101+
}
102+
}
103+
104+
asyncfunctionnoReturnAfterThrow(){
105+
constreturnMustNotCall=mustNotCall();
106+
constbodyMustNotCall=mustNotCall();
107+
constcatchMustCall=mustCall();
108+
constnextMustCall=mustCall();
109+
110+
conststream=Readable.from({
111+
[Symbol.asyncIterator](){returnthis;},
112+
asyncnext(){
113+
nextMustCall();
114+
thrownewError('a');
115+
},
116+
asyncreturn(){
117+
returnMustNotCall();
118+
return{done: true};
119+
},
120+
});
121+
122+
try{
123+
forawait(constchunkofstream){
124+
bodyMustNotCall(chunk);
125+
}
126+
}catch{
127+
catchMustCall();
128+
}
129+
}
130+
131+
asyncfunctioncloseStreamWhileNextIsPending(){
132+
constfinallyMustCall=mustCall();
133+
constdataMustCall=mustCall();
134+
135+
letresolveDestroy;
136+
constdestroyed=
137+
newPromise((resolve)=>{resolveDestroy=mustCall(resolve);});
138+
letresolveYielded;
139+
constyielded=
140+
newPromise((resolve)=>{resolveYielded=mustCall(resolve);});
141+
142+
asyncfunction*infiniteGenerate(){
143+
try{
144+
while(true){
145+
yield'a';
146+
resolveYielded();
147+
awaitdestroyed;
148+
}
149+
}finally{
150+
finallyMustCall();
151+
}
152+
}
153+
154+
conststream=Readable.from(infiniteGenerate());
155+
156+
stream.on('data',(data)=>{
157+
dataMustCall();
158+
strictEqual(data,'a');
159+
});
160+
161+
yielded.then(()=>{
162+
stream.destroy();
163+
resolveDestroy();
164+
});
165+
}
166+
167+
asyncfunctioncloseAfterNullYielded(){
168+
constfinallyMustCall=mustCall();
169+
constdataMustCall=mustCall(3);
170+
171+
function*infiniteGenerate(){
172+
try{
173+
yield'a';
174+
yield'a';
175+
yield'a';
176+
while(true)yieldnull;
177+
}finally{
178+
finallyMustCall();
179+
}
180+
}
181+
182+
conststream=Readable.from(infiniteGenerate());
183+
184+
stream.on('data',(chunk)=>{
185+
dataMustCall();
186+
strictEqual(chunk,'a');
187+
});
188+
}
189+
190+
Promise.all([
191+
asyncSupport(),
192+
syncSupport(),
193+
syncPromiseSupport(),
194+
syncRejectedSupport(),
195+
noReturnAfterThrow(),
196+
closeStreamWhileNextIsPending(),
197+
closeAfterNullYielded(),
198+
]).then(mustCall());

0 commit comments

Comments
(0)