Skip to content

Commit 197ba21

Browse files
benjamingrdanielleadams
authored andcommitted
stream: support abort signal
PR-URL: #36061 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Robert Nagy <[email protected]>
1 parent b39d150 commit 197ba21

File tree

10 files changed

+183
-13
lines changed

10 files changed

+183
-13
lines changed

‎doc/api/stream.md‎

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ There are four fundamental stream types within Node.js:
4545
is written and read (for example, [`zlib.createDeflate()`][]).
4646

4747
Additionally, this module includes the utility functions
48-
[`stream.pipeline()`][], [`stream.finished()`][] and
49-
[`stream.Readable.from()`][].
48+
[`stream.pipeline()`][], [`stream.finished()`][], [`stream.Readable.from()`][]
49+
and [`stream.addAbortSignal()`][].
5050

5151
### Streams Promises API
5252
<!-- YAML
@@ -1799,6 +1799,55 @@ Calling `Readable.from(string)` or `Readable.from(buffer)` will not have
17991799
the strings or buffers be iterated to match the other streams semantics
18001800
for performance reasons.
18011801

1802+
### `stream.addAbortSignal(signal, stream)`
1803+
<!-- YAML
1804+
added: REPLACEME
1805+
-->
1806+
*`signal`{AbortSignal} A signal representing possible cancellation
1807+
*`stream`{Stream} a stream to attach a signal to
1808+
1809+
Attaches an AbortSignal to a readable or writeable stream. This lets code
1810+
control stream destruction using an `AbortController`.
1811+
1812+
Calling `abort` on the `AbortController` corresponding to the passed
1813+
`AbortSignal` will behave the same way as calling `.destroy(new AbortError())`
1814+
on the stream.
1815+
1816+
```js
1817+
constfs=require('fs');
1818+
1819+
constcontroller=newAbortController();
1820+
constread=addAbortSignal(
1821+
controller.signal,
1822+
fs.createReadStream(('object.json'))
1823+
);
1824+
// Later, abort the operation closing the stream
1825+
controller.abort();
1826+
```
1827+
1828+
Or using an `AbortSignal` with a readable stream as an async iterable:
1829+
1830+
```js
1831+
constcontroller=newAbortController();
1832+
setTimeout(() =>controller.abort(), 10_000); // set a timeout
1833+
conststream=addAbortSignal(
1834+
controller.signal,
1835+
fs.createReadStream(('object.json'))
1836+
);
1837+
(async () =>{
1838+
try{
1839+
forawait (constchunkofstream){
1840+
awaitprocess(chunk);
1841+
}
1842+
} catch (e){
1843+
if (e.name==='AbortError'){
1844+
// The operation was cancelled
1845+
} else{
1846+
throw e;
1847+
}
1848+
}
1849+
})();
1850+
```
18021851
## API for stream implementers
18031852

18041853
<!--type=misc-->
@@ -3123,6 +3172,7 @@ contain multi-byte characters.
31233172
[`stream.finished()`]: #stream_stream_finished_stream_options_callback
31243173
[`stream.pipe()`]: #stream_readable_pipe_destination_options
31253174
[`stream.pipeline()`]: #stream_stream_pipeline_source_transforms_destination_callback
3175+
[`stream.addAbortSignal()`]: #stream_stream_addabortsignal_signal_stream
31263176
[`stream.uncork()`]: #stream_writable_uncork
31273177
[`stream.unpipe()`]: #stream_readable_unpipe_destination
31283178
[`stream.wrap()`]: #stream_readable_wrap_stream

