Skip to content

Commit 9b1a379

Browse files
benjamingrronag
authored andcommitted
stream: add drop and take
This adds the `drop` and `take` methods to readable streams allowing users easily drop and take items from the stream. This continues the iterator-helper proposal alignment task. Co-Authored-By: Robert Nagy <[email protected]> PR-URL: #41630 Reviewed-By: Robert Nagy <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
1 parent 60eddad commit 9b1a379

File tree

3 files changed

+195
-0
lines changed

3 files changed

+195
-0
lines changed

‎doc/api/stream.md‎

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2020,6 +2020,50 @@ for await (const result of concatResult){
20202020
}
20212021
```
20222022

2023+
### `readable.drop(limit[, options])`
2024+
2025+
<!-- YAML
2026+
added: REPLACEME
2027+
-->
2028+
2029+
> Stability: 1 - Experimental
2030+
2031+
*`limit`{number} the number of chunks to drop from the readable.
2032+
*`options`{Object}
2033+
*`signal`{AbortSignal} allows destroying the stream if the signal is
2034+
aborted.
2035+
* Returns:{Readable} a stream with `limit` chunks dropped.
2036+
2037+
This method returns a new stream with the first `limit` chunks dropped.
2038+
2039+
```mjs
2040+
import{Readable } from'stream';
2041+
2042+
awaitReadable.from([1, 2, 3, 4]).drop(2).toArray(); // [3, 4]
2043+
```
2044+
2045+
### `readable.take(limit[, options])`
2046+
2047+
<!-- YAML
2048+
added: REPLACEME
2049+
-->
2050+
2051+
> Stability: 1 - Experimental
2052+
2053+
*`limit`{number} the number of chunks to take from the readable.
2054+
*`options`{Object}
2055+
*`signal`{AbortSignal} allows destroying the stream if the signal is
2056+
aborted.
2057+
* Returns:{Readable} a stream with `limit` chunks taken.
2058+
2059+
This method returns a new stream with the first `limit` chunks.
2060+
2061+
```mjs
2062+
import{Readable } from'stream';
2063+
2064+
awaitReadable.from([1, 2, 3, 4]).take(2).toArray(); // [1, 2]
2065+
```
2066+
20232067
### Duplex and transform streams
20242068

20252069
#### Class: `stream.Duplex`

‎lib/internal/streams/operators.js‎

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ const{AbortController } = require('internal/abort_controller');
55
const{
66
codes: {
77
ERR_INVALID_ARG_TYPE,
8+
ERR_OUT_OF_RANGE,
89
},
910
AbortError,
1011
}=require('internal/errors');
@@ -14,6 +15,8 @@ const{kWeakHandler } = require('internal/event_target');
1415
const{
1516
ArrayPrototypePush,
1617
MathFloor,
18+
Number,
19+
NumberIsNaN,
1720
Promise,
1821
PromiseReject,
1922
PromisePrototypeCatch,
@@ -232,10 +235,62 @@ async function* flatMap(fn, options){
232235
}
233236
}
234237

238+
functiontoIntegerOrInfinity(number){
239+
// We coerce here to align with the spec
240+
// https://github.com/tc39/proposal-iterator-helpers/issues/169
241+
number=Number(number);
242+
if(NumberIsNaN(number)){
243+
return0;
244+
}
245+
if(number<0){
246+
thrownewERR_OUT_OF_RANGE('number','>= 0',number);
247+
}
248+
returnnumber;
249+
}
250+
251+
functiondrop(number,options){
252+
number=toIntegerOrInfinity(number);
253+
returnasyncfunction*drop(){
254+
if(options?.signal?.aborted){
255+
thrownewAbortError();
256+
}
257+
forawait(constvalofthis){
258+
if(options?.signal?.aborted){
259+
thrownewAbortError();
260+
}
261+
if(number--<=0){
262+
yieldval;
263+
}
264+
}
265+
}.call(this);
266+
}
267+
268+
269+
functiontake(number,options){
270+
number=toIntegerOrInfinity(number);
271+
returnasyncfunction*take(){
272+
if(options?.signal?.aborted){
273+
thrownewAbortError();
274+
}
275+
forawait(constvalofthis){
276+
if(options?.signal?.aborted){
277+
thrownewAbortError();
278+
}
279+
if(number-->0){
280+
yieldval;
281+
}else{
282+
return;
283+
}
284+
}
285+
}.call(this);
286+
}
287+
235288
module.exports.streamReturningOperators={
289+
drop,
236290
filter,
237291
flatMap,
238292
map,
293+
take,
239294
};
240295

241296
module.exports.promiseReturningOperators={
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
'use strict';
2+
3+
constcommon=require('../common');
4+
const{
5+
Readable,
6+
}=require('stream');
7+
const{ deepStrictEqual, rejects, throws }=require('assert');
8+
9+
const{ from }=Readable;
10+
11+
constfromAsync=(...args)=>from(...args).map(async(x)=>x);
12+
13+
constnaturals=()=>from(asyncfunction*(){
14+
leti=1;
15+
while(true){
16+
yieldi++;
17+
}
18+
}());
19+
20+
{
21+
// Synchronous streams
22+
(async()=>{
23+
deepStrictEqual(awaitfrom([1,2,3]).drop(2).toArray(),[3]);
24+
deepStrictEqual(awaitfrom([1,2,3]).take(1).toArray(),[1]);
25+
deepStrictEqual(awaitfrom([]).drop(2).toArray(),[]);
26+
deepStrictEqual(awaitfrom([]).take(1).toArray(),[]);
27+
deepStrictEqual(awaitfrom([1,2,3]).drop(1).take(1).toArray(),[2]);
28+
deepStrictEqual(awaitfrom([1,2]).drop(0).toArray(),[1,2]);
29+
deepStrictEqual(awaitfrom([1,2]).take(0).toArray(),[]);
30+
})().then(common.mustCall());
31+
// Asynchronous streams
32+
(async()=>{
33+
deepStrictEqual(awaitfromAsync([1,2,3]).drop(2).toArray(),[3]);
34+
deepStrictEqual(awaitfromAsync([1,2,3]).take(1).toArray(),[1]);
35+
deepStrictEqual(awaitfromAsync([]).drop(2).toArray(),[]);
36+
deepStrictEqual(awaitfromAsync([]).take(1).toArray(),[]);
37+
deepStrictEqual(awaitfromAsync([1,2,3]).drop(1).take(1).toArray(),[2]);
38+
deepStrictEqual(awaitfromAsync([1,2]).drop(0).toArray(),[1,2]);
39+
deepStrictEqual(awaitfromAsync([1,2]).take(0).toArray(),[]);
40+
})().then(common.mustCall());
41+
// Infinite streams
42+
// Asynchronous streams
43+
(async()=>{
44+
deepStrictEqual(awaitnaturals().take(1).toArray(),[1]);
45+
deepStrictEqual(awaitnaturals().drop(1).take(1).toArray(),[2]);
46+
constnext10=[11,12,13,14,15,16,17,18,19,20];
47+
deepStrictEqual(awaitnaturals().drop(10).take(10).toArray(),next10);
48+
deepStrictEqual(awaitnaturals().take(5).take(1).toArray(),[1]);
49+
})().then(common.mustCall());
50+
}
51+
52+
{
53+
// Coercion
54+
(async()=>{
55+
// The spec made me do this ^^
56+
deepStrictEqual(awaitnaturals().take('cat').toArray(),[]);
57+
deepStrictEqual(awaitnaturals().take('2').toArray(),[1,2]);
58+
deepStrictEqual(awaitnaturals().take(true).toArray(),[1]);
59+
})().then(common.mustCall());
60+
}
61+
62+
{
63+
// Support for AbortSignal
64+
constac=newAbortController();
65+
rejects(
66+
Readable.from([1,2,3]).take(1,{signal: ac.signal}).toArray(),{
67+
name: 'AbortError',
68+
}).then(common.mustCall());
69+
rejects(
70+
Readable.from([1,2,3]).drop(1,{signal: ac.signal}).toArray(),{
71+
name: 'AbortError',
72+
}).then(common.mustCall());
73+
ac.abort();
74+
}
75+
76+
{
77+
// Support for AbortSignal, already aborted
78+
constsignal=AbortSignal.abort();
79+
rejects(
80+
Readable.from([1,2,3]).take(1,{ signal }).toArray(),{
81+
name: 'AbortError',
82+
}).then(common.mustCall());
83+
}
84+
85+
{
86+
// Error cases
87+
constinvalidArgs=[
88+
-1,
89+
-Infinity,
90+
-40,
91+
];
92+
93+
for(constexampleofinvalidArgs){
94+
throws(()=>from([]).take(example).toArray(),/ERR_OUT_OF_RANGE/);
95+
}
96+
}

0 commit comments

Comments
(0)