Uh oh!
There was an error while loading. Please reload this page.
- Notifications
You must be signed in to change notification settings - Fork 34.2k
stream: fix writable.end callback behavior#34101
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Uh oh!
There was an error while loading. Please reload this page.
Changes from all commits
File filter
Filter by extension
Conversations
Uh oh!
There was an error while loading. Please reload this page.
Jump to
Uh oh!
There was an error while loading. Please reload this page.
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -412,6 +412,9 @@ Is `true` after [`writable.destroy()`][writable-destroy] has been called. | ||
| <!-- YAML | ||
| added: v0.9.4 | ||
| changes: | ||
| - version: REPLACEME | ||
| pr-url: https://github.com/nodejs/node/pull/34101 | ||
| description: The `callback` is invoked before 'finish' or on error. | ||
| - version: v14.0.0 | ||
| pr-url: https://github.com/nodejs/node/pull/29747 | ||
| description: The `callback` is invoked if 'finish' or 'error' is emitted. | ||
| @@ -428,15 +431,13 @@ changes: | ||
| `Uint8Array`. For object mode streams, `chunk` may be any JavaScript value | ||
| other than `null`. | ||
| * `encoding`{string} The encoding if `chunk` is a string | ||
| * `callback`{Function} Optional callback for when the stream finishes | ||
| or errors | ||
| * `callback`{Function} Callback for when the stream is finished. | ||
ronag marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading. Please reload this page. | ||
| * Returns:{this} | ||
| Calling the `writable.end()` method signals that no more data will be written | ||
| to the [`Writable`][]. The optional `chunk` and `encoding` arguments allow one | ||
| final additional chunk of data to be written immediately before closing the | ||
| stream. If provided, the optional `callback` function is attached as a listener | ||
| for the [`'finish'`][] and the `'error'` event. | ||
| stream. | ||
| Calling the [`stream.write()`][stream-write] method after calling | ||
| [`stream.end()`][stream-end] will raise an error. | ||
| @@ -592,7 +593,7 @@ changes: | ||
| `Uint8Array`. For object mode streams, `chunk` may be any JavaScript value | ||
| other than `null`. | ||
| * `encoding`{string} The encoding, if `chunk` is a string. **Default:** `'utf8'` | ||
| * `callback`{Function} Callback for when this chunk of data is flushed | ||
| * `callback`{Function} Callback for when this chunk of data is flushed. | ||
| * Returns:{boolean} `false` if the stream wishes for the calling code to | ||
| wait for the `'drain'` event to be emitted before continuing to write | ||
| additional data; otherwise `true`. | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -64,6 +64,8 @@ ObjectSetPrototypeOf(Writable, Stream); | ||
| function nop(){} | ||
| const kOnFinished = Symbol('kOnFinished'); | ||
| function WritableState(options, stream, isDuplex){ | ||
| // Duplex streams are both readable and writable, but share | ||
| // the same options object. | ||
| @@ -185,6 +187,8 @@ function WritableState(options, stream, isDuplex){ | ||
| // True if close has been emitted or would have been emitted | ||
| // depending on emitClose. | ||
| this.closeEmitted = false; | ||
| this[kOnFinished] = []; | ||
MemberAuthor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Possible optimization to start this as | ||
| } | ||
| function resetBuffer(state){ | ||
| @@ -411,7 +415,7 @@ function onwriteError(stream, state, er, cb){ | ||
| // not enabled. Passing `er` here doesn't make sense since | ||
| // it's related to one specific write, not to the buffered | ||
| // writes. | ||
| errorBuffer(state, new ERR_STREAM_DESTROYED('write')); | ||
| errorBuffer(state); | ||
| // This can emit error, but error must always follow cb. | ||
| errorOrDestroy(stream, er); | ||
| } | ||
| @@ -487,14 +491,14 @@ function afterWrite(stream, state, count, cb){ | ||
| } | ||
| if (state.destroyed){ | ||
| errorBuffer(state, new ERR_STREAM_DESTROYED('write')); | ||
| errorBuffer(state); | ||
| } | ||
| finishMaybe(stream, state); | ||
| } | ||
| // If there's something in the buffer waiting, then invoke callbacks. | ||
| function errorBuffer(state, err){ | ||
| function errorBuffer(state){ | ||
| if (state.writing){ | ||
| return; | ||
| } | ||
| @@ -503,7 +507,11 @@ function errorBuffer(state, err){ | ||
| const{chunk, callback } = state.buffered[n]; | ||
| const len = state.objectMode ? 1 : chunk.length; | ||
| state.length -= len; | ||
| callback(err); | ||
| callback(new ERR_STREAM_DESTROYED('write')); | ||
| } | ||
| for (const callback of state[kOnFinished].splice(0)){ | ||
| callback(new ERR_STREAM_DESTROYED('end')); | ||
| } | ||
| resetBuffer(state); | ||
| @@ -611,10 +619,11 @@ Writable.prototype.end = function(chunk, encoding, cb){ | ||
| } | ||
| if (typeof cb === 'function'){ | ||
| if (err || state.finished) | ||
| if (err || state.finished){ | ||
| process.nextTick(cb, err); | ||
| else | ||
| onFinished(this, cb); | ||
| } else{ | ||
| state[kOnFinished].push(cb); | ||
| } | ||
| } | ||
| return this; | ||
| @@ -636,6 +645,9 @@ function callFinal(stream, state){ | ||
| stream._final((err) =>{ | ||
| state.pendingcb--; | ||
| if (err){ | ||
| for (const callback of state[kOnFinished].splice(0)){ | ||
| callback(err); | ||
| } | ||
| errorOrDestroy(stream, err, state.sync); | ||
| } else if (needFinish(state)){ | ||
| state.prefinished = true; | ||
| @@ -683,6 +695,11 @@ function finish(stream, state){ | ||
| return; | ||
| state.finished = true; | ||
| for (const callback of state[kOnFinished].splice(0)){ | ||
| callback(); | ||
| } | ||
| stream.emit('finish'); | ||
| if (state.autoDestroy){ | ||
| @@ -701,26 +718,6 @@ function finish(stream, state){ | ||
| } | ||
| } | ||
| // TODO(ronag): Avoid using events to implement internal logic. | ||
| function onFinished(stream, cb){ | ||
| function onerror(err){ | ||
| stream.removeListener('finish', onfinish); | ||
| stream.removeListener('error', onerror); | ||
| cb(err); | ||
| if (stream.listenerCount('error') === 0){ | ||
| stream.emit('error', err); | ||
| } | ||
| } | ||
| function onfinish(){ | ||
| stream.removeListener('finish', onfinish); | ||
| stream.removeListener('error', onerror); | ||
| cb(); | ||
| } | ||
| stream.on('finish', onfinish); | ||
| stream.prependListener('error', onerror); | ||
| } | ||
| ObjectDefineProperties(Writable.prototype,{ | ||
| destroyed:{ | ||
| @@ -800,7 +797,7 @@ const destroy = destroyImpl.destroy; | ||
| Writable.prototype.destroy = function(err, cb){ | ||
| const state = this._writableState; | ||
| if (!state.destroyed){ | ||
| process.nextTick(errorBuffer, state, new ERR_STREAM_DESTROYED('write')); | ||
| process.nextTick(errorBuffer, state); | ||
| } | ||
| destroy.call(this, err, cb); | ||
| return this; | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -354,7 +354,7 @@ const assert = require('assert'); | ||
| assert.strictEqual(err.message, 'asd'); | ||
| })); | ||
| write.end('asd', common.mustCall((err) =>{ | ||
| assert.strictEqual(err.message, 'asd'); | ||
| assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED'); | ||
mcollina marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading. Please reload this page. | ||
| })); | ||
| write.destroy(new Error('asd')); | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.