Skip to content

Commit e592c32

Browse files
apapirovskigibfahn
authored andcommitted
http2: fix several timeout related issues
* correctly reset write timers: currently reset timers on both session & stream when write starts and when it ends. * prevent large writes from timing out: when writing a large chunk of data in http2, once the data is handed off to C++, the JS session & stream lose all track of the write and will timeout if the write doesn't complete within the timeout window Fix this issue by tracking whether a write request is ongoing and also tracking how many chunks have been sent since the most recent write started. (Since each write call resets the timer.) PR-URL: #16525 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Anna Henningsen <[email protected]> Reviewed-By: James M Snell <[email protected]>
1 parent 1b08ae8 commit e592c32

File tree

6 files changed

+274
-6
lines changed

6 files changed

+274
-6
lines changed

‎lib/internal/http2/core.js‎

Lines changed: 72 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -746,7 +746,8 @@ class Http2Session extends EventEmitter{
746746
shutdown: false,
747747
shuttingDown: false,
748748
pendingAck: 0,
749-
maxPendingAck: Math.max(1,(options.maxPendingAck|0)||10)
749+
maxPendingAck: Math.max(1,(options.maxPendingAck|0)||10),
750+
writeQueueSize: 0
750751
};
751752

752753
this[kType]=type;
@@ -1080,6 +1081,22 @@ class Http2Session extends EventEmitter{
10801081
}
10811082

10821083
_onTimeout(){
1084+
// This checks whether a write is currently in progress and also whether
1085+
// that write is actually sending data across the write. The kHandle
1086+
// stored `chunksSentSinceLastWrite` is only updated when a timeout event
1087+
// happens, meaning that if a write is ongoing it should never equal the
1088+
// newly fetched, updated value.
1089+
if(this[kState].writeQueueSize>0){
1090+
consthandle=this[kHandle];
1091+
constchunksSentSinceLastWrite=handle!==undefined ?
1092+
handle.chunksSentSinceLastWrite : null;
1093+
if(chunksSentSinceLastWrite!==null&&
1094+
chunksSentSinceLastWrite!==handle.updateChunksSent()){
1095+
_unrefActive(this);
1096+
return;
1097+
}
1098+
}
1099+
10831100
process.nextTick(emit,this,'timeout');
10841101
}
10851102
}
@@ -1199,8 +1216,27 @@ function createWriteReq(req, handle, data, encoding){
11991216
}
12001217
}
12011218

