Skip to content

Commit 6206957

Browse files
ronagUlisesGascon
authored andcommitted
stream: optimize creation
Refs: nodejs/performance#79 PR-URL: #50337 Reviewed-By: Yagiz Nizipli <[email protected]> Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Moshe Atlow <[email protected]> Reviewed-By: Vinícius Lourenço Claro Cardoso <[email protected]> Reviewed-By: Luigi Pinca <[email protected]>
1 parent 4aaaff4 commit 6206957

File tree

4 files changed

+83
-57
lines changed

4 files changed

+83
-57
lines changed

‎lib/internal/streams/duplex.js‎

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,17 @@ const{
3535

3636
module.exports=Duplex;
3737

38+
constStream=require('internal/streams/legacy').Stream;
3839
constReadable=require('internal/streams/readable');
3940
constWritable=require('internal/streams/writable');
4041

42+
const{
43+
addAbortSignal,
44+
}=require('internal/streams/add-abort-signal');
45+
46+
constdestroyImpl=require('internal/streams/destroy');
47+
const{ kOnConstructed }=require('internal/streams/utils');
48+
4149
ObjectSetPrototypeOf(Duplex.prototype,Readable.prototype);
4250
ObjectSetPrototypeOf(Duplex,Readable);
4351

@@ -55,8 +63,8 @@ function Duplex(options){
5563
if(!(thisinstanceofDuplex))
5664
returnnewDuplex(options);
5765

58-
Readable.call(this,options);
59-
Writable.call(this,options);
66+
this._readableState=newReadable.ReadableState(options,this,true);
67+
this._writableState=newWritable.WritableState(options,this,true);
6068

6169
if(options){
6270
this.allowHalfOpen=options.allowHalfOpen!==false;
@@ -73,9 +81,39 @@ function Duplex(options){
7381
this._writableState.ended=true;
7482
this._writableState.finished=true;
7583
}
84+
85+
if(typeofoptions.read==='function')
86+
this._read=options.read;
87+
88+
if(typeofoptions.write==='function')
89+
this._write=options.write;
90+
91+
if(typeofoptions.writev==='function')
92+
this._writev=options.writev;
93+
94+
if(typeofoptions.destroy==='function')
95+
this._destroy=options.destroy;
96+
97+
if(typeofoptions.final==='function')
98+
this._final=options.final;
99+
100+
if(typeofoptions.construct==='function')
101+
this._construct=options.construct;
102+
103+
if(options.signal)
104+
addAbortSignal(options.signal,this);
76105
}else{
77106
this.allowHalfOpen=true;
78107
}
108+
109+
Stream.call(this,options);
110+
111+
if(this._construct!=null){
112+
destroyImpl.construct(this,()=>{
113+
this._readableState[kOnConstructed](this);
114+
this._writableState[kOnConstructed](this);
115+
});
116+
}
79117
}
80118

81119
ObjectDefineProperties(Duplex.prototype,{

‎lib/internal/streams/readable.js‎

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ const{
7171
AbortError,
7272
}=require('internal/errors');
7373
const{ validateObject }=require('internal/validators');
74+
const{ kOnConstructed }=require('internal/streams/utils');
7475

7576
constkState=Symbol('kState');
7677

@@ -251,20 +252,14 @@ ObjectDefineProperties(ReadableState.prototype,{
251252

252253

253254
functionReadableState(options,stream,isDuplex){
254-
// Duplex streams are both readable and writable, but share
255-
// the same options object.
256-
// However, some cases require setting options to different
257-
// values for the readable and the writable sides of the duplex stream.
258-
// These options can be provided separately as readableXXX and writableXXX.
259-
if(typeofisDuplex!=='boolean')
260-
isDuplex=streaminstanceofStream.Duplex;
261-
262255
// Bit map field to store ReadableState more effciently with 1 bit per field
263256
// instead of a V8 slot per field.
264257
this[kState]=kEmitClose|kAutoDestroy|kConstructed|kSync;
258+
265259
// Object stream flag. Used to make read(n) ignore n and to
266260
// make all the buffer merging and length checks go away.
267-
if(options&&options.objectMode)this[kState]|=kObjectMode;
261+
if(options&&options.objectMode)
262+
this[kState]|=kObjectMode;
268263

269264
if(isDuplex&&options&&options.readableObjectMode)
270265
this[kState]|=kObjectMode;
@@ -310,16 +305,17 @@ function ReadableState(options, stream, isDuplex){
310305
}
311306
}
312307

308+
ReadableState.prototype[kOnConstructed]=functiononConstructed(stream){
309+
if((this[kState]&kNeedReadable)!==0){
310+
maybeReadMore(stream,this);
311+
}
312+
};
313313

314314
functionReadable(options){
315315
if(!(thisinstanceofReadable))
316316
returnnewReadable(options);
317317

318-
// Checking for a Stream.Duplex instance is faster here instead of inside
319-
// the ReadableState constructor, at least with V8 6.5.
320-
constisDuplex=thisinstanceofStream.Duplex;
321-
322-
this._readableState=newReadableState(options,this,isDuplex);
318+
this._readableState=newReadableState(options,this,false);
323319

324320
if(options){
325321
if(typeofoptions.read==='function')
@@ -331,17 +327,17 @@ function Readable(options){
331327
if(typeofoptions.construct==='function')
332328
this._construct=options.construct;
333329

334-
if(options.signal&&!isDuplex)
330+
if(options.signal)
335331
addAbortSignal(options.signal,this);
336332
}
337333

338334
Stream.call(this,options);
339335

340-
destroyImpl.construct(this,()=>{
341-
if(this._readableState.needReadable){
342-
maybeReadMore(this,this._readableState);
343-
}
344-
});
336+
if(this._construct!=null){
337+
destroyImpl.construct(this,()=>{
338+
this._readableState[kOnConstructed](this);
339+
});
340+
}
345341
}
346342

347343
Readable.prototype.destroy=destroyImpl.destroy;

‎lib/internal/streams/utils.js‎

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ const{
44
SymbolAsyncIterator,
55
SymbolIterator,
66
SymbolFor,
7+
Symbol,
78
}=primordials;
89

910
// We need to use SymbolFor to make these globally available
@@ -16,6 +17,8 @@ const kIsReadable = SymbolFor('nodejs.stream.readable');
1617
constkIsWritable=SymbolFor('nodejs.stream.writable');
1718
constkIsDisturbed=SymbolFor('nodejs.stream.disturbed');
1819

20+
constkOnConstructed=Symbol('kOnConstructed');
21+
1922
constkIsClosedPromise=SymbolFor('nodejs.webstream.isClosedPromise');
2023
constkControllerErrorFunction=SymbolFor('nodejs.webstream.controllerErrorFunction');
2124

@@ -303,6 +306,7 @@ function isErrored(stream){
303306
}
304307

305308
module.exports={
309+
kOnConstructed,
306310
isDestroyed,
307311
kIsDestroyed,
308312
isDisturbed,

‎lib/internal/streams/writable.js‎

Lines changed: 23 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ const EE = require('events');
4444
constStream=require('internal/streams/legacy').Stream;
4545
const{ Buffer }=require('buffer');
4646
constdestroyImpl=require('internal/streams/destroy');
47+
const{ kOnConstructed }=require('internal/streams/utils');
4748

4849
const{
4950
addAbortSignal,
@@ -290,20 +291,15 @@ ObjectDefineProperties(WritableState.prototype,{
290291
});
291292

292293
functionWritableState(options,stream,isDuplex){
293-
// Duplex streams are both readable and writable, but share
294-
// the same options object.
295-
// However, some cases require setting options to different
296-
// values for the readable and the writable sides of the duplex stream,
297-
// e.g. options.readableObjectMode vs. options.writableObjectMode, etc.
298-
if(typeofisDuplex!=='boolean')
299-
isDuplex=streaminstanceofStream.Duplex;
300-
301294
// Bit map field to store WritableState more effciently with 1 bit per field
302295
// instead of a V8 slot per field.
303296
this[kState]=kSync|kConstructed|kEmitClose|kAutoDestroy;
304297

305-
if(options&&options.objectMode)this[kState]|=kObjectMode;
306-
if(isDuplex&&options&&options.writableObjectMode)this[kState]|=kObjectMode;
298+
if(options&&options.objectMode)
299+
this[kState]|=kObjectMode;
300+
301+
if(isDuplex&&options&&options.writableObjectMode)
302+
this[kState]|=kObjectMode;
307303

308304
// The point at which write() starts returning false
309305
// Note: 0 is a valid value, means that we always return false if
@@ -323,7 +319,7 @@ function WritableState(options, stream, isDuplex){
323319
// Crypto is kind of old and crusty. Historically, its default string
324320
// encoding is 'binary' so we have to make this configurable.
325321
// Everything else in the universe uses 'utf8', though.
326-
constdefaultEncoding=options?.defaultEncoding;
322+
constdefaultEncoding=options ? options.defaultEncoding : null;
327323
if(defaultEncoding==null||defaultEncoding==='utf8'||defaultEncoding==='utf-8'){
328324
this[kState]|=kDefaultUTF8Encoding;
329325
}elseif(Buffer.isEncoding(defaultEncoding)){
@@ -372,23 +368,21 @@ ObjectDefineProperty(WritableState.prototype, 'bufferedRequestCount',{
372368
},
373369
});
374370

375-
functionWritable(options){
376-
// Writable ctor is applied to Duplexes, too.
377-
// `realHasInstance` is necessary because using plain `instanceof`
378-
// would return false, as no `_writableState` property is attached.
379-
380-
// Trying to use the custom `instanceof` for Writable here will also break the
381-
// Node.js LazyTransform implementation, which has a non-trivial getter for
382-
// `_writableState` that would lead to infinite recursion.
371+
WritableState.prototype[kOnConstructed]=functiononConstructed(stream){
372+
if((this[kState]&kWriting)===0){
373+
clearBuffer(stream,this);
374+
}
383375

384-
// Checking for a Stream.Duplex instance is faster here instead of inside
385-
// the WritableState constructor, at least with V8 6.5.
386-
constisDuplex=(thisinstanceofStream.Duplex);
376+
if((this[kState]&kEnding)!==0){
377+
finishMaybe(stream,this);
378+
}
379+
};
387380

388-
if(!isDuplex&&!FunctionPrototypeSymbolHasInstance(Writable,this))
381+
functionWritable(options){
382+
if(!(thisinstanceofWritable))
389383
returnnewWritable(options);
390384

391-
this._writableState=newWritableState(options,this,isDuplex);
385+
this._writableState=newWritableState(options,this,false);
392386

393387
if(options){
394388
if(typeofoptions.write==='function')
@@ -412,17 +406,11 @@ function Writable(options){
412406

413407
Stream.call(this,options);
414408

415-
destroyImpl.construct(this,()=>{
416-
conststate=this._writableState;
417-
418-
if((state[kState]&kWriting)===0){
419-
clearBuffer(this,state);
420-
}
421-
422-
if((state[kState]&kEnding)!==0){
423-
finishMaybe(this,state);
424-
}
425-
});
409+
if(this._construct!=null){
410+
destroyImpl.construct(this,()=>{
411+
this._writableState[kOnConstructed](this);
412+
});
413+
}
426414
}
427415

428416
ObjectDefineProperty(Writable,SymbolHasInstance,{

0 commit comments

Comments
(0)