Uh oh!
There was an error while loading. Please reload this page.
- Notifications
You must be signed in to change notification settings - Fork 34.3k
fs: synchronize close with other I/O for streams#30837
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Uh oh!
There was an error while loading. Please reload this page.
Changes from all commits
File filter
Filter by extension
Conversations
Uh oh!
There was an error while loading. Please reload this page.
Jump to
Uh oh!
There was an error while loading. Please reload this page.
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -7,10 +7,12 @@ const{ | ||
| NumberIsSafeInteger, | ||
| ObjectDefineProperty, | ||
| ObjectSetPrototypeOf, | ||
| Symbol, | ||
| } = primordials; | ||
| const{ | ||
| ERR_OUT_OF_RANGE | ||
| ERR_OUT_OF_RANGE, | ||
| ERR_STREAM_DESTROYED | ||
| } = require('internal/errors').codes; | ||
| const internalUtil = require('internal/util'); | ||
| const{validateNumber } = require('internal/validators'); | ||
| @@ -22,6 +24,8 @@ const{ | ||
| } = require('internal/fs/utils'); | ||
| const{Readable, Writable } = require('stream'); | ||
| const{toPathIfFileURL } = require('internal/url'); | ||
| const kIoDone = Symbol('kIoDone'); | ||
| const kIsPerformingIO = Symbol('kIsPerformingIO'); | ||
| const kMinPoolSpace = 128; | ||
| @@ -86,6 +90,7 @@ function ReadStream(path, options){ | ||
| this.pos = undefined; | ||
| this.bytesRead = 0; | ||
| this.closed = false; | ||
| this[kIsPerformingIO] = false; | ||
| if (this.start !== undefined){ | ||
| checkPosition(this.start, 'start'); | ||
| @@ -155,6 +160,8 @@ ReadStream.prototype._read = function(n){ | ||
| }); | ||
| } | ||
| if (this.destroyed) return; | ||
| if (!pool || pool.length - pool.used < kMinPoolSpace){ | ||
| // Discard the old pool. | ||
| allocNewPool(this.readableHighWaterMark); | ||
| @@ -178,7 +185,12 @@ ReadStream.prototype._read = function(n){ | ||
| return this.push(null); | ||
| // the actual read. | ||
| this[kIsPerformingIO] = true; | ||
| fs.read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) =>{ | ||
| this[kIsPerformingIO] = false; | ||
| // Tell ._destroy() that it's safe to close the fd now. | ||
| if (this.destroyed) return this.emit(kIoDone, er); | ||
| ||
| if (er){ | ||
| if (this.autoClose){ | ||
| this.destroy(); | ||
| @@ -224,8 +236,12 @@ ReadStream.prototype._destroy = function(err, cb){ | ||
| return; | ||
| } | ||
| if (this[kIsPerformingIO]){ | ||
| this.once(kIoDone, (er) => closeFsStream(this, cb, err || er)); | ||
| return; | ||
| } | ||
| closeFsStream(this, cb, err); | ||
| this.fd = null; | ||
| }; | ||
| function closeFsStream(stream, cb, err){ | ||
| @@ -236,6 +252,8 @@ function closeFsStream(stream, cb, err){ | ||
| if (!er) | ||
| stream.emit('close'); | ||
| }); | ||
| stream.fd = null; | ||
| } | ||
| ReadStream.prototype.close = function(cb){ | ||
| @@ -274,6 +292,7 @@ function WriteStream(path, options){ | ||
| this.pos = undefined; | ||
| this.bytesWritten = 0; | ||
| this.closed = false; | ||
| this[kIsPerformingIO] = false; | ||
| if (this.start !== undefined){ | ||
| checkPosition(this.start, 'start'); | ||
| @@ -339,7 +358,17 @@ WriteStream.prototype._write = function(data, encoding, cb){ | ||
| }); | ||
| } | ||
| if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write')); | ||
| this[kIsPerformingIO] = true; | ||
| fs.write(this.fd, data, 0, data.length, this.pos, (er, bytes) =>{ | ||
| this[kIsPerformingIO] = false; | ||
| // Tell ._destroy() that it's safe to close the fd now. | ||
| if (this.destroyed){ | ||
| cb(er); | ||
| return this.emit(kIoDone, er); | ||
| } | ||
| if (er){ | ||
| if (this.autoClose){ | ||
| this.destroy(); | ||
| @@ -362,7 +391,8 @@ WriteStream.prototype._writev = function(data, cb){ | ||
| }); | ||
| } | ||
| const self = this; | ||
| if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write')); | ||
| const len = data.length; | ||
| const chunks = new Array(len); | ||
| let size = 0; | ||
| @@ -374,12 +404,22 @@ WriteStream.prototype._writev = function(data, cb){ | ||
| size += chunk.length; | ||
| } | ||
| fs.writev(this.fd, chunks, this.pos, function(er, bytes){ | ||
| this[kIsPerformingIO] = true; | ||
| fs.writev(this.fd, chunks, this.pos, (er, bytes) =>{ | ||
| this[kIsPerformingIO] = false; | ||
| // Tell ._destroy() that it's safe to close the fd now. | ||
| if (this.destroyed){ | ||
| cb(er); | ||
| return this.emit(kIoDone, er); | ||
| } | ||
| if (er){ | ||
| self.destroy(); | ||
| if (this.autoClose){ | ||
| this.destroy(); | ||
| } | ||
| return cb(er); | ||
| } | ||
| self.bytesWritten += bytes; | ||
| this.bytesWritten += bytes; | ||
| cb(); | ||
| }); | ||
Uh oh!
There was an error while loading. Please reload this page.