Skip to content

Commit 7dc119c

Browse files
authored
Add support for HTTP/1 connection pre-warming (#856)
Motivation This patch adds support for HTTP/1 connection pre-warming. This allows the user to request that the HTTP/1 connection pool create and maintain extra connections, above-and-beyond those strictly needed to run the pool. This pool can be used to absorb small spikes in request traffic without increasing latency to account for connection creation. Modifications - Added new configuration properties for pre-warmed connections. - Amended the HTTP/1 state machine to create new connections where necessary. - Added state machine tests. Results Pre-warmed connections are available.
1 parent 254d340 commit 7dc119c

9 files changed

+993
-102
lines changed

‎Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift‎

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ final class HTTPConnectionPool:
7979
.concurrentHTTP1ConnectionsPerHostSoftLimit,
8080
retryConnectionEstablishment: clientConfiguration.connectionPool.retryConnectionEstablishment,
8181
preferHTTP1: clientConfiguration.httpVersion ==.http1Only,
82-
maximumConnectionUses: clientConfiguration.maximumUsesPerConnection
82+
maximumConnectionUses: clientConfiguration.maximumUsesPerConnection,
83+
preWarmedHTTP1ConnectionCount: clientConfiguration.connectionPool.preWarmedHTTP1ConnectionCount
8384
)
8485
}
8586

