Skip to content

Commit 9e0d18f

Browse files
clshortfuseMylesBorins
authored andcommitted
http2: use and support non-empty DATA frame with END_STREAM flag
Adds support for reading from a stream where the final frame is a non-empty DATA frame with the END_STREAM flag set, instead of hanging waiting for another frame. When writing to a stream, uses a END_STREAM flag on final DATA frame instead of adding an empty DATA frame. BREAKING: http2 client now expects servers to properly support END_STREAM flag Fixes: #31309Fixes: #33891 Refs: https://nghttp2.org/documentation/types.html#c.nghttp2_on_data_chunk_recv_callback PR-URL: #33875 Backport-PR-URL: #34838 Reviewed-By: Anna Henningsen <[email protected]> Reviewed-By: James M Snell <[email protected]>
1 parent a16f0f4 commit 9e0d18f

File tree

6 files changed

+191
-47
lines changed

6 files changed

+191
-47
lines changed

‎lib/internal/http2/core.js‎

Lines changed: 82 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1136,6 +1136,7 @@ class Http2Session extends EventEmitter{
11361136
streams: newMap(),
11371137
pendingStreams: newSet(),
11381138
pendingAck: 0,
1139+
shutdownWritableCalled: false,
11391140
writeQueueSize: 0,
11401141
originSet: undefined
11411142
};
@@ -1702,6 +1703,26 @@ function afterShutdown(status){
17021703
stream[kMaybeDestroy]();
17031704
}
17041705

