Skip to content

Commit d6bcf8b

Browse files
mafintoshtargos
authored andcommitted
stream: add auto-destroy mode
PR-URL: #22795 Reviewed-By: James M Snell <[email protected]> Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Anna Henningsen <[email protected]> Reviewed-By: Jeremiah Senkpiel <[email protected]>
1 parent 7a2134c commit d6bcf8b

File tree

5 files changed

+158
-12
lines changed

5 files changed

+158
-12
lines changed

‎doc/api/stream.md‎

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1493,6 +1493,11 @@ changes:
14931493
pr-url: https://github.com/nodejs/node/pull/18438
14941494
description: >
14951495
Add `emitClose` option to specify if `'close'` is emitted on destroy
1496+
- version: REPLACEME
1497+
pr-url: https://github.com/nodejs/node/pull/22795
1498+
description: >
1499+
Add `autoDestroy` option to automatically `destroy()` the stream
1500+
when it emits `'finish'` or errors
14961501
-->
14971502

14981503
*`options`{Object}
@@ -1521,6 +1526,8 @@ changes:
15211526
[`stream._destroy()`][writable-_destroy] method.
15221527
*`final`{Function} Implementation for the
15231528
[`stream._final()`][stream-_final] method.
1529+
*`autoDestroy`{boolean} Whether this stream should automatically call
1530+
`.destroy()` on itself after ending. **Default:**`false`.
15241531

15251532
```js
15261533
const{Writable } =require('stream');
@@ -1756,6 +1763,14 @@ Custom `Readable` streams *must* call the `new stream.Readable([options])`
17561763
constructor and implement the `readable._read()` method.
17571764

17581765
#### new stream.Readable([options])
1766+
<!-- YAML
1767+
changes:
1768+
- version: REPLACEME
1769+
pr-url: https://github.com/nodejs/node/pull/22795
1770+
description: >
1771+
Add `autoDestroy` option to automatically `destroy()` the stream
1772+
when it emits `'end'` or errors
1773+
-->
17591774

17601775
*`options`{Object}
17611776
*`highWaterMark`{number} The maximum [number of bytes][hwm-gotcha] to store
@@ -1770,6 +1785,8 @@ constructor and implement the `readable._read()` method.
17701785
method.
17711786
*`destroy`{Function} Implementation for the
17721787
[`stream._destroy()`][readable-_destroy] method.
1788+
*`autoDestroy`{boolean} Whether this stream should automatically call
1789+
`.destroy()` on itself after ending. **Default:**`false`.
17731790

17741791
```js
17751792
const{Readable } =require('stream');

‎lib/_stream_readable.js‎

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ let createReadableStreamAsyncIterator;
4646

4747
util.inherits(Readable,Stream);
4848

49+
const{ errorOrDestroy }=destroyImpl;
4950
constkProxyEvents=['error','close','destroy','pause','resume'];
5051

