Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions lib/_stream_writable.js
Original file line numberDiff line numberDiff line change
Expand Up@@ -39,6 +39,7 @@ const{
module.exports = Writable;
Writable.WritableState = WritableState;

const debug = require('internal/util/debuglog').debuglog('stream');
const EE = require('events');
const Stream = require('stream');
const{Buffer } = require('buffer');
Expand DownExpand Up@@ -190,6 +191,7 @@ function WritableState(options, stream, isDuplex){
}

WritableState.prototype.getBuffer = function getBuffer(){
debug('getBuffer')
let current = this.bufferedRequest;
const out = [];
while (current){
Expand DownExpand Up@@ -261,6 +263,7 @@ Writable.prototype.pipe = function(){
};

Writable.prototype.write = function(chunk, encoding, cb){
debug('write', chunk, encoding);
const state = this._writableState;

if (typeof encoding === 'function'){
Expand DownExpand Up@@ -310,10 +313,12 @@ Writable.prototype.write = function(chunk, encoding, cb){
};

Writable.prototype.cork = function(){
debug('cork');
this._writableState.corked++;
};

Writable.prototype.uncork = function(){
debug('uncork');
const state = this._writableState;

if (state.corked){
Expand All@@ -328,6 +333,7 @@ Writable.prototype.uncork = function(){
};

Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding){
debug('setDefaultEncoding', encoding);
// node::ParseEncoding() requires lower case.
if (typeof encoding === 'string')
encoding = encoding.toLowerCase();
Expand All@@ -341,6 +347,7 @@ Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding){
// in the queue, and wait our turn. Otherwise, call _write
// If we return false, then we need a drain event, so set that flag.
function writeOrBuffer(stream, state, chunk, encoding, cb){
debug('writeOrBuffer')
const len = state.objectMode ? 1 : chunk.length;

state.length += len;
Expand DownExpand Up@@ -374,6 +381,7 @@ function writeOrBuffer(stream, state, chunk, encoding, cb){
}

function doWrite(stream, state, writev, len, chunk, encoding, cb){
debug('doWrite');
state.writelen = len;
state.writecb = cb;
state.writing = true;
Expand All@@ -388,6 +396,7 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb){
}

function onwriteError(stream, state, er, cb){
debug('onwriteError');
--state.pendingcb;

cb(er);
Expand DownExpand Up@@ -478,6 +487,7 @@ function afterWrite(stream, state, count, cb){

// If there's something in the buffer waiting, then invoke callbacks.
function errorBuffer(state, err){
debug('errorBuffer');
if (state.writing || !state.bufferedRequest){
return;
}
Expand All@@ -498,6 +508,7 @@ function clearBuffer(stream, state){
let entry = state.bufferedRequest;

if (stream._writev && entry && entry.next){
debug('clearBuffer: fast case');
// Fast case, write everything using _writev()
const l = state.bufferedRequestCount;
const buffer = new Array(l);
Expand DownExpand Up@@ -531,6 +542,7 @@ function clearBuffer(stream, state){
}
state.bufferedRequestCount = 0;
} else{
debug('clearBuffer: slow case');
// Slow case, write chunks one-by-one
while (entry){
const chunk = entry.chunk;
Expand DownExpand Up@@ -569,6 +581,7 @@ Writable.prototype._write = function(chunk, encoding, cb){
Writable.prototype._writev = null;

Writable.prototype.end = function(chunk, encoding, cb){
debug('end')
const state = this._writableState;

if (typeof chunk === 'function'){
Expand DownExpand Up@@ -614,6 +627,7 @@ Writable.prototype.end = function(chunk, encoding, cb){
};

function needFinish(state){
debug('needFinish');
return (state.ending &&
state.length === 0 &&
!state.errored &&
Expand All@@ -623,6 +637,7 @@ function needFinish(state){
}

function callFinal(stream, state){
debug('callFinal');
stream._final((err) =>{
state.pendingcb--;
if (err){
Expand All@@ -636,6 +651,7 @@ function callFinal(stream, state){
}

function prefinish(stream, state){
debug('prefinish');
if (!state.prefinished && !state.finalCalled){
if (typeof stream._final === 'function' && !state.destroyed){
state.pendingcb++;
Expand All@@ -649,6 +665,7 @@ function prefinish(stream, state){
}

function finishMaybe(stream, state, sync){
debug('finishMaybe');
const need = needFinish(state);
if (need){
prefinish(stream, state);
Expand All@@ -665,6 +682,7 @@ function finishMaybe(stream, state, sync){
}

function finish(stream, state){
debug('finish');
state.pendingcb--;
state.finished = true;
stream.emit('finish');
Expand All@@ -680,6 +698,7 @@ function finish(stream, state){
}

function endWritable(stream, state, cb){
debug('endWritable');
state.ending = true;
finishMaybe(stream, state, true);
if (cb !== nop){
Expand All@@ -692,6 +711,7 @@ function endWritable(stream, state, cb){
}

function onCorkedFinish(corkReq, state, err){
debug('onCorkedFinish');
let entry = corkReq.entry;
corkReq.entry = null;
while (entry){
Expand All@@ -706,6 +726,7 @@ function onCorkedFinish(corkReq, state, err){
}

function onFinished(stream, state, cb){
debug('onFinished');
function onerror(err){
stream.removeListener('finish', onfinish);
stream.removeListener('error', onerror);
Expand All@@ -716,6 +737,7 @@ function onFinished(stream, state, cb){
}

function onfinish(){
debug('onfinish');
stream.removeListener('finish', onfinish);
stream.removeListener('error', onerror);
cb();
Expand DownExpand Up@@ -798,6 +820,7 @@ ObjectDefineProperties(Writable.prototype,{

const destroy = destroyImpl.destroy;
Writable.prototype.destroy = function(err, cb){
debug('destroy');
const state = this._writableState;
if (!state.destroyed){
process.nextTick(errorBuffer, state, new ERR_STREAM_DESTROYED('write'));
Expand Down