@@ -104,6 +105,11 @@ final class HTTPConnectionPool:
104105
enumUnlocked{
105106
case createConnection(Connection.ID, on:EventLoop)
106107
case closeConnection(Connection, isShutdown:StateMachine.ConnectionAction.IsShutdown)
108+
case closeConnectionAndCreateConnection(
109+
close:Connection,
110+
newConnectionID:Connection.ID,
111+
on:EventLoop
112+
)
107113
case cleanupConnections(CleanupContext, isShutdown:StateMachine.ConnectionAction.IsShutdown)
108114
case migration(
109115
createConnections:[(Connection.ID,EventLoop)],
@@ -185,12 +191,27 @@ final class HTTPConnectionPool:
185191
self.locked.connection =.scheduleBackoffTimer(connectionID, backoff: backoff, on: eventLoop)
186192
case.scheduleTimeoutTimer(let connectionID, on:let eventLoop):
187193
self.locked.connection =.scheduleTimeoutTimer(connectionID, on: eventLoop)
194+
case.scheduleTimeoutTimerAndCreateConnection(let timeoutID,let newConnectionID,let eventLoop):
195+
self.locked.connection =.scheduleTimeoutTimer(timeoutID, on: eventLoop)
196+
self.unlocked.connection =.createConnection(newConnectionID, on: eventLoop)
188197
case.cancelTimeoutTimer(let connectionID):
189198
self.locked.connection =.cancelTimeoutTimer(connectionID)
199+
case.createConnectionAndCancelTimeoutTimer(let createdID, on:let eventLoop, cancelTimerID:let cancelID):
200+
self.unlocked.connection =.createConnection(createdID, on: eventLoop)
201+
self.locked.connection =.cancelTimeoutTimer(cancelID)
190202
case.closeConnection(let connection,let isShutdown):
191203
self.unlocked.connection =.closeConnection(connection, isShutdown: isShutdown)
204+
case.closeConnectionAndCreateConnection(
205+
let closeConnection,
206+
let newConnectionID,
207+
let eventLoop
208+
):
209+
self.unlocked.connection =.closeConnectionAndCreateConnection(
210+
close: closeConnection,
211+
newConnectionID: newConnectionID,
212+
on: eventLoop
213+
)
192214
case.cleanupConnections(var cleanupContext,let isShutdown):
193-
//
194215
self.locked.connection =.cancelBackoffTimers(cleanupContext.connectBackoff)
195216
cleanupContext.connectBackoff =[]
196217
self.unlocked.connection =.cleanupConnections(cleanupContext, isShutdown: isShutdown)
@@ -287,6 +308,23 @@ final class HTTPConnectionPool:
287308
self.delegate.connectionPoolDidShutdown(self, unclean: unclean)
288309
}
289310

311+
case.closeConnectionAndCreateConnection(
312+
let connectionToClose,
313+
let newConnectionID,
314+
let eventLoop
315+
):
316+
self.logger.trace(
317+
"closing and creating connection",
318+
metadata:[
319+
"ahc-connection-id":"\(connectionToClose.id)"
320+
]
321+
)
322+
323+
self.createConnection(newConnectionID, on: eventLoop)
324+
325+
// we are not interested in the close promise...
326+
connectionToClose.close(promise:nil)
327+
290328
case.cleanupConnections(let cleanupContext,let isShutdown):
291329
forconnectionin cleanupContext.close {
292330
connection.close(promise:nil)
@@ -400,7 +438,7 @@ final class HTTPConnectionPool:
400438
self.modifyStateAndRunActions{ stateMachine in
401439
ifself._idleTimer.removeValue(forKey: connectionID)!=nil{
402440
// The timer still exists. State Machines assumes it is alive
403-
return stateMachine.connectionIdleTimeout(connectionID)
441+
return stateMachine.connectionIdleTimeout(connectionID, on: eventLoop)
404442
}
405443
return.none
406444
}

‎Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift‎

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -262,32 +262,30 @@ extension HTTPConnectionPool{
262262
privatevaroverflowIndex:Array<HTTP1ConnectionState>.Index
263263
/// The number of times each connection can be used before it is closed and replaced.
264264
privateletmaximumConnectionUses:Int?
265-
266-
init(maximumConcurrentConnections:Int, generator:Connection.ID.Generator, maximumConnectionUses:Int?){
265+
/// How many pre-warmed connections we should create.
266+
privateletpreWarmedConnectionCount:Int
267+
268+
init(
269+
maximumConcurrentConnections:Int,
270+
generator:Connection.ID.Generator,
271+
maximumConnectionUses:Int?,
272+
preWarmedHTTP1ConnectionCount:Int
273+
){
267274
self.connections =[]
268275
self.connections.reserveCapacity(min(maximumConcurrentConnections,1024))
269276
self.overflowIndex =self.connections.endIndex
270277
self.maximumConcurrentConnections = maximumConcurrentConnections
271278
self.generator = generator
272279
self.maximumConnectionUses = maximumConnectionUses
280+
self.preWarmedConnectionCount = preWarmedHTTP1ConnectionCount
273281
}
274282

275283
varstats:Stats{
276-
varstats=Stats()
277-
// all additions here can be unchecked, since we will have at max self.connections.count
278-
// which itself is an Int. For this reason we will never overflow.
279-
forconnectionStateinself.connections {
280-
if connectionState.isConnecting {
281-
stats.connecting &+=1
282-
}elseif connectionState.isBackingOff {
283-
stats.backingOff &+=1
284-
}elseif connectionState.isLeased {
285-
stats.leased &+=1
286-
}elseif connectionState.isIdle {
287-
stats.idle &+=1
288-
}
289-
}
290-
return stats
284+
self.connectionStats(in:self.connections.startIndex..<self.connections.endIndex)
285+
}
286+
287+
vargeneralPurposeStats:Stats{
288+
self.connectionStats(in:self.connections.startIndex..<self.overflowIndex)
291289
}
292290

293291
varisEmpty:Bool{
@@ -328,6 +326,24 @@ extension HTTPConnectionPool{
328326
}
329327
}
330328

329+
privatefunc connectionStats(in range:Range<Int>)->Stats{
330+
varstats=Stats()
331+
// all additions here can be unchecked, since we will have at max self.connections.count
332+
// which itself is an Int. For this reason we will never overflow.
333+
forconnectionStateinself.connections[range]{
334+
if connectionState.isConnecting {
335+
stats.connecting &+=1
336+
}elseif connectionState.isBackingOff {
337+
stats.backingOff &+=1
338+
}elseif connectionState.isLeased {
339+
stats.leased &+=1
340+
}elseif connectionState.isIdle {
341+
stats.idle &+=1
342+
}
343+
}
344+
return stats
345+
}
346+
331347
// MARK: - Mutations -
332348

333349
/// A connection's use. Did it serve in the pool or was it specialized for an `EventLoop`?
@@ -836,6 +852,10 @@ extension HTTPConnectionPool{
836852
varleased:Int=0
837853
varconnecting:Int=0
838854
varbackingOff:Int=0
855+
856+
varnonLeased:Int{
857+
self.idle +self.connecting +self.backingOff
858+
}
839859
}
840860
}
841861
}

‎Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift‎

Lines changed: 119 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -33,19 +33,23 @@ extension HTTPConnectionPool{
3333
/// The property was introduced to fail fast during testing.
3434
/// Otherwise this should always be true and not turned off.
3535
privateletretryConnectionEstablishment:Bool
36+
privateletpreWarmedConnectionCount:Int
3637

3738
init(
3839
idGenerator:Connection.ID.Generator,
3940
maximumConcurrentConnections:Int,
4041
retryConnectionEstablishment:Bool,
4142
maximumConnectionUses:Int?,
43+
preWarmedHTTP1ConnectionCount:Int,
4244
lifecycleState:StateMachine.LifecycleState
4345
){
4446
self.connections =HTTP1Connections(
4547
maximumConcurrentConnections: maximumConcurrentConnections,
4648
generator: idGenerator,
47-
maximumConnectionUses: maximumConnectionUses
49+
maximumConnectionUses: maximumConnectionUses,
50+
preWarmedHTTP1ConnectionCount: preWarmedHTTP1ConnectionCount
4851
)
52+
self.preWarmedConnectionCount = preWarmedHTTP1ConnectionCount
4953
self.retryConnectionEstablishment = retryConnectionEstablishment
5054

5155
self.requests =RequestQueue()
@@ -145,9 +149,26 @@ extension HTTPConnectionPool{
145149

146150
privatemutatingfunc executeRequestOnPreferredEventLoop(_ request:Request, eventLoop:EventLoop)->Action{
147151
iflet connection =self.connections.leaseConnection(onPreferred: eventLoop){
152+
// Cool, a connection is available. If using this would put us below our needed extra set, we
153+
// should create another.
154+
letstats=self.connections.generalPurposeStats
155+
letneedExtraConnection=
156+
stats.nonLeased <(self.requests.count +self.preWarmedConnectionCount) && self.connections.canGrow
157+
letaction:StateMachine.ConnectionAction
158+
159+
if needExtraConnection {
160+
action =.createConnectionAndCancelTimeoutTimer(
161+
createdID:self.connections.createNewConnection(on: eventLoop),
162+
on: eventLoop,
163+
cancelTimerID: connection.id
164+
)
165+
}else{
166+
action =.cancelTimeoutTimer(connection.id)
167+
}
168+
148169
return.init(
149170
request:.executeRequest(request, connection, cancelTimeout:false),
150-
connection:.cancelTimeoutTimer(connection.id)
171+
connection:action
151172
)
152173
}
153174

@@ -294,7 +315,20 @@ extension HTTPConnectionPool{
294315
}
295316
}
296317

297-
mutatingfunc connectionIdleTimeout(_ connectionID:Connection.ID)->Action{
318+
mutatingfunc connectionIdleTimeout(_ connectionID:Connection.ID, on eventLoop:anyEventLoop)->Action{
319+
// Don't close idle connections if we need pre-warmed connections. Instead, re-arm the idle timer.
320+
// We still want the idle timers to make sure we eventually fall below the pre-warmed limit.
321+
ifself.preWarmedConnectionCount >0{
322+
letstats=self.connections.generalPurposeStats
323+
if stats.idle <=self.preWarmedConnectionCount {
324+
return.init(
325+
request:.none,
326+
connection:.scheduleTimeoutTimer(connectionID, on: eventLoop)
327+
)
328+
}
329+
}
330+
331+
// Ok, we do actually want the connection count to go down.
298332
guardlet connection =self.connections.closeConnectionIfIdle(connectionID)else{
299333
// because of a race this connection (connection close runs against trigger of timeout)
300334
// was already removed from the state machine.
@@ -410,11 +444,7 @@ extension HTTPConnectionPool{
410444
case.running:
411445
// Close the connection if it's expired.
412446
if context.shouldBeClosed {
413-
letconnection=self.connections.closeConnection(at: index)
414-
return.init(
415-
request:.none,
416-
connection:.closeConnection(connection, isShutdown:.no)
417-
)
447+
returnself.nextActionForToBeClosedIdleConnection(at: index, context: context)
418448
}else{
419449
switch context.use {
420450
case.generalPurpose:
@@ -446,28 +476,63 @@ extension HTTPConnectionPool{
446476
at index:Int,
447477
context:HTTP1Connections.IdleConnectionContext
448478
)->EstablishedAction{
479+
varrequestAction=HTTPConnectionPool.StateMachine.RequestAction.none
480+
varparkedConnectionDetails:(HTTPConnectionPool.Connection.ID,anyEventLoop)?=nil
481+
449482
// 1. Check if there are waiting requests in the general purpose queue
450483
iflet request =self.requests.popFirst(for:nil){
451-
return.init(
452-
request:.executeRequest(request,self.connections.leaseConnection(at: index), cancelTimeout:true),
453-
connection:.none
484+
requestAction =.executeRequest(
485+
request,
486+
self.connections.leaseConnection(at: index),
487+
cancelTimeout:true
454488
)
455489
}
456490

457491
// 2. Check if there are waiting requests in the matching eventLoop queue
458-
iflet request =self.requests.popFirst(for: context.eventLoop){
459-
return.init(
460-
request:.executeRequest(request,self.connections.leaseConnection(at: index), cancelTimeout:true),
461-
connection:.none
492+
if case .none = requestAction,let request =self.requests.popFirst(for: context.eventLoop){
493+
requestAction =.executeRequest(
494+
request,
495+
self.connections.leaseConnection(at: index),
496+
cancelTimeout:true
462497
)
463498
}
464499

465500
// 3. Create a timeout timer to ensure the connection is closed if it is idle for too
466-
// long.
467-
let(connectionID, eventLoop)=self.connections.parkConnection(at: index)
501+
// long, assuming we don't already have a use for it.
502+
if case .none = requestAction {
503+
parkedConnectionDetails =self.connections.parkConnection(at: index)
504+
}
505+
506+
// 4. We may need to create another connection to make sure we have enough pre-warmed ones.
507+
// We need to do that if we have fewer non-leased connections than we need pre-warmed ones _and_ the pool can grow.
508+
// Note that in this case we don't need to account for the number of pending requests, as that is 0: step 1
509+
// confirmed that.
510+
letconnectionAction:EstablishedConnectionAction
511+
512+
ifself.connections.generalPurposeStats.nonLeased <self.preWarmedConnectionCount
513+
&& self.connections.canGrow
514+
{
515+
// Re-use the event loop of the connection that just got created.
516+
iflet parkedConnectionDetails {
517+
letnewConnectionID=self.connections.createNewConnection(on: parkedConnectionDetails.1)
518+
connectionAction =.scheduleTimeoutTimerAndCreateConnection(
519+
timeoutID: parkedConnectionDetails.0,
520+
newConnectionID: newConnectionID,
521+
on: parkedConnectionDetails.1
522+
)
523+
}else{
524+
letnewConnectionID=self.connections.createNewConnection(on: context.eventLoop)
525+
connectionAction =.createConnection(connectionID: newConnectionID, on: context.eventLoop)
526+
}
527+
}elseiflet parkedConnectionDetails {
528+
connectionAction =.scheduleTimeoutTimer(parkedConnectionDetails.0, on: parkedConnectionDetails.1)
529+
}else{
530+
connectionAction =.none
531+
}
532+
468533
return.init(
469-
request:.none,
470-
connection:.scheduleTimeoutTimer(connectionID, on: eventLoop)
534+
request:requestAction,
535+
connection:connectionAction
471536
)
472537
}
473538

@@ -495,6 +560,37 @@ extension HTTPConnectionPool{
495560
)
496561
}
497562

563+
privatemutatingfunc nextActionForToBeClosedIdleConnection(
564+
at index:Int,
565+
context:HTTP1Connections.IdleConnectionContext
566+
)->EstablishedAction{
567+
// Step 1: Tell the connection pool to drop what it knows about this object.
568+
letconnectionToClose=self.connections.closeConnection(at: index)
569+
570+
// Step 2: Check whether we need a connection to replace this one. We do if we have fewer non-leased connections
571+
// than we requests + minimumPrewarming count _and_ the pool can grow. Note that in many cases the above closure
572+
// will have made some space, which is just fine.
573+
letnonLeased=self.connections.generalPurposeStats.nonLeased
574+
letneededNonLeased=self.requests.generalPurposeCount +self.preWarmedConnectionCount
575+
576+
letconnectionAction:EstablishedConnectionAction
577+
if nonLeased < neededNonLeased && self.connections.canGrow {
578+
// We re-use the EL of the connection we just closed.
579+
letnewConnectionID=self.connections.createNewConnection(on: connectionToClose.eventLoop)
580+
connectionAction =.closeConnectionAndCreateConnection(
581+
closeConnection: connectionToClose,
582+
newConnectionID: newConnectionID,
583+
on: connectionToClose.eventLoop
584+
)
585+
}else{
586+
connectionAction =.closeConnection(connectionToClose, isShutdown:.no)
587+
}
588+
return.init(
589+
request:.none,
590+
connection: connectionAction
591+
)
592+
}
593+
498594
// MARK: Failed/Closed connection management
499595

500596
privatemutatingfunc nextActionForFailedConnection(
@@ -530,7 +626,10 @@ extension HTTPConnectionPool{
530626
at index:Int,
531627
context:HTTP1Connections.FailedConnectionContext
532628
)->Action{
533-
if context.connectionsStartingForUseCase <self.requests.generalPurposeCount {
629+
letneedConnectionForRequest=
630+
context.connectionsStartingForUseCase
631+
<(self.requests.generalPurposeCount +self.preWarmedConnectionCount)
632+
if needConnectionForRequest {
534633
// if we have more requests queued up, than we have starting connections, we should
535634
// create a new connection
536635
let(newConnectionID, newEventLoop)=self.connections.replaceConnection(at: index)

0 commit comments

Comments
(0)