Skip to content

Commit 3dd82b1

Browse files
atlowChemiruyadorno
authored andcommitted
stream: use addAbortListener
PR-URL: #48550 Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
1 parent cb51ef2 commit 3dd82b1

File tree

5 files changed

+39
-26
lines changed

5 files changed

+39
-26
lines changed

‎lib/internal/streams/add-abort-signal.js‎

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
'use strict';
22

3+
const{
4+
SymbolDispose,
5+
}=primordials;
6+
37
const{
48
AbortError,
59
codes,
@@ -13,6 +17,7 @@ const{
1317

1418
consteos=require('internal/streams/end-of-stream');
1519
const{ERR_INVALID_ARG_TYPE}=codes;
20+
letaddAbortListener;
1621

1722
// This method is inlined here for readable-stream
1823
// It also does not allow for signal to not exist on the stream
@@ -46,8 +51,9 @@ module.exports.addAbortSignalNoValidate = function(signal, stream){
4651
if(signal.aborted){
4752
onAbort();
4853
}else{
49-
signal.addEventListener('abort',onAbort);
50-
eos(stream,()=>signal.removeEventListener('abort',onAbort));
54+
addAbortListener??=require('events').addAbortListener;
55+
constdisposable=addAbortListener(signal,onAbort);
56+
eos(stream,disposable[SymbolDispose]);
5157
}
5258
returnstream;
5359
};

‎lib/internal/streams/end-of-stream.js‎

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@ const{
2222
validateBoolean,
2323
}=require('internal/validators');
2424

25-
const{ Promise, PromisePrototypeThen }=primordials;
25+
const{
26+
Promise,
27+
PromisePrototypeThen,
28+
SymbolDispose,
29+
}=primordials;
2630

2731
const{
2832
isClosed,
@@ -40,6 +44,7 @@ const{
4044
willEmitClose: _willEmitClose,
4145
kIsClosedPromise,
4246
}=require('internal/streams/utils');
47+
letaddAbortListener;
4348

4449
functionisRequest(stream){
4550
returnstream.setHeader&&typeofstream.abort==='function';
@@ -249,12 +254,13 @@ function eos(stream, options, callback){
249254
if(options.signal.aborted){
250255
process.nextTick(abort);
251256
}else{
257+
addAbortListener??=require('events').addAbortListener;
258+
constdisposable=addAbortListener(options.signal,abort);
252259
constoriginalCallback=callback;
253260
callback=once((...args)=>{
254-
options.signal.removeEventListener('abort',abort);
261+
disposable[SymbolDispose]();
255262
originalCallback.apply(stream,args);
256263
});
257-
options.signal.addEventListener('abort',abort);
258264
}
259265
}
260266

@@ -272,12 +278,13 @@ function eosWeb(stream, options, callback){
272278
if(options.signal.aborted){
273279
process.nextTick(abort);
274280
}else{
281+
addAbortListener??=require('events').addAbortListener;
282+
constdisposable=addAbortListener(options.signal,abort);
275283
constoriginalCallback=callback;
276284
callback=once((...args)=>{
277-
options.signal.removeEventListener('abort',abort);
285+
disposable[SymbolDispose]();
278286
originalCallback.apply(stream,args);
279287
});
280-
options.signal.addEventListener('abort',abort);
281288
}
282289
}
283290
constresolverFn=(...args)=>{

‎lib/internal/streams/operators.js‎

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
'use strict';
22

3-
const{ AbortController }=require('internal/abort_controller');
3+
const{ AbortController, AbortSignal}=require('internal/abort_controller');
44

55
const{
66
codes: {
@@ -16,7 +16,7 @@ const{
1616
validateInteger,
1717
validateObject,
1818
}=require('internal/validators');
19-
const{ kWeakHandler }=require('internal/event_target');
19+
const{ kWeakHandler, kResistStopPropagation}=require('internal/event_target');
2020
const{ finished }=require('internal/streams/end-of-stream');
2121
conststaticCompose=require('internal/streams/compose');
2222
const{
@@ -27,6 +27,7 @@ const{deprecate } = require('internal/util');
2727

2828
const{
2929
ArrayPrototypePush,
30+
Boolean,
3031
MathFloor,
3132
Number,
3233
NumberIsNaN,
@@ -84,19 +85,11 @@ function map(fn, options){
8485
validateInteger(concurrency,'concurrency',1);
8586

8687
returnasyncfunction*map(){
87-
constac=newAbortController();
88+
constsignal=AbortSignal.any([options?.signal].filter(Boolean));
8889
conststream=this;
8990
constqueue=[];
90-
constsignal=ac.signal;
9191
constsignalOpt={ signal };
9292

93-
constabort=()=>ac.abort();
94-
if(options?.signal?.aborted){
95-
abort();
96-
}
97-
98-
options?.signal?.addEventListener('abort',abort);
99-
10093
letnext;
10194
letresume;
10295
letdone=false;
@@ -153,7 +146,6 @@ function map(fn, options){
153146
next();
154147
next=null;
155148
}
156-
options?.signal?.removeEventListener('abort',abort);
157149
}
158150
}
159151

@@ -188,8 +180,6 @@ function map(fn, options){
188180
});
189181
}
190182
}finally{
191-
ac.abort();
192-
193183
done=true;
194184
if(resume){
195185
resume();
@@ -301,7 +291,7 @@ async function reduce(reducer, initialValue, options){
301291
constac=newAbortController();
302292
constsignal=ac.signal;
303293
if(options?.signal){
304-
constopts={once: true,[kWeakHandler]: this};
294+
constopts={once: true,[kWeakHandler]: this,[kResistStopPropagation]: true};
305295
options.signal.addEventListener('abort',()=>ac.abort(),opts);
306296
}
307297
letgotAnyItemFromStream=false;

‎lib/internal/streams/pipeline.js‎

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ const{
77
ArrayIsArray,
88
Promise,
99
SymbolAsyncIterator,
10+
SymbolDispose,
1011
}=primordials;
1112

1213
consteos=require('internal/streams/end-of-stream');
@@ -44,6 +45,7 @@ const{AbortController } = require('internal/abort_controller');
4445

4546
letPassThrough;
4647
letReadable;
48+
letaddAbortListener;
4749

4850
functiondestroyer(stream,reading,writing){
4951
letfinished=false;
@@ -206,7 +208,11 @@ function pipelineImpl(streams, callback, opts){
206208
finishImpl(newAbortError());
207209
}
208210

209-
outerSignal?.addEventListener('abort',abort);
211+
addAbortListener??=require('events').addAbortListener;
212+
letdisposable;
213+
if(outerSignal){
214+
disposable=addAbortListener(outerSignal,abort);
215+
}
210216

211217
leterror;
212218
letvalue;
@@ -231,7 +237,7 @@ function pipelineImpl(streams, callback, opts){
231237
destroys.shift()(error);
232238
}
233239

234-
outerSignal?.removeEventListener('abort',abort);
240+
disposable?.[SymbolDispose]();
235241
ac.abort();
236242

237243
if(final){

‎lib/internal/webstreams/readablestream.js‎

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ const{
2222
SafePromiseAll,
2323
Symbol,
2424
SymbolAsyncIterator,
25+
SymbolDispose,
2526
SymbolToStringTag,
2627
Uint8Array,
2728
}=primordials;
@@ -140,6 +141,7 @@ const kRelease = Symbol('kRelease');
140141

141142
letreleasedError;
142143
letreleasingError;
144+
letaddAbortListener;
143145

144146
constuserModuleRegExp=/^{4}at(?:[^/\\(]+\()(?!node:(.+):\d+:\d+\)$).*/gm;
145147

@@ -1259,6 +1261,7 @@ function readableStreamPipeTo(
12591261

12601262
letreader;
12611263
letwriter;
1264+
letdisposable;
12621265
// Both of these can throw synchronously. We want to capture
12631266
// the error and return a rejected promise instead.
12641267
try{
@@ -1291,7 +1294,7 @@ function readableStreamPipeTo(
12911294
writableStreamDefaultWriterRelease(writer);
12921295
readableStreamReaderGenericRelease(reader);
12931296
if(signal!==undefined)
1294-
signal.removeEventListener('abort',abortAlgorithm);
1297+
disposable?.[SymbolDispose]();
12951298
if(rejected)
12961299
promise.reject(error);
12971300
else
@@ -1418,7 +1421,8 @@ function readableStreamPipeTo(
14181421
abortAlgorithm();
14191422
returnpromise.promise;
14201423
}
1421-
signal.addEventListener('abort',abortAlgorithm,{once: true});
1424+
addAbortListener??=require('events').addAbortListener;
1425+
disposable=addAbortListener(signal,abortAlgorithm);
14221426
}
14231427

14241428
setPromiseHandled(run());

0 commit comments

Comments
(0)