Skip to content

Commit 46ec74d

Browse files
benjamingrronag
authored andcommitted
stream: support flatMap
Support the `flatMap` method from the iterator helper TC39 proposal on readable streams. Co-Authored-By: Robert Nagy <[email protected]> PR-URL: #41612 Reviewed-By: Robert Nagy <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
1 parent ae7df17 commit 46ec74d

File tree

3 files changed

+188
-0
lines changed

3 files changed

+188
-0
lines changed

‎doc/api/stream.md‎

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1973,6 +1973,55 @@ console.log(allBigFiles);
19731973
console.log('done'); // Stream has finished
19741974
```
19751975

1976+
### `readable.flatMap(fn[, options])`
1977+
1978+
<!-- YAML
1979+
added: REPLACEME
1980+
-->
1981+
1982+
> Stability: 1 - Experimental
1983+
1984+
*`fn`{Function|AsyncGeneratorFunction|AsyncFunction} a function to map over
1985+
every item in the stream.
1986+
*`data`{any} a chunk of data from the stream.
1987+
*`options`{Object}
1988+
*`signal`{AbortSignal} aborted if the stream is destroyed allowing to
1989+
abort the `fn` call early.
1990+
*`options`{Object}
1991+
*`concurrency`{number} the maximum concurrent invocation of `fn` to call
1992+
on the stream at once. **Default:**`1`.
1993+
*`signal`{AbortSignal} allows destroying the stream if the signal is
1994+
aborted.
1995+
* Returns:{Readable} a stream flat-mapped with the function `fn`.
1996+
1997+
This method returns a new stream by applying the given callback to each
1998+
chunk of the stream and then flattening the result.
1999+
2000+
It is possible to return a stream or another iterable or async iterable from
2001+
`fn` and the result streams will be merged (flattened) into the returned
2002+
stream.
2003+
2004+
```mjs
2005+
import{Readable } from'stream';
2006+
import{createReadStream } from'fs';
2007+
2008+
// With a synchronous mapper.
2009+
forawait (constitemofReadable.from([1, 2, 3, 4]).flatMap((x) => [x, x])){
2010+
console.log(item); // 1, 1, 2, 2, 3, 3, 4, 4
2011+
}
2012+
// With an asynchronous mapper, combine the contents of 4 files
2013+
constconcatResult=Readable.from([
2014+
'./1.mjs',
2015+
'./2.mjs',
2016+
'./3.mjs',
2017+
'./4.mjs',
2018+
]).flatMap((fileName) =>createReadStream(fileName));
2019+
forawait (constresultofconcatResult){
2020+
// This will contain the contents (all chunks) of all 4 files
2021+
console.log(result);
2022+
}
2023+
```
2024+
19762025
### Duplex and transform streams
19772026

19782027
#### Class: `stream.Duplex`

‎lib/internal/streams/operators.js‎

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,8 +229,16 @@ async function toArray(options){
229229
}
230230
returnresult;
231231
}
232+
233+
asyncfunction*flatMap(fn,options){
234+
forawait(constvalofthis.map(fn,options)){
235+
yield*val;
236+
}
237+
}
238+
232239
module.exports.streamReturningOperators={
233240
filter,
241+
flatMap,
234242
map,
235243
};
236244

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
'use strict';
2+
3+
constcommon=require('../common');
4+
constfixtures=require('../common/fixtures');
5+
const{
6+
Readable,
7+
}=require('stream');
8+
constassert=require('assert');
9+
const{ setTimeout }=require('timers/promises');
10+
const{ createReadStream }=require('fs');
11+
12+
functiononeTo5(){
13+
returnReadable.from([1,2,3,4,5]);
14+
}
15+
16+
{
17+
// flatMap works on synchronous streams with a synchronous mapper
18+
(async()=>{
19+
assert.deepStrictEqual(
20+
awaitoneTo5().flatMap((x)=>[x+x]).toArray(),
21+
[2,4,6,8,10]
22+
);
23+
assert.deepStrictEqual(
24+
awaitoneTo5().flatMap(()=>[]).toArray(),
25+
[]
26+
);
27+
assert.deepStrictEqual(
28+
awaitoneTo5().flatMap((x)=>[x,x]).toArray(),
29+
[1,1,2,2,3,3,4,4,5,5]
30+
);
31+
})().then(common.mustCall());
32+
}
33+
34+
35+
{
36+
// flatMap works on sync/async streams with an asynchronous mapper
37+
(async()=>{
38+
assert.deepStrictEqual(
39+
awaitoneTo5().flatMap(async(x)=>[x,x]).toArray(),
40+
[1,1,2,2,3,3,4,4,5,5]
41+
);
42+
constasyncOneTo5=oneTo5().map(async(x)=>x);
43+
assert.deepStrictEqual(
44+
awaitasyncOneTo5.flatMap(async(x)=>[x,x]).toArray(),
45+
[1,1,2,2,3,3,4,4,5,5]
46+
);
47+
})().then(common.mustCall());
48+
}
49+
{
50+
// flatMap works on a stream where mapping returns a stream
51+
(async()=>{
52+
constresult=awaitoneTo5().flatMap(async(x)=>{
53+
returnReadable.from([x,x]);
54+
}).toArray();
55+
assert.deepStrictEqual(result,[1,1,2,2,3,3,4,4,5,5]);
56+
})().then(common.mustCall());
57+
// flatMap works on an objectMode stream where mappign returns a stream
58+
(async()=>{
59+
constresult=awaitoneTo5().flatMap(()=>{
60+
returncreateReadStream(fixtures.path('x.txt'));
61+
}).toArray();
62+
// The resultant stream is in object mode so toArray shouldn't flatten
63+
assert.strictEqual(result.length,5);
64+
assert.deepStrictEqual(
65+
Buffer.concat(result).toString(),
66+
'xyz\n'.repeat(5)
67+
);
68+
69+
})().then(common.mustCall());
70+
71+
}
72+
73+
{
74+
// Concurrency + AbortSignal
75+
constac=newAbortController();
76+
conststream=oneTo5().flatMap(common.mustNotCall(async(_,{ signal })=>{
77+
awaitsetTimeout(100,{ signal });
78+
}),{signal: ac.signal,concurrency: 2});
79+
// pump
80+
assert.rejects(async()=>{
81+
forawait(constitemofstream){
82+
// nope
83+
console.log(item);
84+
}
85+
},{
86+
name: 'AbortError',
87+
}).then(common.mustCall());
88+
89+
queueMicrotask(()=>{
90+
ac.abort();
91+
});
92+
}
93+
94+
{
95+
// Already aborted AbortSignal
96+
conststream=oneTo5().flatMap(common.mustNotCall(async(_,{ signal })=>{
97+
awaitsetTimeout(100,{ signal });
98+
}),{signal: AbortSignal.abort()});
99+
// pump
100+
assert.rejects(async()=>{
101+
forawait(constitemofstream){
102+
// nope
103+
console.log(item);
104+
}
105+
},{
106+
name: 'AbortError',
107+
}).then(common.mustCall());
108+
}
109+
110+
{
111+
// Error cases
112+
assert.rejects(async()=>{
113+
// eslint-disable-next-line no-unused-vars
114+
forawait(constunusedofReadable.from([1]).flatMap(1));
115+
},/ERR_INVALID_ARG_TYPE/).then(common.mustCall());
116+
assert.rejects(async()=>{
117+
// eslint-disable-next-line no-unused-vars
118+
forawait(const_ofReadable.from([1]).flatMap((x)=>x,{
119+
concurrency: 'Foo'
120+
}));
121+
},/ERR_OUT_OF_RANGE/).then(common.mustCall());
122+
assert.rejects(async()=>{
123+
// eslint-disable-next-line no-unused-vars
124+
forawait(const_ofReadable.from([1]).flatMap((x)=>x,1));
125+
},/ERR_INVALID_ARG_TYPE/).then(common.mustCall());
126+
}
127+
{
128+
// Test result is a Readable
129+
conststream=oneTo5().flatMap((x)=>x);
130+
assert.strictEqual(stream.readable,true);
131+
}

0 commit comments

Comments
(0)