1219+
functiontrackWriteState(stream,bytes){
1220+
constsession=stream[kSession];
1221+
stream[kState].writeQueueSize+=bytes;
1222+
session[kState].writeQueueSize+=bytes;
1223+
session[kHandle].chunksSentSinceLastWrite=0;
1224+
}
1225+
12021226
functionafterDoStreamWrite(status,handle,req){
1203-
_unrefActive(handle[kOwner]);
1227+
constsession=handle[kOwner];
1228+
_unrefActive(session);
1229+
1230+
conststate=session[kState];
1231+
const{ bytes }=req;
1232+
state.writeQueueSize-=bytes;
1233+
1234+
conststream=state.streams.get(req.stream);
1235+
if(stream!==undefined){
1236+
_unrefActive(stream);
1237+
stream[kState].writeQueueSize-=bytes;
1238+
}
1239+
12041240
if(typeofreq.callback==='function')
12051241
req.callback();
12061242
this.handle=undefined;
@@ -1312,7 +1348,8 @@ class Http2Stream extends Duplex{
13121348
headersSent: false,
13131349
headRequest: false,
13141350
aborted: false,
1315-
closeHandler: onSessionClose.bind(this)
1351+
closeHandler: onSessionClose.bind(this),
1352+
writeQueueSize: 0
13161353
};
13171354

13181355
this.once('ready',streamOnceReady);
@@ -1359,6 +1396,23 @@ class Http2Stream extends Duplex{
13591396
}
13601397

13611398
_onTimeout(){
1399+
// This checks whether a write is currently in progress and also whether
1400+
// that write is actually sending data across the write. The kHandle
1401+
// stored `chunksSentSinceLastWrite` is only updated when a timeout event
1402+
// happens, meaning that if a write is ongoing it should never equal the
1403+
// newly fetched, updated value.
1404+
if(this[kState].writeQueueSize>0){
1405+
consthandle=this[kSession][kHandle];
1406+
constchunksSentSinceLastWrite=handle!==undefined ?
1407+
handle.chunksSentSinceLastWrite : null;
1408+
if(chunksSentSinceLastWrite!==null&&
1409+
chunksSentSinceLastWrite!==handle.updateChunksSent()){
1410+
_unrefActive(this);
1411+
_unrefActive(this[kSession]);
1412+
return;
1413+
}
1414+
}
1415+
13621416
process.nextTick(emit,this,'timeout');
13631417
}
13641418

@@ -1396,10 +1450,11 @@ class Http2Stream extends Duplex{
13961450
this.once('ready',this._write.bind(this,data,encoding,cb));
13971451
return;
13981452
}
1399-
_unrefActive(this);
14001453
if(!this[kState].headersSent)
14011454
this[kProceed]();
14021455
constsession=this[kSession];
1456+
_unrefActive(this);
1457+
_unrefActive(session);
14031458
consthandle=session[kHandle];
14041459
constreq=newWriteWrap();
14051460
req.stream=this[kID];
@@ -1410,18 +1465,19 @@ class Http2Stream extends Duplex{
14101465
consterr=createWriteReq(req,handle,data,encoding);
14111466
if(err)
14121467
throwutil._errnoException(err,'write',req.error);
1413-
this._bytesDispatched+=req.bytes;
1468+
trackWriteState(this,req.bytes);
14141469
}
14151470

14161471
_writev(data,cb){
14171472
if(this[kID]===undefined){
14181473
this.once('ready',this._writev.bind(this,data,cb));
14191474
return;
14201475
}
1421-
_unrefActive(this);
14221476
if(!this[kState].headersSent)
14231477
this[kProceed]();
14241478
constsession=this[kSession];
1479+
_unrefActive(this);
1480+
_unrefActive(session);
14251481
consthandle=session[kHandle];
14261482
constreq=newWriteWrap();
14271483
req.stream=this[kID];
@@ -1438,6 +1494,7 @@ class Http2Stream extends Duplex{
14381494
consterr=handle.writev(req,chunks);
14391495
if(err)
14401496
throwutil._errnoException(err,'write',req.error);
1497+
trackWriteState(this,req.bytes);
14411498
}
14421499

14431500
_read(nread){
@@ -1531,6 +1588,10 @@ class Http2Stream extends Duplex{
15311588
return;
15321589
}
15331590

1591+
conststate=this[kState];
1592+
session[kState].writeQueueSize-=state.writeQueueSize;
1593+
state.writeQueueSize=0;
1594+
15341595
constserver=session[kServer];
15351596
if(server!==undefined&&err){
15361597
server.emit('streamError',err,this);
@@ -1625,7 +1686,12 @@ function processRespondWithFD(fd, headers, offset = 0, length = -1,
16251686
if(ret<0){
16261687
err=newNghttpError(ret);
16271688
process.nextTick(emit,this,'error',err);
1689+
break;
16281690
}
1691+
// exact length of the file doesn't matter here, since the
1692+
// stream is closing anyway — just use 1 to signify that
1693+
// a write does exist
1694+
trackWriteState(this,1);
16291695
}
16301696
}
16311697

‎src/env.h‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ class ModuleWrap;
111111
V(callback_string, "callback") \
112112
V(change_string, "change") \
113113
V(channel_string, "channel") \
114+
V(chunks_sent_since_last_write_string, "chunksSentSinceLastWrite") \
114115
V(constants_string, "constants") \
115116
V(oncertcb_string, "oncertcb") \
116117
V(onclose_string, "_onclose") \

‎src/node_http2.cc‎

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -603,6 +603,8 @@ void Http2Session::SubmitFile(const FunctionCallbackInfo<Value>& args){
603603
return args.GetReturnValue().Set(NGHTTP2_ERR_INVALID_STREAM_ID);
604604
}
605605

606+
session->chunks_sent_since_last_write_ = 0;
607+
606608
Headers list(isolate, context, headers);
607609

608610
args.GetReturnValue().Set(stream->SubmitFile(fd, *list, list.length(),
@@ -757,6 +759,23 @@ void Http2Session::FlushData(const FunctionCallbackInfo<Value>& args){
757759
stream->FlushDataChunks();
758760
}
759761

762+
voidHttp2Session::UpdateChunksSent(const FunctionCallbackInfo<Value>& args){
763+
Http2Session* session;
764+
Environment* env = Environment::GetCurrent(args);
765+
Isolate* isolate = env->isolate();
766+
ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder());
767+
768+
HandleScope scope(isolate);
769+
770+
uint32_t length = session->chunks_sent_since_last_write_;
771+
772+
session->object()->Set(env->context(),
773+
env->chunks_sent_since_last_write_string(),
774+
Integer::NewFromUnsigned(isolate, length)).FromJust();
775+
776+
args.GetReturnValue().Set(length);
777+
}
778+
760779
voidHttp2Session::SubmitPushPromise(const FunctionCallbackInfo<Value>& args){
761780
Http2Session* session;
762781
Environment* env = Environment::GetCurrent(args);
@@ -811,6 +830,8 @@ int Http2Session::DoWrite(WriteWrap* req_wrap,
811830
}
812831
}
813832

833+
chunks_sent_since_last_write_ = 0;
834+
814835
nghttp2_stream_write_t* req = newnghttp2_stream_write_t;
815836
req->data = req_wrap;
816837

@@ -846,6 +867,7 @@ void Http2Session::Send(uv_buf_t* buf, size_t length){
846867
this,
847868
AfterWrite);
848869

870+
chunks_sent_since_last_write_++;
849871
uv_buf_t actual = uv_buf_init(buf->base, length);
850872
if (stream_->DoWrite(write_req, &actual, 1, nullptr)){
851873
write_req->Dispose();
@@ -1255,6 +1277,8 @@ void Initialize(Local<Object> target,
12551277
Http2Session::DestroyStream);
12561278
env->SetProtoMethod(session, "flushData",
12571279
Http2Session::FlushData);
1280+
env->SetProtoMethod(session, "updateChunksSent",
1281+
Http2Session::UpdateChunksSent);
12581282
StreamBase::AddMethods<Http2Session>(env, session,
12591283
StreamBase::kFlagHasWritev |
12601284
StreamBase::kFlagNoShutdown);

‎src/node_http2.h‎

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,7 @@ class Http2Session : public AsyncWrap,
475475
staticvoidSubmitGoaway(const FunctionCallbackInfo<Value>& args);
476476
staticvoidDestroyStream(const FunctionCallbackInfo<Value>& args);
477477
staticvoidFlushData(const FunctionCallbackInfo<Value>& args);
478+
staticvoidUpdateChunksSent(const FunctionCallbackInfo<Value>& args);
478479

479480
template <get_setting fn>
480481
staticvoidGetSettings(const FunctionCallbackInfo<Value>& args);
@@ -493,6 +494,9 @@ class Http2Session : public AsyncWrap,
493494
StreamResource::Callback<StreamResource::ReadCb> prev_read_cb_;
494495
padding_strategy_type padding_strategy_ = PADDING_STRATEGY_NONE;
495496

497+
// use this to allow timeout tracking during long-lasting writes
498+
uint32_t chunks_sent_since_last_write_ = 0;
499+
496500
char stream_buf_[kAllocBufferSize];
497501
};
498502

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
'use strict';
2+
constcommon=require('../common');
3+
if(!common.hasCrypto)
4+
common.skip('missing crypto');
5+
constassert=require('assert');
6+
constfixtures=require('../common/fixtures');
7+
constfs=require('fs');
8+
consthttp2=require('http2');
9+
constpath=require('path');
10+
11+
common.refreshTmpDir();
12+
13+
// This test assesses whether long-running writes can complete
14+
// or timeout because the session or stream are not aware that the
15+
// backing stream is still writing.
16+
// To simulate a slow client, we write a really large chunk and
17+
// then proceed through the following cycle:
18+
// 1) Receive first 'data' event and record currently written size
19+
// 2) Once we've read up to currently written size recorded above,
20+
// we pause the stream and wait longer than the server timeout
21+
// 3) Socket.prototype._onTimeout triggers and should confirm
22+
// that the backing stream is still active and writing
23+
// 4) Our timer fires, we resume the socket and start at 1)
24+
25+
constwriteSize=3000000;
26+
constminReadSize=500000;
27+
constserverTimeout=common.platformTimeout(500);
28+
letoffsetTimeout=common.platformTimeout(100);
29+
letdidReceiveData=false;
30+
31+
constcontent=Buffer.alloc(writeSize,0x44);
32+
constfilepath=path.join(common.tmpDir,'http2-large-write.tmp');
33+
fs.writeFileSync(filepath,content,'binary');
34+
constfd=fs.openSync(filepath,'r');
35+
36+
constserver=http2.createSecureServer({
37+
key: fixtures.readKey('agent1-key.pem'),
38+
cert: fixtures.readKey('agent1-cert.pem')
39+
});
40+
server.on('stream',common.mustCall((stream)=>{
41+
stream.respondWithFD(fd,{
42+
'Content-Type': 'application/octet-stream',
43+
'Content-Length': content.length.toString(),
44+
'Vary': 'Accept-Encoding'
45+
});
46+
stream.setTimeout(serverTimeout);
47+
stream.on('timeout',()=>{
48+
assert.strictEqual(didReceiveData,false,'Should not timeout');
49+
});
50+
stream.end();
51+
}));
52+
server.setTimeout(serverTimeout);
53+
server.on('timeout',()=>{
54+
assert.strictEqual(didReceiveData,false,'Should not timeout');
55+
});
56+
57+
server.listen(0,common.mustCall(()=>{
58+
constclient=http2.connect(`https://localhost:${server.address().port}`,
59+
{rejectUnauthorized: false});
60+
61+
constreq=client.request({':path': '/'});
62+
req.end();
63+
64+
constresume=()=>req.resume();
65+
letreceivedBufferLength=0;
66+
letfirstReceivedAt;
67+
req.on('data',common.mustCallAtLeast((buf)=>{
68+
if(receivedBufferLength===0){
69+
didReceiveData=false;
70+
firstReceivedAt=Date.now();
71+
}
72+
receivedBufferLength+=buf.length;
73+
if(receivedBufferLength>=minReadSize&&
74+
receivedBufferLength<writeSize){
75+
didReceiveData=true;
76+
receivedBufferLength=0;
77+
req.pause();
78+
setTimeout(
79+
resume,
80+
serverTimeout+offsetTimeout-(Date.now()-firstReceivedAt)
81+
);
82+
offsetTimeout=0;
83+
}
84+
},1));
85+
req.on('end',common.mustCall(()=>{
86+
client.destroy();
87+
server.close();
88+
}));
89+
}));

0 commit comments

Comments
(0)