Skip to content

Commit 0bd5595

Browse files
committed
stream: simplify Transform stream implementation
Significantly simplified Transform stream implementation by using mostly standard stream code. PR-URL: #32763 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Anna Henningsen <[email protected]>
1 parent f22a9ca commit 0bd5595

File tree

4 files changed

+56
-114
lines changed

4 files changed

+56
-114
lines changed

‎lib/_stream_transform.js‎

Lines changed: 52 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -65,66 +65,32 @@
6565

6666
const{
6767
ObjectSetPrototypeOf,
68+
Symbol
6869
}=primordials;
6970

7071
module.exports=Transform;
7172
const{
72-
ERR_METHOD_NOT_IMPLEMENTED,
73-
ERR_MULTIPLE_CALLBACK,
74-
ERR_TRANSFORM_ALREADY_TRANSFORMING,
75-
ERR_TRANSFORM_WITH_LENGTH_0
73+
ERR_METHOD_NOT_IMPLEMENTED
7674
}=require('internal/errors').codes;
7775
constDuplex=require('_stream_duplex');
7876
ObjectSetPrototypeOf(Transform.prototype,Duplex.prototype);
7977
ObjectSetPrototypeOf(Transform,Duplex);
8078

81-
82-
functionafterTransform(er,data){
83-
constts=this._transformState;
84-
ts.transforming=false;
85-
86-
constcb=ts.writecb;
87-
88-
if(cb===null){
89-
returnthis.emit('error',newERR_MULTIPLE_CALLBACK());
90-
}
91-
92-
ts.writechunk=null;
93-
ts.writecb=null;
94-
95-
if(data!=null)// Single equals check for both `null` and `undefined`
96-
this.push(data);
97-
98-
cb(er);
99-
100-
constrs=this._readableState;
101-
rs.reading=false;
102-
if(rs.needReadable||rs.length<rs.highWaterMark){
103-
this._read(rs.highWaterMark);
104-
}
105-
}
106-
79+
constkCallback=Symbol('kCallback');
10780

