Skip to content

Commit 2944cda

Browse files
committed
streams: use Array for Readable buffer
1 parent 25576b5 commit 2944cda

File tree

1 file changed

+108
-22
lines changed

1 file changed

+108
-22
lines changed

‎lib/internal/streams/readable.js‎

Lines changed: 108 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ const{
7373
const{ validateObject }=require('internal/validators');
7474

7575
constkState=Symbol('kState');
76+
constFastBuffer=Buffer[Symbol.species];
7677

7778
const{ StringDecoder }=require('string_decoder');
7879
constfrom=require('internal/streams/from');
@@ -278,7 +279,8 @@ function ReadableState(options, stream, isDuplex){
278279
// A linked list is used to store data chunks instead of an array because the
279280
// linked list can remove elements from the beginning faster than
280281
// array.shift().
281-
this.buffer=newBufferList();
282+
this.buffer=[];
283+
this.bufferIndex=0;
282284
this.length=0;
283285
this.pipes=[];
284286

@@ -546,10 +548,15 @@ function addChunk(stream, state, chunk, addToFront){
546548
}else{
547549
// Update the buffer info.
548550
state.length+=(state[kState]&kObjectMode)!==0 ? 1 : chunk.length;
549-
if(addToFront)
550-
state.buffer.unshift(chunk);
551-
else
551+
if(addToFront){
552+
if(state.bufferIndex>0){
553+
state.buffer[--state.bufferIndex]=chunk;
554+
}else{
555+
state.buffer.unshift(chunk);// Slow path
556+
}
557+
}else{
552558
state.buffer.push(chunk);
559+
}
553560

554561
if((state[kState]&kNeedReadable)!==0)
555562
emitReadable(stream);
@@ -564,21 +571,24 @@ Readable.prototype.isPaused = function(){
564571

565572
// Backwards compatibility.
566573
Readable.prototype.setEncoding=function(enc){
574+
conststate=this._readableState;
575+
567576
constdecoder=newStringDecoder(enc);
568-
this._readableState.decoder=decoder;
577+
state.decoder=decoder;
569578
// If setEncoding(null), decoder.encoding equals utf8.
570-
this._readableState.encoding=this._readableState.decoder.encoding;
579+
state.encoding=state.decoder.encoding;
571580

572-
constbuffer=this._readableState.buffer;
573581
// Iterate over current buffer to convert already stored Buffers:
574582
letcontent='';
575-
for(constdataofbuffer){
583+
for(constdataofstate.buffer.slice(state.bufferIndex)){
576584
content+=decoder.write(data);
577585
}
578-
buffer.clear();
586+
state.buffer.length=0;
587+
state.bufferIndex=0;
588+
579589
if(content!=='')
580590
buffer.push(content);
581-
this._readableState.length=content.length;
591+
state.length=content.length;
582592
returnthis;
583593
};
584594

@@ -611,7 +621,7 @@ function howMuchToRead(n, state){
611621
if(NumberIsNaN(n)){
612622
// Only flow one buffer at a time.
613623
if((state[kState]&kFlowing)!==0&&state.length)
614-
returnstate.buffer.first().length;
624+
returnstate.buffer[state.bufferIndex].length;
615625
returnstate.length;
616626
}
617627
if(n<=state.length)
@@ -1550,20 +1560,96 @@ function fromList(n, state){
15501560
returnnull;
15511561

15521562
letret;
1553-
if(state.objectMode)
1554-
ret=state.buffer.shift();
1555-
elseif(!n||n>=state.length){
1563+
if((state[kState]&kObjectMode)!==0){
1564+
ret=state.buffer[state.bufferIndex++];
1565+
}elseif(!n||n>=state.length){
15561566
// Read it all, truncate the list.
1557-
if(state.decoder)
1558-
ret=state.buffer.join('');
1559-
elseif(state.buffer.length===1)
1560-
ret=state.buffer.first();
1561-
else
1562-
ret=state.buffer.concat(state.length);
1563-
state.buffer.clear();
1567+
if((state[kState]&kDecoder)!==0){
1568+
ret=''
1569+
for(letn=state.bufferIndex;n<state.buffer.length;n++){
1570+
ret+=state.buffer[n];
1571+
}
1572+
}elseif(state.buffer.length-state.bufferIndex===0){
1573+
ret=Buffer.alloc(0)
1574+
}elseif(state.buffer.length-state.bufferIndex===1){
1575+
ret=state.buffer[state.bufferIndex];
1576+
}else{
1577+
ret=Buffer.allocUnsafe(n>>>0);
1578+
leti=0;
1579+
for(letn=state.bufferIndex;n<state.buffer.length;n++){
1580+
constdata=state.buffer[n];
1581+
ret.set(data,i);
1582+
i+=data.length;
1583+
}
1584+
}
1585+
state.buffer.length=0;
1586+
state.bufferIndex=0;
15641587
}else{
15651588
// read part of list.
1566-
ret=state.buffer.consume(n,state.decoder);
1589+
1590+
constdata=state.buffer[state.bufferIndex];
1591+
1592+
if(n<data.length){
1593+
// `slice` is the same for buffers and strings.
1594+
constslice=data.slice(0,n);
1595+
state.buffer[state.bufferIndex]=data.slice(n);
1596+
returnslice;
1597+
}
1598+
1599+
if(n===data.length){
1600+
// First chunk is a perfect match.
1601+
returnstate.buffer[state.bufferIndex++];
1602+
}
1603+
1604+
if((state[kState]&kDecoder)!==0){
1605+
ret='';
1606+
while(state.bufferIndex<state.buffer.length){
1607+
conststr=state.buffer[state.bufferIndex];
1608+
if(n>str.length){
1609+
ret+=str;
1610+
n-=str.length;
1611+
state.bufferIndex++;
1612+
}else{
1613+
if(n===buf.length){
1614+
ret+=str;
1615+
state.bufferIndex++;
1616+
}else{
1617+
ret+=str.slice(0,n);
1618+
state.buffer[state.bufferIndex]=str.slice(n);
1619+
}
1620+
break;
1621+
}
1622+
}
1623+
}else{
1624+
ret=Buffer.allocUnsafe(n);
1625+
1626+
constretLen=n;
1627+
while(state.bufferIndex<state.buffer.length){
1628+
constbuf=state.buffer[state.bufferIndex];
1629+
if(n>buf.length){
1630+
ret.set(buf,retLen-n);
1631+
n-=buf.length;
1632+
state.bufferIndex++;
1633+
}else{
1634+
if(n===buf.length){
1635+
ret.set(buf,retLen-n);
1636+
state.bufferIndex++;
1637+
}else{
1638+
ret.set(newFastBuffer(buf.buffer,buf.byteOffset,n),retLen-n);
1639+
state.buffer[state.bufferIndex]=newFastBuffer(buf.buffer,buf.byteOffset+n);
1640+
}
1641+
break;
1642+
}
1643+
}
1644+
}
1645+
1646+
if(state.bufferIndex===state.buffer.length){
1647+
state.buffer.length=0;
1648+
state.bufferIndex=0
1649+
}elseif(state.bufferIndex>256){
1650+
state.buffer=state.buffer.slice(state.bufferIndex);
1651+
state.bufferIndex=0;
1652+
}
15671653
}
15681654

15691655
returnret;

0 commit comments

Comments
(0)