Skip to content

Commit 08a0c6c

Browse files
rluvatonUlisesGascon
authored andcommitted
stream: improve from perf
PR-URL: #50359 Reviewed-By: Vinícius Lourenço Claro Cardoso <[email protected]> Reviewed-By: Robert Nagy <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]>
1 parent 099f5cf commit 08a0c6c

File tree

1 file changed

+113
-18
lines changed

1 file changed

+113
-18
lines changed

‎lib/internal/streams/from.js‎

Lines changed: 113 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ function from(Readable, iterable, opts){
3636
thrownewERR_INVALID_ARG_TYPE('iterable',['Iterable'],iterable);
3737
}
3838

39+
3940
constreadable=newReadable({
4041
objectMode: true,
4142
highWaterMark: 1,
@@ -46,11 +47,19 @@ function from(Readable, iterable, opts){
4647
// Flag to protect against _read
4748
// being called before last iteration completion.
4849
letreading=false;
50+
letisAsyncValues=false;
4951

5052
readable._read=function(){
5153
if(!reading){
5254
reading=true;
53-
next();
55+
56+
if(isAsync){
57+
nextAsync();
58+
}elseif(isAsyncValues){
59+
nextSyncWithAsyncValues();
60+
}else{
61+
nextSyncWithSyncValues();
62+
}
5463
}
5564
};
5665

@@ -78,29 +87,115 @@ function from(Readable, iterable, opts){
7887
}
7988
}
8089

81-
asyncfunctionnext(){
90+
// There are a lot of duplication here, it's done on purpose for performance
91+
// reasons - avoid await when not needed.
92+
93+
functionnextSyncWithSyncValues(){
94+
for(;;){
95+
try{
96+
const{ value, done }=iterator.next();
97+
98+
if(done){
99+
readable.push(null);
100+
return;
101+
}
102+
103+
if(value&&
104+
typeofvalue.then==='function'){
105+
returnchangeToAsyncValues(value);
106+
}
107+
108+
if(value===null){
109+
reading=false;
110+
thrownewERR_STREAM_NULL_VALUES();
111+
}
112+
113+
if(readable.push(value)){
114+
continue;
115+
}
116+
117+
reading=false;
118+
}catch(err){
119+
readable.destroy(err);
120+
}
121+
break;
122+
}
123+
}
124+
125+
asyncfunctionchangeToAsyncValues(value){
126+
isAsyncValues=true;
127+
128+
try{
129+
constres=awaitvalue;
130+
131+
if(res===null){
132+
reading=false;
133+
thrownewERR_STREAM_NULL_VALUES();
134+
}
135+
136+
if(readable.push(res)){
137+
nextSyncWithAsyncValues();
138+
return;
139+
}
140+
141+
reading=false;
142+
}catch(err){
143+
readable.destroy(err);
144+
}
145+
}
146+
147+
asyncfunctionnextSyncWithAsyncValues(){
82148
for(;;){
83149
try{
84-
const{ value, done }=isAsync ?
85-
awaititerator.next() :
86-
iterator.next();
150+
const{ value, done }=iterator.next();
87151

88152
if(done){
89153
readable.push(null);
90-
}else{
91-
constres=(value&&
92-
typeofvalue.then==='function') ?
93-
awaitvalue :
94-
value;
95-
if(res===null){
96-
reading=false;
97-
thrownewERR_STREAM_NULL_VALUES();
98-
}elseif(readable.push(res)){
99-
continue;
100-
}else{
101-
reading=false;
102-
}
154+
return;
155+
}
156+
157+
constres=(value&&
158+
typeofvalue.then==='function') ?
159+
awaitvalue :
160+
value;
161+
162+
if(res===null){
163+
reading=false;
164+
thrownewERR_STREAM_NULL_VALUES();
103165
}
166+
167+
if(readable.push(res)){
168+
continue;
169+
}
170+
171+
reading=false;
172+
}catch(err){
173+
readable.destroy(err);
174+
}
175+
break;
176+
}
177+
}
178+
179+
asyncfunctionnextAsync(){
180+
for(;;){
181+
try{
182+
const{ value, done }=awaititerator.next();
183+
184+
if(done){
185+
readable.push(null);
186+
return;
187+
}
188+
189+
if(value===null){
190+
reading=false;
191+
thrownewERR_STREAM_NULL_VALUES();
192+
}
193+
194+
if(readable.push(value)){
195+
continue;
196+
}
197+
198+
reading=false;
104199
}catch(err){
105200
readable.destroy(err);
106201
}

0 commit comments

Comments
(0)