10881
functionTransform(options){
10982
if(!(thisinstanceofTransform))
11083
returnnewTransform(options);
11184

11285
Duplex.call(this,options);
11386

114-
this._transformState={
115-
afterTransform: afterTransform.bind(this),
116-
needTransform: false,
117-
transforming: false,
118-
writecb: null,
119-
writechunk: null,
120-
writeencoding: null
121-
};
122-
12387
// We have implemented the _read method, and done the other things
12488
// that Readable wants before the first _read call, so unset the
12589
// sync guard flag.
12690
this._readableState.sync=false;
12791

92+
this[kCallback]=null;
93+
12894
if(options){
12995
if(typeofoptions.transform==='function')
13096
this._transform=options.transform;
@@ -133,89 +99,67 @@ function Transform(options){
13399
this._flush=options.flush;
134100
}
135101

136-
// When the writable side finishes, then flush out anything remaining.
102+
// TODO(ronag): Unfortunately _final is invoked asynchronously.
103+
// Use `prefinish` hack. `prefinish` is emitted synchronously when
104+
// and only when `_final` is not defined. Implementing `_final`
105+
// to a Transform should be an error.
137106
this.on('prefinish',prefinish);
138107
}
139108

140109
functionprefinish(){
141-
if(typeofthis._flush==='function'&&!this._readableState.destroyed){
110+
if(typeofthis._flush==='function'&&!this.destroyed){
142111
this._flush((er,data)=>{
143-
done(this,er,data);
112+
if(er){
113+
this.destroy(er);
114+
return;
115+
}
116+
117+
if(data!=null){
118+
this.push(data);
119+
}
120+
this.push(null);
144121
});
145122
}else{
146-
done(this,null,null);
123+
this.push(null);
147124
}
148125
}
149126

150-
Transform.prototype.push=function(chunk,encoding){
151-
this._transformState.needTransform=false;
152-
returnDuplex.prototype.push.call(this,chunk,encoding);
153-
};
154-
155-
// This is the part where you do stuff!
156-
// override this function in implementation classes.
157-
// 'chunk' is an input chunk.
158-
//
159-
// Call `push(newChunk)` to pass along transformed output
160-
// to the readable side. You may call 'push' zero or more times.
161-
//
162-
// Call `cb(err)` when you are done with this chunk. If you pass
163-
// an error, then that'll put the hurt on the whole operation. If you
164-
// never call cb(), then you'll never get another chunk.
165-
Transform.prototype._transform=function(chunk,encoding,cb){
127+
Transform.prototype._transform=function(chunk,encoding,callback){
166128
thrownewERR_METHOD_NOT_IMPLEMENTED('_transform()');
167129
};
168130

169-
Transform.prototype._write=function(chunk,encoding,cb){
170-
constts=this._transformState;
171-
ts.writecb=cb;
172-
ts.writechunk=chunk;
173-
ts.writeencoding=encoding;
174-
if(!ts.transforming){
175-
constrs=this._readableState;
176-
if(ts.needTransform||
177-
rs.needReadable||
178-
rs.length<rs.highWaterMark)
179-
this._read(rs.highWaterMark);
180-
}
131+
Transform.prototype._write=function(chunk,encoding,callback){
132+
constrState=this._readableState;
133+
constwState=this._writableState;
134+
constlength=rState.length;
135+
136+
this._transform(chunk,encoding,(err,val)=>{
137+
if(err){
138+
callback(err);
139+
return;
140+
}
141+
142+
if(val!=null){
143+
this.push(val);
144+
}
145+
146+
if(
147+
wState.ended||// Backwards compat.
148+
length===rState.length||// Backwards compat.
149+
rState.length<rState.highWaterMark||
150+
rState.length===0
151+
){
152+
callback();
153+
}else{
154+
this[kCallback]=callback;
155+
}
156+
});
181157
};
182158

183-
// Doesn't matter what the args are here.
184-
// _transform does all the work.
185-
// That we got here means that the readable side wants more data.
186-
Transform.prototype._read=function(n){
187-
constts=this._transformState;
188-
189-
if(ts.writechunk!==null&&!ts.transforming){
190-
ts.transforming=true;
191-
this._transform(ts.writechunk,ts.writeencoding,ts.afterTransform);
192-
}else{
193-
// Mark that we need a transform, so that any data that comes in
194-
// will get processed, now that we've asked for it.
195-
ts.needTransform=true;
159+
Transform.prototype._read=function(){
160+
if(this[kCallback]){
161+
constcallback=this[kCallback];
162+
this[kCallback]=null;
163+
callback();
196164
}
197165
};
198-
199-
200-
Transform.prototype._destroy=function(err,cb){
201-
Duplex.prototype._destroy.call(this,err,(err2)=>{
202-
cb(err2);
203-
});
204-
};
205-
206-
207-
functiondone(stream,er,data){
208-
if(er)
209-
returnstream.emit('error',er);
210-
211-
if(data!=null)// Single equals check for both `null` and `undefined`
212-
stream.push(data);
213-
214-
// These two error cases are coherence checks that can likely not be tested.
215-
if(stream._writableState.length)
216-
thrownewERR_TRANSFORM_WITH_LENGTH_0();
217-
218-
if(stream._transformState.transforming)
219-
thrownewERR_TRANSFORM_ALREADY_TRANSFORMING();
220-
returnstream.push(null);
221-
}

‎lib/internal/errors.js‎

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1363,12 +1363,8 @@ E('ERR_TLS_SNI_FROM_SERVER',
13631363
E('ERR_TRACE_EVENTS_CATEGORY_REQUIRED',
13641364
'At least one category is required',TypeError);
13651365
E('ERR_TRACE_EVENTS_UNAVAILABLE','Trace events are unavailable',Error);
1366-
E('ERR_TRANSFORM_ALREADY_TRANSFORMING',
1367-
'Calling transform done when still transforming',Error);
13681366

13691367
// This should probably be a `RangeError`.
1370-
E('ERR_TRANSFORM_WITH_LENGTH_0',
1371-
'Calling transform done when writableState.length != 0',Error);
13721368
E('ERR_TTY_INIT_FAILED','TTY initialization failed',SystemError);
13731369
E('ERR_UNCAUGHT_EXCEPTION_CAPTURE_ALREADY_SET',
13741370
'`process.setupUncaughtExceptionCapture()` was called while a capture '+

‎test/parallel/test-stream2-transform.js‎

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,9 @@ const Transform = require('_stream_transform');
4545

4646
assert.strictEqual(tx.readableLength,10);
4747
assert.strictEqual(transformed,10);
48-
assert.strictEqual(tx._transformState.writechunk.length,5);
4948
assert.deepStrictEqual(tx.writableBuffer.map(function(c){
5049
returnc.chunk.length;
51-
}),[6,7,8,9,10]);
50+
}),[5,6,7,8,9,10]);
5251
}
5352

5453
{

‎test/parallel/test-zlib-flush-drain.js‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ const ws = deflater._writableState;
2828
constbeforeFlush=ws.needDrain;
2929
letafterFlush=ws.needDrain;
3030

31+
deflater.on('data',()=>{
32+
});
33+
3134
deflater.flush(function(err){
3235
afterFlush=ws.needDrain;
3336
});

0 commit comments

Comments
(0)