1706+
functionshutdownWritable(callback){
1707+
consthandle=this[kHandle];
1708+
if(!handle)returncallback();
1709+
conststate=this[kState];
1710+
if(state.shutdownWritableCalled){
1711+
// Backport v14.x: Session required for debugging stream object
1712+
// debugStreamObj(this, 'shutdownWritable() already called');
1713+
returncallback();
1714+
}
1715+
state.shutdownWritableCalled=true;
1716+
1717+
constreq=newShutdownWrap();
1718+
req.oncomplete=afterShutdown;
1719+
req.callback=callback;
1720+
req.handle=handle;
1721+
consterr=handle.shutdown(req);
1722+
if(err===1)// synchronous finish
1723+
returnafterShutdown.call(req,0);
1724+
}
1725+
17051726
functionfinishSendTrailers(stream,headersList){
17061727
// The stream might be destroyed and in that case
17071728
// there is nothing to do.
@@ -1962,10 +1983,48 @@ class Http2Stream extends Duplex{
19621983

19631984
letreq;
19641985

1986+
letwaitingForWriteCallback=true;
1987+
letwaitingForEndCheck=true;
1988+
letwriteCallbackErr;
1989+
letendCheckCallbackErr;
1990+
constdone=()=>{
1991+
if(waitingForEndCheck||waitingForWriteCallback)return;
1992+
consterr=writeCallbackErr||endCheckCallbackErr;
1993+
// writeGeneric does not destroy on error and
1994+
// we cannot enable autoDestroy,
1995+
// so make sure to destroy on error.
1996+
if(err){
1997+
this.destroy(err);
1998+
}
1999+
cb(err);
2000+
};
2001+
constwriteCallback=(err)=>{
2002+
waitingForWriteCallback=false;
2003+
writeCallbackErr=err;
2004+
done();
2005+
};
2006+
constendCheckCallback=(err)=>{
2007+
waitingForEndCheck=false;
2008+
endCheckCallbackErr=err;
2009+
done();
2010+
};
2011+
// Shutdown write stream right after last chunk is sent
2012+
// so final DATA frame can include END_STREAM flag
2013+
process.nextTick(()=>{
2014+
if(writeCallbackErr||
2015+
!this._writableState.ending||
2016+
this._writableState.buffered.length||
2017+
(this[kState].flags&STREAM_FLAGS_HAS_TRAILERS))
2018+
returnendCheckCallback();
2019+
// Backport v14.x: Session required for debugging stream object
2020+
// debugStreamObj(this, 'shutting down writable on last write');
2021+
shutdownWritable.call(this,endCheckCallback);
2022+
});
2023+
19652024
if(writev)
1966-
req=writevGeneric(this,data,cb);
2025+
req=writevGeneric(this,data,writeCallback);
19672026
else
1968-
req=writeGeneric(this,data,encoding,cb);
2027+
req=writeGeneric(this,data,encoding,writeCallback);
19692028

19702029
trackWriteState(this,req.bytes);
19712030
}
@@ -1979,21 +2038,13 @@ class Http2Stream extends Duplex{
19792038
}
19802039

19812040
_final(cb){
1982-
consthandle=this[kHandle];
19832041
if(this.pending){
19842042
this.once('ready',()=>this._final(cb));
1985-
}elseif(handle!==undefined){
1986-
debugStreamObj(this,'_final shutting down');
1987-
constreq=newShutdownWrap();
1988-
req.oncomplete=afterShutdown;
1989-
req.callback=cb;
1990-
req.handle=handle;
1991-
consterr=handle.shutdown(req);
1992-
if(err===1)// synchronous finish
1993-
returnafterShutdown.call(req,0);
1994-
}else{
1995-
cb();
2043+
return;
19962044
}
2045+
// Backport v14.x: Session required for debugging stream object
2046+
// debugStreamObj(this, 'shutting down writable on _final');
2047+
shutdownWritable.call(this,cb);
19972048
}
19982049

19992050
_read(nread){
@@ -2098,11 +2149,20 @@ class Http2Stream extends Duplex{
20982149
debugStream(this[kID]||'pending',session[kType],'destroying stream');
20992150

21002151
conststate=this[kState];
2101-
constsessionCode=session[kState].goawayCode||
2102-
session[kState].destroyCode;
2103-
constcode=err!=null ?
2104-
sessionCode||NGHTTP2_INTERNAL_ERROR :
2105-
state.rstCode||sessionCode;
2152+
constsessionState=session[kState];
2153+
constsessionCode=sessionState.goawayCode||sessionState.destroyCode;
2154+
2155+
// If a stream has already closed successfully, there is no error
2156+
// to report from this stream, even if the session has errored.
2157+
// This can happen if the stream was already in process of destroying
2158+
// after a successful close, but the session had a error between
2159+
// this stream's close and destroy operations.
2160+
// Previously, this always overrode a successful close operation code
2161+
// NGHTTP2_NO_ERROR (0) with sessionCode because the use of the || operator.
2162+
constcode=(err!=null ?
2163+
(sessionCode||NGHTTP2_INTERNAL_ERROR) :
2164+
(this.closed ? this.rstCode : sessionCode)
2165+
);
21062166
consthasHandle=handle!==undefined;
21072167

21082168
if(!this.closed)
@@ -2111,13 +2171,13 @@ class Http2Stream extends Duplex{
21112171

21122172
if(hasHandle){
21132173
handle.destroy();
2114-
session[kState].streams.delete(id);
2174+
sessionState.streams.delete(id);
21152175
}else{
2116-
session[kState].pendingStreams.delete(this);
2176+
sessionState.pendingStreams.delete(this);
21172177
}
21182178

21192179
// Adjust the write queue size for accounting
2120-
session[kState].writeQueueSize-=state.writeQueueSize;
2180+
sessionState.writeQueueSize-=state.writeQueueSize;
21212181
state.writeQueueSize=0;
21222182

21232183
// RST code 8 not emitted as an error as its used by clients to signify

‎src/node_http2.cc‎

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -732,7 +732,7 @@ ssize_t Http2Session::OnMaxFrameSizePadding(size_t frameLen,
732732
// quite expensive. This is a potential performance optimization target later.
733733
ssize_tHttp2Session::ConsumeHTTP2Data(){
734734
CHECK_NOT_NULL(stream_buf_.base);
735-
CHECK_LT(stream_buf_offset_, stream_buf_.len);
735+
CHECK_LE(stream_buf_offset_, stream_buf_.len);
736736
size_t read_len = stream_buf_.len - stream_buf_offset_;
737737

738738
// multiple side effects.
@@ -753,11 +753,11 @@ ssize_t Http2Session::ConsumeHTTP2Data(){
753753
CHECK_GT(ret, 0);
754754
CHECK_LE(static_cast<size_t>(ret), read_len);
755755

756-
if (static_cast<size_t>(ret) < read_len){
757-
//Mark the remainder of the data as available for later consumption.
758-
stream_buf_offset_ += ret;
759-
return ret;
760-
}
756+
// Mark the remainder of the data as available for later consumption.
757+
//Even if all bytes were received, a paused stream may delay the
758+
// nghttp2_on_frame_recv_callback which may have an END_STREAM flag.
759+
stream_buf_offset_ += ret;
760+
return ret;
761761
}
762762

763763
// We are done processing the current input chunk.
@@ -1093,6 +1093,7 @@ int Http2Session::OnDataChunkReceived(nghttp2_session* handle,
10931093
if (session->is_write_in_progress()){
10941094
CHECK(session->is_reading_stopped());
10951095
session->set_receive_paused();
1096+
Debug(session, "receive paused");
10961097
return NGHTTP2_ERR_PAUSE;
10971098
}
10981099

‎test/parallel/test-http2-misbehaving-multiplex.js‎

Lines changed: 39 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Flags: --expose-internals
33

44
constcommon=require('../common');
5+
constassert=require('assert');
56

67
if(!common.hasCrypto)
78
common.skip('missing crypto');
@@ -13,16 +14,36 @@ const h2test = require('../common/http2');
1314
letclient;
1415

1516
constserver=h2.createServer();
17+
letgotFirstStreamId1;
1618
server.on('stream',common.mustCall((stream)=>{
1719
stream.respond();
1820
stream.end('ok');
1921

20-
// The error will be emitted asynchronously
21-
stream.on('error',common.expectsError({
22-
constructor: NghttpError,
23-
code: 'ERR_HTTP2_ERROR',
24-
message: 'Stream was already closed or invalid'
25-
}));
22+
// Http2Server should be fast enough to respond to and close
23+
// the first streams with ID 1 and ID 3 without errors.
24+
25+
// Test for errors in 'close' event to ensure no errors on some streams.
26+
stream.on('error',()=>{});
27+
stream.on('close',(err)=>{
28+
if(stream.id===1){
29+
if(gotFirstStreamId1){
30+
// We expect our outgoing frames to fail on Stream ID 1 the second time
31+
// because a stream with ID 1 was already closed before.
32+
common.expectsError({
33+
constructor: NghttpError,
34+
code: 'ERR_HTTP2_ERROR',
35+
message: 'Stream was already closed or invalid'
36+
});
37+
return;
38+
}
39+
gotFirstStreamId1=true;
40+
}
41+
assert.strictEqual(err,undefined);
42+
});
43+
44+
// Stream ID 5 should never reach the server
45+
assert.notStrictEqual(stream.id,5);
46+
2647
},2));
2748

2849
server.on('session',common.mustCall((session)=>{
@@ -35,26 +56,27 @@ server.on('session', common.mustCall((session) =>{
3556

3657
constsettings=newh2test.SettingsFrame();
3758
constsettingsAck=newh2test.SettingsFrame(true);
38-
consthead1=newh2test.HeadersFrame(1,h2test.kFakeRequestHeaders,0,true);
39-
consthead2=newh2test.HeadersFrame(3,h2test.kFakeRequestHeaders,0,true);
40-
consthead3=newh2test.HeadersFrame(1,h2test.kFakeRequestHeaders,0,true);
41-
consthead4=newh2test.HeadersFrame(5,h2test.kFakeRequestHeaders,0,true);
59+
// HeadersFrame(id, payload, padding, END_STREAM)
60+
constid1=newh2test.HeadersFrame(1,h2test.kFakeRequestHeaders,0,true);
61+
constid3=newh2test.HeadersFrame(3,h2test.kFakeRequestHeaders,0,true);
62+
constid5=newh2test.HeadersFrame(5,h2test.kFakeRequestHeaders,0,true);
4263

4364
server.listen(0,()=>{
4465
client=net.connect(server.address().port,()=>{
4566
client.write(h2test.kClientMagic,()=>{
4667
client.write(settings.data,()=>{
4768
client.write(settingsAck.data);
48-
// This will make it ok.
49-
client.write(head1.data,()=>{
50-
// This will make it ok.
51-
client.write(head2.data,()=>{
69+
// Stream ID 1 frame will make it OK.
70+
client.write(id1.data,()=>{
71+
// Stream ID 3 frame will make it OK.
72+
client.write(id3.data,()=>{
73+
// A second Stream ID 1 frame should fail.
5274
// This will cause an error to occur because the client is
5375
// attempting to reuse an already closed stream. This must
5476
// cause the server session to be torn down.
55-
client.write(head3.data,()=>{
56-
// This won't ever make it to the server
57-
client.write(head4.data);
77+
client.write(id1.data,()=>{
78+
// This Stream ID 5 frame will never make it to the server
79+
client.write(id5.data);
5880
});
5981
});
6082
});
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
'use strict';
2+
3+
constcommon=require('../common');
4+
if(!common.hasCrypto)
5+
common.skip('missing crypto');
6+
constassert=require('assert');
7+
consthttp2=require('http2');
8+
9+
const{ PerformanceObserver }=require('perf_hooks');
10+
11+
constserver=http2.createServer();
12+
13+
server.on('stream',(stream,headers)=>{
14+
stream.respond({
15+
'content-type': 'text/html',
16+
':status': 200
17+
});
18+
switch(headers[':path']){
19+
case'/singleEnd':
20+
stream.end('OK');
21+
break;
22+
case'/sequentialEnd':
23+
stream.write('OK');
24+
stream.end();
25+
break;
26+
case'/delayedEnd':
27+
stream.write('OK',()=>stream.end());
28+
break;
29+
}
30+
});
31+
32+
functiontestRequest(path,targetFrameCount,callback){
33+
constobs=newPerformanceObserver((list,observer)=>{
34+
constentry=list.getEntries()[0];
35+
if(entry.name!=='Http2Session')return;
36+
if(entry.type!=='client')return;
37+
assert.strictEqual(entry.framesReceived,targetFrameCount);
38+
observer.disconnect();
39+
callback();
40+
});
41+
obs.observe({entryTypes: ['http2']});
42+
constclient=http2.connect(`http://localhost:${server.address().port}`,()=>{
43+
constreq=client.request({':path': path});
44+
req.resume();
45+
req.end();
46+
req.on('end',()=>client.close());
47+
});
48+
}
49+
50+
// SETTINGS => SETTINGS => HEADERS => DATA
51+
constMIN_FRAME_COUNT=4;
52+
53+
server.listen(0,()=>{
54+
testRequest('/singleEnd',MIN_FRAME_COUNT,()=>{
55+
testRequest('/sequentialEnd',MIN_FRAME_COUNT,()=>{
56+
testRequest('/delayedEnd',MIN_FRAME_COUNT+1,()=>{
57+
server.close();
58+
});
59+
});
60+
});
61+
});

‎test/parallel/test-http2-padding-aligned.js‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ const makeDuplexPair = require('../common/duplexpair');
2626
// The lengths of the expected writes... note that this is highly
2727
// sensitive to how the internals are implemented.
2828
constserverLengths=[24,9,9,32];
29-
constclientLengths=[9,9,48,9,1,21,1,16];
29+
constclientLengths=[9,9,48,9,1,21,1];
3030

3131
// Adjust for the 24-byte preamble and two 9-byte settings frames, and
3232
// the result must be equally divisible by 8

‎test/parallel/test-http2-perf_hooks.js‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ const obs = new PerformanceObserver(common.mustCall((items) =>{
3030
break;
3131
case'client':
3232
assert.strictEqual(entry.streamCount,1);
33-
assert.strictEqual(entry.framesReceived,8);
33+
assert.strictEqual(entry.framesReceived,7);
3434
break;
3535
default:
3636
assert.fail('invalid Http2Session type');

0 commit comments

Comments
(0)