‎lib/_http_client.js‎

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ const{Buffer } = require('buffer');
5151
const{ defaultTriggerAsyncIdScope }=require('internal/async_hooks');
5252
const{URL, urlToOptions, searchParamsSymbol }=require('internal/url');
5353
const{ kOutHeaders, kNeedDrain }=require('internal/http');
54-
const{AbortError,connResetException, codes }=require('internal/errors');
54+
const{ connResetException, codes }=require('internal/errors');
5555
const{
5656
ERR_HTTP_HEADERS_SENT,
5757
ERR_INVALID_ARG_TYPE,
@@ -61,14 +61,15 @@ const{
6161
}=codes;
6262
const{
6363
validateInteger,
64-
validateAbortSignal,
6564
}=require('internal/validators');
6665
const{ getTimerDuration }=require('internal/timers');
6766
const{
6867
DTRACE_HTTP_CLIENT_REQUEST,
6968
DTRACE_HTTP_CLIENT_RESPONSE
7069
}=require('internal/dtrace');
7170

71+
const{ addAbortSignal }=require('stream');
72+
7273
constINVALID_PATH_REGEX=/[^\u0021-\u00ff]/;
7374
constkError=Symbol('kError');
7475

@@ -174,12 +175,7 @@ function ClientRequest(input, options, cb){
174175

175176
constsignal=options.signal;
176177
if(signal){
177-
validateAbortSignal(signal,'options.signal');
178-
constlistener=(e)=>this.destroy(newAbortError());
179-
signal.addEventListener('abort',listener);
180-
this.once('close',()=>{
181-
signal.removeEventListener('abort',listener);
182-
});
178+
addAbortSignal(signal,this);
183179
}
184180
letmethod=options.method;
185181
constmethodIsString=(typeofmethod==='string');
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
'use strict';
2+
3+
const{
4+
AbortError,
5+
codes,
6+
}=require('internal/errors');
7+
8+
consteos=require('internal/streams/end-of-stream');
9+
const{ERR_INVALID_ARG_TYPE}=codes;
10+
11+
// This method is inlined here for readable-stream
12+
// https://github.com/nodejs/node/pull/36061#discussion_r533718029
13+
constvalidateAbortSignal=(signal,name)=>{
14+
if(signal!==undefined&&
15+
(signal===null||
16+
typeofsignal!=='object'||
17+
!('aborted'insignal))){
18+
thrownewERR_INVALID_ARG_TYPE(name,'AbortSignal',signal);
19+
}
20+
};
21+
22+
functionisStream(obj){
23+
return!!(obj&&typeofobj.pipe==='function');
24+
}
25+
26+
module.exports=functionaddAbortSignal(signal,stream){
27+
validateAbortSignal(signal,'signal');
28+
if(!isStream(stream)){
29+
thrownewERR_INVALID_ARG_TYPE('stream','stream.Stream',stream);
30+
}
31+
constonAbort=()=>{
32+
stream.destroy(newAbortError());
33+
};
34+
if(signal.aborted){
35+
onAbort();
36+
}else{
37+
signal.addEventListener('abort',onAbort);
38+
eos(stream,()=>signal.removeEventListener('abort',onAbort));
39+
}
40+
returnstream;
41+
};

‎lib/internal/streams/readable.js‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ const{
5050
getHighWaterMark,
5151
getDefaultHighWaterMark
5252
}=require('internal/streams/state');
53+
5354
const{
5455
ERR_INVALID_ARG_TYPE,
5556
ERR_STREAM_PUSH_AFTER_EOF,

‎lib/stream.js‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ Stream.Duplex = require('internal/streams/duplex');
4343
Stream.Transform=require('internal/streams/transform');
4444
Stream.PassThrough=require('internal/streams/passthrough');
4545
Stream.pipeline=pipeline;
46+
Stream.addAbortSignal=require('internal/streams/add-abort-signal');
4647
Stream.finished=eos;
4748

4849
functionlazyLoadPromises(){

‎node.gyp‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@
245245
'lib/internal/worker/js_transferable.js',
246246
'lib/internal/watchdog.js',
247247
'lib/internal/streams/lazy_transform.js',
248+
'lib/internal/streams/add-abort-signal.js',
248249
'lib/internal/streams/buffer_list.js',
249250
'lib/internal/streams/duplexpair.js',
250251
'lib/internal/streams/from.js',

‎test/parallel/test-bootstrap-modules.js‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ const expectedModules = new Set([
7878
'NativeModule internal/process/warning',
7979
'NativeModule internal/querystring',
8080
'NativeModule internal/source_map/source_map_cache',
81+
'NativeModule internal/streams/add-abort-signal',
8182
'NativeModule internal/streams/buffer_list',
8283
'NativeModule internal/streams/destroy',
8384
'NativeModule internal/streams/duplex',

‎test/parallel/test-stream-pipeline.js‎

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ const{
88
Transform,
99
pipeline,
1010
PassThrough,
11-
Duplex
11+
Duplex,
12+
addAbortSignal,
1213
}=require('stream');
1314
constassert=require('assert');
1415
consthttp=require('http');
@@ -1261,3 +1262,32 @@ const net = require('net');
12611262
()=>common.mustNotCall(),
12621263
);
12631264
}
1265+
1266+
1267+
{
1268+
constac=newAbortController();
1269+
constr=Readable.from(asyncfunction*(){
1270+
for(leti=0;i<10;i++){
1271+
awaitPromise.resolve();
1272+
yieldString(i);
1273+
if(i===5){
1274+
ac.abort();
1275+
}
1276+
}
1277+
}());
1278+
letres='';
1279+
constw=newWritable({
1280+
write(chunk,encoding,callback){
1281+
res+=chunk;
1282+
callback();
1283+
}
1284+
});
1285+
constcb=common.mustCall((err)=>{
1286+
assert.strictEqual(err.name,'AbortError');
1287+
assert.strictEqual(res,'012345');
1288+
assert.strictEqual(w.destroyed,true);
1289+
assert.strictEqual(r.destroyed,true);
1290+
assert.strictEqual(pipelined.destroyed,true);
1291+
});
1292+
constpipelined=addAbortSignal(ac.signal,pipeline([r,w],cb));
1293+
}

‎test/parallel/test-stream-readable-destroy.js‎

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
'use strict';
22

33
constcommon=require('../common');
4-
const{ Readable }=require('stream');
4+
const{ Readable, addAbortSignal}=require('stream');
55
constassert=require('assert');
66

77
{
@@ -268,3 +268,38 @@ const assert = require('assert');
268268
}));
269269
read.resume();
270270
}
271+
272+
{
273+
constcontroller=newAbortController();
274+
constread=addAbortSignal(controller.signal,newReadable({
275+
read(){
276+
this.push('asd');
277+
},
278+
}));
279+
280+
read.on('error',common.mustCall((e)=>{
281+
assert.strictEqual(e.name,'AbortError');
282+
}));
283+
controller.abort();
284+
read.on('data',common.mustNotCall());
285+
}
286+
287+
{
288+
constcontroller=newAbortController();
289+
constread=addAbortSignal(controller.signal,newReadable({
290+
objectMode: true,
291+
read(){
292+
returnfalse;
293+
}
294+
}));
295+
read.push('asd');
296+
297+
read.on('error',common.mustCall((e)=>{
298+
assert.strictEqual(e.name,'AbortError');
299+
}));
300+
assert.rejects((async()=>{
301+
/* eslint-disable-next-line no-unused-vars */
302+
forawait(constchunkofread){}
303+
})(),/AbortError/);
304+
setTimeout(()=>controller.abort(),0);
305+
}

‎test/parallel/test-stream-writable-destroy.js‎

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
'use strict';
22

33
constcommon=require('../common');
4-
const{ Writable }=require('stream');
4+
const{ Writable, addAbortSignal}=require('stream');
55
constassert=require('assert');
66

77
{
@@ -417,3 +417,17 @@ const assert = require('assert');
417417
}));
418418
write.write('asd');
419419
}
420+
421+
{
422+
constac=newAbortController();
423+
constwrite=addAbortSignal(ac.signal,newWritable({
424+
write(chunk,enc,cb){cb();}
425+
}));
426+
427+
write.on('error',common.mustCall((e)=>{
428+
assert.strictEqual(e.name,'AbortError');
429+
assert.strictEqual(write.destroyed,true);
430+
}));
431+
write.write('asd');
432+
ac.abort();
433+
}

0 commit comments

Comments
(0)