Skip to content

Commit 34684a1

Browse files
benjamingrronag
authored andcommitted
stream: support some and every
This continues on the iterator-helpers work by adding `.some` and `.every` to readable streams. Co-Authored-By: Robert Nagy <[email protected]> PR-URL: #41573 Reviewed-By: Robert Nagy <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
1 parent 86272ca commit 34684a1

File tree

3 files changed

+235
-1
lines changed

3 files changed

+235
-1
lines changed

‎doc/api/stream.md‎

Lines changed: 99 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1864,7 +1864,7 @@ import{Resolver } from 'dns/promises'
18641864
awaitReadable.from([1, 2, 3, 4]).toArray(); // [1, 2, 3, 4]
18651865

18661866
// Make dns queries concurrently using .map and collect
1867-
// the results into an aray using toArray
1867+
// the results into an array using toArray
18681868
constdnsResults=awaitReadable.from([
18691869
'nodejs.org',
18701870
'openjsf.org',
@@ -1875,6 +1875,104 @@ const dnsResults = await Readable.from([
18751875
},{concurrency:2 }).toArray();
18761876
```
18771877

1878+
### `readable.some(fn[, options])`
1879+
1880+
<!-- YAML
1881+
added: REPLACEME
1882+
-->
1883+
1884+
> Stability: 1 - Experimental
1885+
1886+
*`fn`{Function|AsyncFunction} a function to call on each item of the stream.
1887+
*`data`{any} a chunk of data from the stream.
1888+
*`options`{Object}
1889+
*`signal`{AbortSignal} aborted if the stream is destroyed allowing to
1890+
abort the `fn` call early.
1891+
*`options`{Object}
1892+
*`concurrency`{number} the maximum concurrent invocation of `fn` to call
1893+
on the stream at once. **Default:**`1`.
1894+
*`signal`{AbortSignal} allows destroying the stream if the signal is
1895+
aborted.
1896+
* Returns:{Promise} a promise evaluating to `true` if `fn` returned a truthy
1897+
value for at least one of the chunks.
1898+
1899+
This method is similar to `Array.prototype.some` and calls `fn` on each chunk
1900+
in the stream until the awaited return value is `true` (or any truthy value).
1901+
Once an `fn` call on a chunk awaited return value is truthy, the stream is
1902+
destroyed and the promise is fulfilled with `true`. If none of the `fn`
1903+
calls on the chunks return a truthy value, the promise is fulfilled with
1904+
`false`.
1905+
1906+
```mjs
1907+
import{Readable } from'stream';
1908+
import{stat } from'fs/promises';
1909+
1910+
// With a synchronous predicate.
1911+
awaitReadable.from([1, 2, 3, 4]).some((x) => x >2); // true
1912+
awaitReadable.from([1, 2, 3, 4]).some((x) => x <0); // false
1913+
1914+
// With an asynchronous predicate, making at most 2 file checks at a time.
1915+
constanyBigFile=awaitReadable.from([
1916+
'file1',
1917+
'file2',
1918+
'file3',
1919+
]).some(async (fileName) =>{
1920+
conststats=awaitstat(fileName);
1921+
returnstat.size>1024*1024;
1922+
},{concurrency:2 });
1923+
console.log(anyBigFile); // `true` if any file in the list is bigger than 1MB
1924+
console.log('done'); // Stream has finished
1925+
```
1926+
1927+
### `readable.every(fn[, options])`
1928+
1929+
<!-- YAML
1930+
added: REPLACEME
1931+
-->
1932+
1933+
> Stability: 1 - Experimental
1934+
1935+
*`fn`{Function|AsyncFunction} a function to call on each item of the stream.
1936+
*`data`{any} a chunk of data from the stream.
1937+
*`options`{Object}
1938+
*`signal`{AbortSignal} aborted if the stream is destroyed allowing to
1939+
abort the `fn` call early.
1940+
*`options`{Object}
1941+
*`concurrency`{number} the maximum concurrent invocation of `fn` to call
1942+
on the stream at once. **Default:**`1`.
1943+
*`signal`{AbortSignal} allows destroying the stream if the signal is
1944+
aborted.
1945+
* Returns:{Promise} a promise evaluating to `true` if `fn` returned a truthy
1946+
value for all of the chunks.
1947+
1948+
This method is similar to `Array.prototype.every` and calls `fn` on each chunk
1949+
in the stream to check if all awaited return values are truthy value for `fn`.
1950+
Once an `fn` call on a chunk awaited return value is falsy, the stream is
1951+
destroyed and the promise is fulfilled with `false`. If all of the `fn` calls
1952+
on the chunks return a truthy value, the promise is fulfilled with `true`.
1953+
1954+
```mjs
1955+
import{Readable } from'stream';
1956+
import{stat } from'fs/promises';
1957+
1958+
// With a synchronous predicate.
1959+
awaitReadable.from([1, 2, 3, 4]).every((x) => x >2); // false
1960+
awaitReadable.from([1, 2, 3, 4]).every((x) => x >0); // true
1961+
1962+
// With an asynchronous predicate, making at most 2 file checks at a time.
1963+
constallBigFiles=awaitReadable.from([
1964+
'file1',
1965+
'file2',
1966+
'file3',
1967+
]).every(async (fileName) =>{
1968+
conststats=awaitstat(fileName);
1969+
returnstat.size>1024*1024;
1970+
},{concurrency:2 });
1971+
// `true` if all files in the list are bigger than 1MiB
1972+
console.log(allBigFiles);
1973+
console.log('done'); // Stream has finished
1974+
```
1975+
18781976
### Duplex and transform streams
18791977

18801978
#### Class: `stream.Duplex`

‎lib/internal/streams/operators.js‎

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ const{
1010
AbortError,
1111
}=require('internal/errors');
1212
const{ validateInteger }=require('internal/validators');
13+
const{ kWeakHandler }=require('internal/event_target');
1314

1415
const{
1516
ArrayPrototypePush,
@@ -47,6 +48,10 @@ async function * map(fn, options){
4748
constsignalOpt={ signal };
4849

4950
constabort=()=>ac.abort();
51+
if(options?.signal?.aborted){
52+
abort();
53+
}
54+
5055
options?.signal?.addEventListener('abort',abort);
5156

5257
letnext;
@@ -150,6 +155,40 @@ async function * map(fn, options){
150155
}
151156
}
152157

158+
asyncfunctionsome(fn,options){
159+
// https://tc39.es/proposal-iterator-helpers/#sec-iteratorprototype.some
160+
// Note that some does short circuit but also closes the iterator if it does
161+
constac=newAbortController();
162+
if(options?.signal){
163+
if(options.signal.aborted){
164+
ac.abort();
165+
}
166+
options.signal.addEventListener('abort',()=>ac.abort(),{
167+
[kWeakHandler]: this,
168+
once: true,
169+
});
170+
}
171+
constmapped=this.map(fn,{ ...options,signal: ac.signal});
172+
forawait(constresultofmapped){
173+
if(result){
174+
ac.abort();
175+
returntrue;
176+
}
177+
}
178+
returnfalse;
179+
}
180+
181+
asyncfunctionevery(fn,options){
182+
if(typeoffn!=='function'){
183+
thrownewERR_INVALID_ARG_TYPE(
184+
'fn',['Function','AsyncFunction'],fn);
185+
}
186+
// https://en.wikipedia.org/wiki/De_Morgan%27s_laws
187+
return!(awaitsome.call(this,async(x)=>{
188+
return!(awaitfn(x));
189+
},options));
190+
}
191+
153192
asyncfunctionforEach(fn,options){
154193
if(typeoffn!=='function'){
155194
thrownewERR_INVALID_ARG_TYPE(
@@ -196,6 +235,8 @@ module.exports.streamReturningOperators ={
196235
};
197236

198237
module.exports.promiseReturningOperators={
238+
every,
199239
forEach,
200240
toArray,
241+
some,
201242
};
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
'use strict';
2+
3+
constcommon=require('../common');
4+
const{
5+
Readable,
6+
}=require('stream');
7+
constassert=require('assert');
8+
9+
functiononeTo5(){
10+
returnReadable.from([1,2,3,4,5]);
11+
}
12+
13+
functiononeTo5Async(){
14+
returnoneTo5().map(async(x)=>{
15+
awaitPromise.resolve();
16+
returnx;
17+
});
18+
}
19+
{
20+
// Some and every work with a synchronous stream and predicate
21+
(async()=>{
22+
assert.strictEqual(awaitoneTo5().some((x)=>x>3),true);
23+
assert.strictEqual(awaitoneTo5().every((x)=>x>3),false);
24+
assert.strictEqual(awaitoneTo5().some((x)=>x>6),false);
25+
assert.strictEqual(awaitoneTo5().every((x)=>x<6),true);
26+
assert.strictEqual(awaitReadable.from([]).some((x)=>true),false);
27+
assert.strictEqual(awaitReadable.from([]).every((x)=>true),true);
28+
})().then(common.mustCall());
29+
}
30+
31+
{
32+
// Some and every work with an asynchronous stream and synchronous predicate
33+
(async()=>{
34+
assert.strictEqual(awaitoneTo5Async().some((x)=>x>3),true);
35+
assert.strictEqual(awaitoneTo5Async().every((x)=>x>3),false);
36+
assert.strictEqual(awaitoneTo5Async().some((x)=>x>6),false);
37+
assert.strictEqual(awaitoneTo5Async().every((x)=>x<6),true);
38+
})().then(common.mustCall());
39+
}
40+
41+
{
42+
// Some and every work on asynchronous streams with an asynchronous predicate
43+
(async()=>{
44+
assert.strictEqual(awaitoneTo5().some(async(x)=>x>3),true);
45+
assert.strictEqual(awaitoneTo5().every(async(x)=>x>3),false);
46+
assert.strictEqual(awaitoneTo5().some(async(x)=>x>6),false);
47+
assert.strictEqual(awaitoneTo5().every(async(x)=>x<6),true);
48+
})().then(common.mustCall());
49+
}
50+
51+
{
52+
// Some and every short circuit
53+
(async()=>{
54+
awaitoneTo5().some(common.mustCall((x)=>x>2,3));
55+
awaitoneTo5().every(common.mustCall((x)=>x<3,3));
56+
// When short circuit isn't possible the whole stream is iterated
57+
awaitoneTo5().some(common.mustCall((x)=>x>6,5));
58+
// The stream is destroyed afterwards
59+
conststream=oneTo5();
60+
awaitstream.some(common.mustCall((x)=>x>2,3));
61+
assert.strictEqual(stream.destroyed,true);
62+
})().then(common.mustCall());
63+
}
64+
65+
{
66+
// Support for AbortSignal
67+
constac=newAbortController();
68+
assert.rejects(Readable.from([1,2,3]).some(
69+
()=>newPromise(()=>{}),
70+
{signal: ac.signal}
71+
),{
72+
name: 'AbortError',
73+
}).then(common.mustCall());
74+
ac.abort();
75+
}
76+
{
77+
// Support for pre-aborted AbortSignal
78+
assert.rejects(Readable.from([1,2,3]).some(
79+
()=>newPromise(()=>{}),
80+
{signal: AbortSignal.abort()}
81+
),{
82+
name: 'AbortError',
83+
}).then(common.mustCall());
84+
}
85+
{
86+
// Error cases
87+
assert.rejects(async()=>{
88+
awaitReadable.from([1]).every(1);
89+
},/ERR_INVALID_ARG_TYPE/).then(common.mustCall());
90+
assert.rejects(async()=>{
91+
awaitReadable.from([1]).every((x)=>x,{
92+
concurrency: 'Foo'
93+
});
94+
},/ERR_OUT_OF_RANGE/).then(common.mustCall());
95+
}

0 commit comments

Comments
(0)