5152
functionprependListener(emitter,event,fn){
@@ -117,6 +118,9 @@ function ReadableState(options, stream, isDuplex){
117118
// Should close be emitted on destroy. Defaults to true.
118119
this.emitClose=options.emitClose!==false;
119120

121+
// Should .destroy() be called after 'end' (and potentially 'finish')
122+
this.autoDestroy=!!options.autoDestroy;
123+
120124
// has it been destroyed
121125
this.destroyed=false;
122126

@@ -235,7 +239,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck){
235239
if(!skipChunkCheck)
236240
er=chunkInvalid(state,chunk);
237241
if(er){
238-
stream.emit('error',er);
242+
errorOrDestroy(stream,er);
239243
}elseif(state.objectMode||chunk&&chunk.length>0){
240244
if(typeofchunk!=='string'&&
241245
!state.objectMode&&
@@ -245,11 +249,11 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck){
245249

246250
if(addToFront){
247251
if(state.endEmitted)
248-
stream.emit('error',newERR_STREAM_UNSHIFT_AFTER_END_EVENT());
252+
errorOrDestroy(stream,newERR_STREAM_UNSHIFT_AFTER_END_EVENT());
249253
else
250254
addChunk(stream,state,chunk,true);
251255
}elseif(state.ended){
252-
stream.emit('error',newERR_STREAM_PUSH_AFTER_EOF());
256+
errorOrDestroy(stream,newERR_STREAM_PUSH_AFTER_EOF());
253257
}elseif(state.destroyed){
254258
returnfalse;
255259
}else{
@@ -581,7 +585,7 @@ function maybeReadMore_(stream, state){
581585
// for virtual (non-string, non-buffer) streams, "length" is somewhat
582586
// arbitrary, and perhaps not very meaningful.
583587
Readable.prototype._read=function(n){
584-
this.emit('error',newERR_METHOD_NOT_IMPLEMENTED('_read()'));
588+
errorOrDestroy(this,newERR_METHOD_NOT_IMPLEMENTED('_read()'));
585589
};
586590

587591
Readable.prototype.pipe=function(dest,pipeOpts){
@@ -687,7 +691,7 @@ Readable.prototype.pipe = function(dest, pipeOpts){
687691
unpipe();
688692
dest.removeListener('error',onerror);
689693
if(EE.listenerCount(dest,'error')===0)
690-
dest.emit('error',er);
694+
errorOrDestroy(dest,er);
691695
}
692696

693697
// Make sure our error handler is attached before userland ones.
@@ -1092,5 +1096,14 @@ function endReadableNT(state, stream){
10921096
state.endEmitted=true;
10931097
stream.readable=false;
10941098
stream.emit('end');
1099+
1100+
if(state.autoDestroy){
1101+
// In case of duplex streams we need a way to detect
1102+
// if the writable side is ready for autoDestroy as well
1103+
constwState=stream._writableState;
1104+
if(!wState||(wState.autoDestroy&&wState.finished)){
1105+
stream.destroy();
1106+
}
1107+
}
10951108
}
10961109
}

‎lib/_stream_writable.js‎

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ const{
4545
ERR_UNKNOWN_ENCODING
4646
}=require('internal/errors').codes;
4747

48+
const{ errorOrDestroy }=destroyImpl;
49+
4850
util.inherits(Writable,Stream);
4951

5052
functionnop(){}
@@ -147,6 +149,9 @@ function WritableState(options, stream, isDuplex){
147149
// Should close be emitted on destroy. Defaults to true.
148150
this.emitClose=options.emitClose!==false;
149151

152+
// Should .destroy() be called after 'finish' (and potentially 'end')
153+
this.autoDestroy=!!options.autoDestroy;
154+
150155
// count buffered requests
151156
this.bufferedRequestCount=0;
152157

@@ -235,14 +240,14 @@ function Writable(options){
235240

236241
// Otherwise people can pipe Writable streams, which is just wrong.
237242
Writable.prototype.pipe=function(){
238-
this.emit('error',newERR_STREAM_CANNOT_PIPE());
243+
errorOrDestroy(this,newERR_STREAM_CANNOT_PIPE());
239244
};
240245

241246

242247
functionwriteAfterEnd(stream,cb){
243248
varer=newERR_STREAM_WRITE_AFTER_END();
244249
// TODO: defer error events consistently everywhere, not just the cb
245-
stream.emit('error',er);
250+
errorOrDestroy(stream,er);
246251
process.nextTick(cb,er);
247252
}
248253

@@ -258,7 +263,7 @@ function validChunk(stream, state, chunk, cb){
258263
er=newERR_INVALID_ARG_TYPE('chunk',['string','Buffer'],chunk);
259264
}
260265
if(er){
261-
stream.emit('error',er);
266+
errorOrDestroy(stream,er);
262267
process.nextTick(cb,er);
263268
returnfalse;
264269
}
@@ -422,13 +427,13 @@ function onwriteError(stream, state, sync, er, cb){
422427
// after error
423428
process.nextTick(finishMaybe,stream,state);
424429
stream._writableState.errorEmitted=true;
425-
stream.emit('error',er);
430+
errorOrDestroy(stream,er);
426431
}else{
427432
// the caller expect this to happen before if
428433
// it is async
429434
cb(er);
430435
stream._writableState.errorEmitted=true;
431-
stream.emit('error',er);
436+
errorOrDestroy(stream,er);
432437
// this can emit finish, but finish must
433438
// always follow error
434439
finishMaybe(stream,state);
@@ -612,7 +617,7 @@ function callFinal(stream, state){
612617
stream._final((err)=>{
613618
state.pendingcb--;
614619
if(err){
615-
stream.emit('error',err);
620+
errorOrDestroy(stream,err);
616621
}
617622
state.prefinished=true;
618623
stream.emit('prefinish');
@@ -639,6 +644,15 @@ function finishMaybe(stream, state){
639644
if(state.pendingcb===0){
640645
state.finished=true;
641646
stream.emit('finish');
647+
648+
if(state.autoDestroy){
649+
// In case of duplex streams we need a way to detect
650+
// if the readable side is ready for autoDestroy as well
651+
constrState=stream._readableState;
652+
if(!rState||(rState.autoDestroy&&rState.endEmitted)){
653+
stream.destroy();
654+
}
655+
}
642656
}
643657
}
644658
returnneed;

‎lib/internal/streams/destroy.js‎

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,25 @@ function emitErrorNT(self, err){
8282
self.emit('error',err);
8383
}
8484

85+
functionerrorOrDestroy(stream,err){
86+
// We have tests that rely on errors being emitted
87+
// in the same tick, so changing this is semver major.
88+
// For now when you opt-in to autoDestroy we allow
89+
// the error to be emitted nextTick. In a future
90+
// semver major update we should change the default to this.
91+
92+
constrState=stream._readableState;
93+
constwState=stream._writableState;
94+
95+
if((rState&&rState.autoDestroy)||(wState&&wState.autoDestroy))
96+
stream.destroy(err);
97+
else
98+
stream.emit('error',err);
99+
}
100+
101+
85102
module.exports={
86103
destroy,
87-
undestroy
104+
undestroy,
105+
errorOrDestroy
88106
};
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
'use strict';
2+
constcommon=require('../common');
3+
conststream=require('stream');
4+
constassert=require('assert');
5+
6+
{
7+
constr=newstream.Readable({
8+
autoDestroy: true,
9+
read(){
10+
this.push('hello');
11+
this.push('world');
12+
this.push(null);
13+
},
14+
destroy: common.mustCall((err,cb)=>cb())
15+
});
16+
17+
letended=false;
18+
19+
r.resume();
20+
21+
r.on('end',common.mustCall(()=>{
22+
ended=true;
23+
}));
24+
25+
r.on('close',common.mustCall(()=>{
26+
assert(ended);
27+
}));
28+
}
29+
30+
{
31+
constw=newstream.Writable({
32+
autoDestroy: true,
33+
write(data,enc,cb){
34+
cb(null);
35+
},
36+
destroy: common.mustCall((err,cb)=>cb())
37+
});
38+
39+
letfinished=false;
40+
41+
w.write('hello');
42+
w.write('world');
43+
w.end();
44+
45+
w.on('finish',common.mustCall(()=>{
46+
finished=true;
47+
}));
48+
49+
w.on('close',common.mustCall(()=>{
50+
assert(finished);
51+
}));
52+
}
53+
54+
{
55+
constt=newstream.Transform({
56+
autoDestroy: true,
57+
transform(data,enc,cb){
58+
cb(null,data);
59+
},
60+
destroy: common.mustCall((err,cb)=>cb())
61+
});
62+
63+
letended=false;
64+
letfinished=false;
65+
66+
t.write('hello');
67+
t.write('world');
68+
t.end();
69+
70+
t.resume();
71+
72+
t.on('end',common.mustCall(()=>{
73+
ended=true;
74+
}));
75+
76+
t.on('finish',common.mustCall(()=>{
77+
finished=true;
78+
}));
79+
80+
t.on('close',common.mustCall(()=>{
81+
assert(ended);
82+
assert(finished);
83+
}));
84+
}

0 commit comments

Comments
(0)