Skip to content

Commit 651e450

Browse files
rluvaton and ronag
authored and committed
stream: add highWaterMark for the map operator
This is done so we don't wait for the first items to finish before starting new ones.

Fixes: #46132
Co-authored-by: Robert Nagy <[email protected]>
PR-URL: #49249
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Benjamin Gruenbaum <[email protected]>
Reviewed-By: Robert Nagy <[email protected]>
1 parent 3f771ca commit 651e450

File tree

4 files changed

+226
-16
lines changed

4 files changed

+226
-16
lines changed

‎doc/api/stream.md‎

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2008,6 +2008,10 @@ showBoth();
20082008
added:
20092009
- v17.4.0
20102010
- v16.14.0
2011+
changes:
2012+
- version: REPLACEME
2013+
pr-url: https://github.com/nodejs/node/pull/49249
2014+
description: added `highWaterMark` in options.
20112015
-->
20122016

20132017
> Stability: 1 - Experimental
@@ -2021,6 +2025,8 @@ added:
20212025
*`options`{Object}
20222026
*`concurrency`{number} the maximum concurrent invocation of `fn` to call
20232027
on the stream at once. **Default:**`1`.
2028+
*`highWaterMark`{number} how many items to buffer while waiting for user
2029+
consumption of the mapped items. **Default:**`concurrency * 2 - 1`.
20242030
*`signal`{AbortSignal} allows destroying the stream if the signal is
20252031
aborted.
20262032
* Returns:{Readable} a stream mapped with the function `fn`.
@@ -2055,6 +2061,10 @@ for await (const result of dnsResults){
20552061
added:
20562062
- v17.4.0
20572063
- v16.14.0
2064+
changes:
2065+
- version: REPLACEME
2066+
pr-url: https://github.com/nodejs/node/pull/49249
2067+
description: added `highWaterMark` in options.
20582068
-->
20592069

20602070
> Stability: 1 - Experimental
@@ -2067,6 +2077,8 @@ added:
20672077
*`options`{Object}
20682078
*`concurrency`{number} the maximum concurrent invocation of `fn` to call
20692079
on the stream at once. **Default:**`1`.
2080+
*`highWaterMark`{number} how many items to buffer while waiting for user
2081+
consumption of the filtered items. **Default:**`concurrency * 2 - 1`.
20702082
*`signal`{AbortSignal} allows destroying the stream if the signal is
20712083
aborted.
20722084
* Returns:{Readable} a stream filtered with the predicate `fn`.

‎lib/internal/streams/operators.js‎

Lines changed: 41 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -33,6 +33,7 @@ const{
3333
NumberIsNaN,
3434
Promise,
3535
PromiseReject,
36+
PromiseResolve,
3637
PromisePrototypeThen,
3738
Symbol,
3839
}=primordials;
@@ -82,7 +83,15 @@ function map(fn, options){
8283
concurrency=MathFloor(options.concurrency);
8384
}
8485

85-
validateInteger(concurrency,'concurrency',1);
86+
lethighWaterMark=concurrency-1;
87+
if(options?.highWaterMark!=null){
88+
highWaterMark=MathFloor(options.highWaterMark);
89+
}
90+
91+
validateInteger(concurrency,'options.concurrency',1);
92+
validateInteger(highWaterMark,'options.highWaterMark',0);
93+
94+
highWaterMark+=concurrency;
8695

8796
returnasyncfunction*map(){
8897
constsignal=AbortSignal.any([options?.signal].filter(Boolean));
@@ -93,9 +102,28 @@ function map(fn, options){
93102
letnext;
94103
letresume;
95104
letdone=false;
105+
letcnt=0;
96106

97-
functiononDone(){
107+
functiononCatch(){
98108
done=true;
109+
afterItemProcessed();
110+
}
111+
112+
functionafterItemProcessed(){
113+
cnt-=1;
114+
maybeResume();
115+
}
116+
117+
functionmaybeResume(){
118+
if(
119+
resume&&
120+
!done&&
121+
cnt<concurrency&&
122+
queue.length<highWaterMark
123+
){
124+
resume();
125+
resume=null;
126+
}
99127
}
100128

101129
asyncfunctionpump(){
@@ -111,25 +139,27 @@ function map(fn, options){
111139

112140
try{
113141
val=fn(val,signalOpt);
142+
143+
if(val===kEmpty){
144+
continue;
145+
}
146+
147+
val=PromiseResolve(val);
114148
}catch(err){
115149
val=PromiseReject(err);
116150
}
117151

118-
if(val===kEmpty){
119-
continue;
120-
}
152+
cnt+=1;
121153

122-
if(typeofval?.catch==='function'){
123-
val.catch(onDone);
124-
}
154+
PromisePrototypeThen(val,afterItemProcessed,onCatch);
125155

126156
queue.push(val);
127157
if(next){
128158
next();
129159
next=null;
130160
}
131161

132-
if(!done&&queue.length&&queue.length>=concurrency){
162+
if(!done&&(queue.length>=highWaterMark||cnt>=concurrency)){
133163
awaitnewPromise((resolve)=>{
134164
resume=resolve;
135165
});
@@ -138,7 +168,7 @@ function map(fn, options){
138168
queue.push(kEof);
139169
}catch(err){
140170
constval=PromiseReject(err);
141-
PromisePrototypeThen(val,undefined,onDone);
171+
PromisePrototypeThen(val,afterItemProcessed,onCatch);
142172
queue.push(val);
143173
}finally{
144174
done=true;
@@ -169,10 +199,7 @@ function map(fn, options){
169199
}
170200

171201
queue.shift();
172-
if(resume){
173-
resume();
174-
resume=null;
175-
}
202+
maybeResume();
176203
}
177204

178205
awaitnewPromise((resolve)=>{

‎test/parallel/test-stream-forEach.js‎

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -96,7 +96,7 @@ const{once } = require('events');
9696
Readable.from([1,2,3,4]).forEach(async(_,{ signal })=>{
9797
calls++;
9898
awaitonce(signal,'abort');
99-
},{signal: ac.signal,concurrency: 2});
99+
},{signal: ac.signal,concurrency: 2,highWaterMark: 0});
100100
// pump
101101
assert.rejects(async()=>{
102102
awaitforEachPromise;

‎test/parallel/test-stream-map.js‎

Lines changed: 172 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,25 @@ const assert = require('assert');
88
const{ once }=require('events');
99
const{ setTimeout }=require('timers/promises');
1010

11+
functioncreateDependentPromises(n){
12+
constpromiseAndResolveArray=[];
13+
14+
for(leti=0;i<n;i++){
15+
letres;
16+
constpromise=newPromise((resolve)=>{
17+
if(i===0){
18+
res=resolve;
19+
return;
20+
}
21+
res=()=>promiseAndResolveArray[i-1][0].then(resolve);
22+
});
23+
24+
promiseAndResolveArray.push([promise,res]);
25+
}
26+
27+
returnpromiseAndResolveArray;
28+
}
29+
1130
{
1231
// Map works on synchronous streams with a synchronous mapper
1332
conststream=Readable.from([1,2,3,4,5]).map((x)=>x+x);
@@ -143,7 +162,7 @@ const{setTimeout } = require('timers/promises');
143162
conststream=range.map(common.mustCall(async(_,{ signal })=>{
144163
awaitonce(signal,'abort');
145164
throwsignal.reason;
146-
},2),{signal: ac.signal,concurrency: 2});
165+
},2),{signal: ac.signal,concurrency: 2,highWaterMark: 0});
147166
// pump
148167
assert.rejects(async()=>{
149168
forawait(constitemofstream){
@@ -173,12 +192,164 @@ const{setTimeout } = require('timers/promises');
173192
})().then(common.mustCall());
174193
}
175194

195+
196+
{
197+
// highWaterMark with small concurrency
198+
constfinishOrder=[];
199+
200+
constpromises=createDependentPromises(4);
201+
202+
constraw=Readable.from([2,0,1,3]);
203+
conststream=raw.map(async(item)=>{
204+
const[promise,resolve]=promises[item];
205+
resolve();
206+
207+
awaitpromise;
208+
finishOrder.push(item);
209+
returnitem;
210+
},{concurrency: 2});
211+
212+
(async()=>{
213+
awaitstream.toArray();
214+
215+
assert.deepStrictEqual(finishOrder,[0,1,2,3]);
216+
})().then(common.mustCall(),common.mustNotCall());
217+
}
218+
219+
{
220+
// highWaterMark with a lot of items and large concurrency
221+
constfinishOrder=[];
222+
223+
constpromises=createDependentPromises(20);
224+
225+
constinput=[10,1,0,3,4,2,5,7,8,9,6,11,12,13,18,15,16,17,14,19];
226+
constraw=Readable.from(input);
227+
// Should be
228+
// 10, 1, 0, 3, 4, 2 | next: 0
229+
// 10, 1, 3, 4, 2, 5 | next: 1
230+
// 10, 3, 4, 2, 5, 7 | next: 2
231+
// 10, 3, 4, 5, 7, 8 | next: 3
232+
// 10, 4, 5, 7, 8, 9 | next: 4
233+
// 10, 5, 7, 8, 9, 6 | next: 5
234+
// 10, 7, 8, 9, 6, 11 | next: 6
235+
// 10, 7, 8, 9, 11, 12 | next: 7
236+
// 10, 8, 9, 11, 12, 13 | next: 8
237+
// 10, 9, 11, 12, 13, 18 | next: 9
238+
// 10, 11, 12, 13, 18, 15 | next: 10
239+
// 11, 12, 13, 18, 15, 16 | next: 11
240+
// 12, 13, 18, 15, 16, 17 | next: 12
241+
// 13, 18, 15, 16, 17, 14 | next: 13
242+
// 18, 15, 16, 17, 14, 19 | next: 14
243+
// 18, 15, 16, 17, 19 | next: 15
244+
// 18, 16, 17, 19 | next: 16
245+
// 18, 17, 19 | next: 17
246+
// 18, 19 | next: 18
247+
// 19 | next: 19
248+
//
249+
250+
conststream=raw.map(async(item)=>{
251+
const[promise,resolve]=promises[item];
252+
resolve();
253+
254+
awaitpromise;
255+
finishOrder.push(item);
256+
returnitem;
257+
},{concurrency: 6});
258+
259+
(async()=>{
260+
constoutputOrder=awaitstream.toArray();
261+
262+
assert.deepStrictEqual(outputOrder,input);
263+
assert.deepStrictEqual(finishOrder,[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19]);
264+
})().then(common.mustCall(),common.mustNotCall());
265+
}
266+
267+
{
268+
// Custom highWaterMark with a lot of items and large concurrency
269+
constfinishOrder=[];
270+
271+
constpromises=createDependentPromises(20);
272+
273+
constinput=[11,1,0,3,4,2,5,7,8,9,6,10,12,13,18,15,16,17,14,19];
274+
constraw=Readable.from(input);
275+
// Should be
276+
// 11, 1, 0, 3, 4 | next: 0, buffer: []
277+
// 11, 1, 3, 4, 2 | next: 1, buffer: [0]
278+
// 11, 3, 4, 2, 5 | next: 2, buffer: [0, 1]
279+
// 11, 3, 4, 5, 7 | next: 3, buffer: [0, 1, 2]
280+
// 11, 4, 5, 7, 8 | next: 4, buffer: [0, 1, 2, 3]
281+
// 11, 5, 7, 8, 9 | next: 5, buffer: [0, 1, 2, 3, 4]
282+
// 11, 7, 8, 9, 6 | next: 6, buffer: [0, 1, 2, 3, 4, 5]
283+
// 11, 7, 8, 9, 10 | next: 7, buffer: [0, 1, 2, 3, 4, 5, 6] -- buffer full
284+
// 11, 8, 9, 10, 12 | next: 8, buffer: [0, 1, 2, 3, 4, 5, 6]
285+
// 11, 9, 10, 12, 13 | next: 9, buffer: [0, 1, 2, 3, 4, 5, 6]
286+
// 11, 10, 12, 13, 18 | next: 10, buffer: [0, 1, 2, 3, 4, 5, 6]
287+
// 11, 12, 13, 18, 15 | next: 11, buffer: [0, 1, 2, 3, 4, 5, 6]
288+
// 12, 13, 18, 15, 16 | next: 12, buffer: [] -- all items flushed as 11 is consumed and all the items wait for it
289+
// 13, 18, 15, 16, 17 | next: 13, buffer: []
290+
// 18, 15, 16, 17, 14 | next: 14, buffer: []
291+
// 18, 15, 16, 17, 19 | next: 15, buffer: [14]
292+
// 18, 16, 17, 19 | next: 16, buffer: [14, 15]
293+
// 18, 17, 19 | next: 17, buffer: [14, 15, 16]
294+
// 18, 19 | next: 18, buffer: [14, 15, 16, 17]
295+
// 19 | next: 19, buffer: [] -- all items flushed
296+
//
297+
298+
conststream=raw.map(async(item)=>{
299+
const[promise,resolve]=promises[item];
300+
resolve();
301+
302+
awaitpromise;
303+
finishOrder.push(item);
304+
returnitem;
305+
},{concurrency: 5,highWaterMark: 7});
306+
307+
(async()=>{
308+
constoutputOrder=awaitstream.toArray();
309+
310+
assert.deepStrictEqual(outputOrder,input);
311+
assert.deepStrictEqual(finishOrder,[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19]);
312+
})().then(common.mustCall(),common.mustNotCall());
313+
}
314+
315+
{
316+
// Where there is a delay between the first and the next item it should not wait for filled queue
317+
// before yielding to the user
318+
constpromises=createDependentPromises(3);
319+
320+
constraw=Readable.from([0,1,2]);
321+
322+
conststream=raw
323+
.map(async(item)=>{
324+
if(item!==0){
325+
awaitpromises[item][0];
326+
}
327+
328+
returnitem;
329+
},{concurrency: 2})
330+
.map((item)=>{
331+
// eslint-disable-next-line no-unused-vars
332+
for(const[_,resolve]ofpromises){
333+
resolve();
334+
}
335+
336+
returnitem;
337+
});
338+
339+
(async()=>{
340+
awaitstream.toArray();
341+
})().then(common.mustCall(),common.mustNotCall());
342+
}
343+
176344
{
177345
// Error cases
178346
assert.throws(()=>Readable.from([1]).map(1),/ERR_INVALID_ARG_TYPE/);
179347
assert.throws(()=>Readable.from([1]).map((x)=>x,{
180348
concurrency: 'Foo'
181349
}),/ERR_OUT_OF_RANGE/);
350+
assert.throws(()=>Readable.from([1]).map((x)=>x,{
351+
concurrency: -1
352+
}),/ERR_OUT_OF_RANGE/);
182353
assert.throws(()=>Readable.from([1]).map((x)=>x,1),/ERR_INVALID_ARG_TYPE/);
183354
assert.throws(()=>Readable.from([1]).map((x)=>x,{signal: true}),/ERR_INVALID_ARG_TYPE/);
184355
}

0 commit comments

Comments
(0)