Skip to content

Commit 2330b4f

Browse files
committed
streams: optimize creation
Refs: nodejs/performance#79 PR-URL: #50337
1 parent 25576b5 commit 2330b4f

File tree

5 files changed

+69
-54
lines changed

5 files changed

+69
-54
lines changed

‎benchmark/streams/creation.js‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ const{
88
}=require('stream');
99

1010
constbench=common.createBenchmark(main,{
11-
n: [50e6],
11+
n: [5e6],
1212
kind: ['duplex','readable','transform','writable'],
1313
});
1414

‎lib/internal/streams/duplex.js‎

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,12 @@ const{
3535

3636
module.exports=Duplex;
3737

38+
constStream=require('internal/streams/legacy').Stream;
3839
constReadable=require('internal/streams/readable');
3940
constWritable=require('internal/streams/writable');
4041

42+
const{ kOnConstructed }=require('internal/streams/utils');
43+
4144
ObjectSetPrototypeOf(Duplex.prototype,Readable.prototype);
4245
ObjectSetPrototypeOf(Duplex,Readable);
4346

@@ -55,8 +58,8 @@ function Duplex(options){
5558
if(!(thisinstanceofDuplex))
5659
returnnewDuplex(options);
5760

58-
Readable.call(this,options);
59-
Writable.call(this,options);
61+
this._readableState=newReadable.ReadableState(options,this,true);
62+
this._writableState=newWritable.WritableState(options,this,true);
6063

6164
if(options){
6265
this.allowHalfOpen=options.allowHalfOpen!==false;
@@ -73,9 +76,39 @@ function Duplex(options){
7376
this._writableState.ended=true;
7477
this._writableState.finished=true;
7578
}
79+
80+
if(typeofoptions.read==='function')
81+
this._read=options.read;
82+
83+
if(typeofoptions.write==='function')
84+
this._write=options.write;
85+
86+
if(typeofoptions.writev==='function')
87+
this._writev=options.writev;
88+
89+
if(typeofoptions.destroy==='function')
90+
this._destroy=options.destroy;
91+
92+
if(typeofoptions.final==='function')
93+
this._final=options.final;
94+
95+
if(typeofoptions.construct==='function')
96+
this._construct=options.construct;
97+
98+
if(options.signal)
99+
addAbortSignal(options.signal,this);
76100
}else{
77101
this.allowHalfOpen=true;
78102
}
103+
104+
Stream.call(this,options);
105+
106+
if(this._construct!=null){
107+
destroyImpl.construct(this,()=>{
108+
this._readableState[kOnConstructed](this);
109+
this._writableState[kOnConstructed](this);
110+
});
111+
}
79112
}
80113

81114
ObjectDefineProperties(Duplex.prototype,{

‎lib/internal/streams/readable.js‎

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ const{
7171
AbortError,
7272
}=require('internal/errors');
7373
const{ validateObject }=require('internal/validators');
74+
const{ kOnConstructed }=require('internal/streams/utils');
7475

7576
constkState=Symbol('kState');
7677

@@ -251,14 +252,6 @@ ObjectDefineProperties(ReadableState.prototype,{
251252

252253

253254
functionReadableState(options,stream,isDuplex){
254-
// Duplex streams are both readable and writable, but share
255-
// the same options object.
256-
// However, some cases require setting options to different
257-
// values for the readable and the writable sides of the duplex stream.
258-
// These options can be provided separately as readableXXX and writableXXX.
259-
if(typeofisDuplex!=='boolean')
260-
isDuplex=streaminstanceofStream.Duplex;
261-
262255
// Bit map field to store ReadableState more effciently with 1 bit per field
263256
// instead of a V8 slot per field.
264257
this[kState]=kEmitClose|kAutoDestroy|kConstructed|kSync;
@@ -310,16 +303,17 @@ function ReadableState(options, stream, isDuplex){
310303
}
311304
}
312305

306+
ReadableState.prototype[kOnConstructed]=functiononConstructed(stream){
307+
if((this.state&kNeedReadable)!==0){
308+
maybeReadMore(stream,this);
309+
}
310+
}
313311

314312
functionReadable(options){
315313
if(!(thisinstanceofReadable))
316314
returnnewReadable(options);
317315

318-
// Checking for a Stream.Duplex instance is faster here instead of inside
319-
// the ReadableState constructor, at least with V8 6.5.
320-
constisDuplex=thisinstanceofStream.Duplex;
321-
322-
this._readableState=newReadableState(options,this,isDuplex);
316+
this._readableState=newReadableState(options,this,false);
323317

324318
if(options){
325319
if(typeofoptions.read==='function')
@@ -331,17 +325,17 @@ function Readable(options){
331325
if(typeofoptions.construct==='function')
332326
this._construct=options.construct;
333327

334-
if(options.signal&&!isDuplex)
328+
if(options.signal)
335329
addAbortSignal(options.signal,this);
336330
}
337331

338332
Stream.call(this,options);
339333

340-
destroyImpl.construct(this,()=>{
341-
if(this._readableState.needReadable){
342-
maybeReadMore(this,this._readableState);
343-
}
344-
});
334+
if(this._construct!=null){
335+
destroyImpl.construct(this,()=>{
336+
this._readableState[kOnConstructed](this);
337+
});
338+
}
345339
}
346340

347341
Readable.prototype.destroy=destroyImpl.destroy;

‎lib/internal/streams/utils.js‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ const kIsReadable = SymbolFor('nodejs.stream.readable');
1616
constkIsWritable=SymbolFor('nodejs.stream.writable');
1717
constkIsDisturbed=SymbolFor('nodejs.stream.disturbed');
1818

19+
constkOnConstructed=Symbol('kOnConstructed')
20+
1921
constkIsClosedPromise=SymbolFor('nodejs.webstream.isClosedPromise');
2022
constkControllerErrorFunction=SymbolFor('nodejs.webstream.controllerErrorFunction');
2123

@@ -303,6 +305,7 @@ function isErrored(stream){
303305
}
304306

305307
module.exports={
308+
kOnConstructed,
306309
isDestroyed,
307310
kIsDestroyed,
308311
isDisturbed,

‎lib/internal/streams/writable.js‎

Lines changed: 17 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ const EE = require('events');
4444
constStream=require('internal/streams/legacy').Stream;
4545
const{ Buffer }=require('buffer');
4646
constdestroyImpl=require('internal/streams/destroy');
47+
const{ kOnConstructed }=require('internal/streams/utils');
4748

4849
const{
4950
addAbortSignal,
@@ -290,14 +291,6 @@ ObjectDefineProperties(WritableState.prototype,{
290291
});
291292

292293
functionWritableState(options,stream,isDuplex){
293-
// Duplex streams are both readable and writable, but share
294-
// the same options object.
295-
// However, some cases require setting options to different
296-
// values for the readable and the writable sides of the duplex stream,
297-
// e.g. options.readableObjectMode vs. options.writableObjectMode, etc.
298-
if(typeofisDuplex!=='boolean')
299-
isDuplex=streaminstanceofStream.Duplex;
300-
301294
// Bit map field to store WritableState more effciently with 1 bit per field
302295
// instead of a V8 slot per field.
303296
this[kState]=kSync|kConstructed|kEmitClose|kAutoDestroy;
@@ -372,23 +365,21 @@ ObjectDefineProperty(WritableState.prototype, 'bufferedRequestCount',{
372365
},
373366
});
374367

375-
functionWritable(options){
376-
// Writable ctor is applied to Duplexes, too.
377-
// `realHasInstance` is necessary because using plain `instanceof`
378-
// would return false, as no `_writableState` property is attached.
379-
380-
// Trying to use the custom `instanceof` for Writable here will also break the
381-
// Node.js LazyTransform implementation, which has a non-trivial getter for
382-
// `_writableState` that would lead to infinite recursion.
368+
WritableState.prototype[kOnConstructed]=functiononConstructed(stream){
369+
if((this[kState]&kWriting)===0){
370+
clearBuffer(stream,this);
371+
}
383372

384-
// Checking for a Stream.Duplex instance is faster here instead of inside
385-
// the WritableState constructor, at least with V8 6.5.
386-
constisDuplex=(thisinstanceofStream.Duplex);
373+
if((this[kState]&kEnding)!==0){
374+
finishMaybe(stream,this);
375+
}
376+
}
387377

388-
if(!isDuplex&&!FunctionPrototypeSymbolHasInstance(Writable,this))
378+
functionWritable(options){
379+
if(!(thisinstanceofWritable))
389380
returnnewWritable(options);
390381

391-
this._writableState=newWritableState(options,this,isDuplex);
382+
this._writableState=newWritableState(options,this,false);
392383

393384
if(options){
394385
if(typeofoptions.write==='function')
@@ -412,17 +403,11 @@ function Writable(options){
412403

413404
Stream.call(this,options);
414405

415-
destroyImpl.construct(this,()=>{
416-
conststate=this._writableState;
417-
418-
if((state[kState]&kWriting)===0){
419-
clearBuffer(this,state);
420-
}
421-
422-
if((state[kState]&kEnding)!==0){
423-
finishMaybe(this,state);
424-
}
425-
});
406+
if(this._construct!=null){
407+
destroyImpl.construct(this,()=>{
408+
this._writableState[kOnConstructed](this);
409+
});
410+
}
426411
}
427412

428413
ObjectDefineProperty(Writable,SymbolHasInstance,{

0 commit comments

Comments
(0)