Skip to content

Commit 2bb4ac4

Browse files
ronagBethGriggs
authored andcommitted
stream: avoid drain for sync streams
Previously a sync writable receiving chunks larger than highwatermark would unecessarily ping pong needDrain. PR-URL: #32887 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: James M Snell <[email protected]>
1 parent 44c157e commit 2bb4ac4

8 files changed

+25
-17
lines changed

‎benchmark/streams/writable-manywrites.js‎

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@ const bench = common.createBenchmark(main,{
77
n: [2e6],
88
sync: ['yes','no'],
99
writev: ['yes','no'],
10-
callback: ['yes','no']
10+
callback: ['yes','no'],
11+
len: [1024,32*1024]
1112
});
1213

13-
functionmain({ n, sync, writev, callback }){
14-
constb=Buffer.allocUnsafe(1024);
14+
functionmain({ n, sync, writev, callback, len}){
15+
constb=Buffer.allocUnsafe(len);
1516
consts=newWritable();
1617
sync=sync==='yes';
1718

‎lib/_stream_writable.js‎

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -337,11 +337,6 @@ function writeOrBuffer(stream, state, chunk, encoding, callback){
337337

338338
state.length+=len;
339339

340-
constret=state.length<state.highWaterMark;
341-
// We must ensure that previous needDrain will not be reset to false.
342-
if(!ret)
343-
state.needDrain=true;
344-
345340
if(state.writing||state.corked||state.errored){
346341
state.buffered.push({ chunk, encoding, callback });
347342
if(state.allBuffers&&encoding!=='buffer'){
@@ -359,6 +354,12 @@ function writeOrBuffer(stream, state, chunk, encoding, callback){
359354
state.sync=false;
360355
}
361356

357+
constret=state.length<state.highWaterMark;
358+
359+
// We must ensure that previous needDrain will not be reset to false.
360+
if(!ret)
361+
state.needDrain=true;
362+
362363
// Return false if errored or destroyed in order to break
363364
// any synchronous while(stream.write(data)) loops.
364365
returnret&&!state.errored&&!state.destroyed;

‎test/parallel/test-stream-big-packet.js‎

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,11 @@ class TestStream extends stream.Transform{
3636
}
3737
}
3838

39-
consts1=newstream.PassThrough();
39+
consts1=newstream.Transform({
40+
transform(chunk,encoding,cb){
41+
process.nextTick(cb,null,chunk);
42+
}
43+
});
4044
consts2=newstream.PassThrough();
4145
consts3=newTestStream();
4246
s1.pipe(s3);

‎test/parallel/test-stream-catch-rejections.js‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ const assert = require('assert');
3030
captureRejections: true,
3131
highWaterMark: 1,
3232
write(chunk,enc,cb){
33-
cb();
33+
process.nextTick(cb);
3434
}
3535
});
3636

‎test/parallel/test-stream-pipe-await-drain-push-while-write.js‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ const writable = new stream.Writable({
1919
});
2020
}
2121

22-
cb();
22+
process.nextTick(cb);
2323
},3)
2424
});
2525

‎test/parallel/test-stream-pipe-await-drain.js‎

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ reader._read = () =>{};
1919

2020
writer1._write=common.mustCall(function(chunk,encoding,cb){
2121
this.emit('chunk-received');
22-
cb();
22+
process.nextTick(cb);
2323
},1);
2424

2525
writer1.once('chunk-received',()=>{
@@ -42,7 +42,7 @@ writer2._write = common.mustCall((chunk, encoding, cb) =>{
4242
reader._readableState.awaitDrainWriters.size,
4343
1,
4444
'awaitDrain should be 1 after first push, actual is '+
45-
reader._readableState.awaitDrainWriters
45+
reader._readableState.awaitDrainWriters.size
4646
);
4747
// Not calling cb here to "simulate" slow stream.
4848
// This should be called exactly once, since the first .write() call
@@ -54,7 +54,7 @@ writer3._write = common.mustCall((chunk, encoding, cb) =>{
5454
reader._readableState.awaitDrainWriters.size,
5555
2,
5656
'awaitDrain should be 2 after second push, actual is '+
57-
reader._readableState.awaitDrainWriters
57+
reader._readableState.awaitDrainWriters.size
5858
);
5959
// Not calling cb here to "simulate" slow stream.
6060
// This should be called exactly once, since the first .write() call

‎test/parallel/test-stream-writable-needdrain-state.js‎

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@ const transform = new stream.Transform({
1010
});
1111

1212
function_transform(chunk,encoding,cb){
13-
assert.strictEqual(transform._writableState.needDrain,true);
14-
cb();
13+
process.nextTick(()=>{
14+
assert.strictEqual(transform._writableState.needDrain,true);
15+
cb();
16+
});
1517
}
1618

1719
assert.strictEqual(transform._writableState.needDrain,false);

‎test/parallel/test-stream2-finish-pipe.js‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ r._read = function(size){
3030

3131
constw=newstream.Writable();
3232
w._write=function(data,encoding,cb){
33-
cb(null);
33+
process.nextTick(cb,null);
3434
};
3535

3636
r.pipe(w);

0 commit comments

Comments
(0)