Skip to content

Commit 7cd68cb

Browse files
committed
[ConnectionPool] Split up locked and unlocked actions
# Conflicts: # Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift
1 parent 7808ca1 commit 7cd68cb

File tree

1 file changed

+132
-73
lines changed

1 file changed

+132
-73
lines changed

‎Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift‎

Lines changed: 132 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -131,15 +131,17 @@ final class HTTPConnectionPool{
131131

132132
privateletstateLock=Lock()
133133
privatevar_state:StateMachine
134+
/// The connection idle timeout timers. Protected by the stateLock
135+
privatevar_idleTimer=[Connection.ID:Scheduled<Void>]()
136+
/// The connection backoff timeout timers. Protected by the stateLock
137+
privatevar_backoffTimer=[Connection.ID:Scheduled<Void>]()
134138

135139
privatestaticletfallbackConnectTimeout:TimeAmount=.seconds(30)
136140

137141
letkey:ConnectionPool.Key
138142

139143
privatelettimerLock=Lock()
140144
privatevar_requestTimer=[Request.ID:Scheduled<Void>]()
141-
privatevar_idleTimer=[Connection.ID:Scheduled<Void>]()
142-
privatevar_backoffTimer=[Connection.ID:Scheduled<Void>]()
143145

144146
privatevarlogger:Logger
145147

@@ -182,33 +184,90 @@ final class HTTPConnectionPool{
182184
}
183185

184186
func executeRequest(_ request:HTTPSchedulableRequest){
185-
letaction=self.stateLock.withLock{()->StateMachine.Actionin
186-
self._state.executeRequest(.init(request))
187-
}
188-
self.run(action: action)
187+
self.modifyStateAndRunActions{ $0.executeRequest(.init(request))}
189188
}
190189

191190
func shutdown(){
192191
self.logger.debug("Shutting down connection pool")
193-
letaction=self.stateLock.withLock{()->StateMachine.Actionin
194-
self._state.shutdown()
192+
self.modifyStateAndRunActions{ $0.shutdown()}
193+
}
194+
195+
// MARK: - Private Methods -
196+
197+
// MARK: Actions
198+
199+
///
200+
privatestructActions{
201+
enumConnectionAction{
202+
enumUnlocked{
203+
case createConnection(Connection.ID, on:EventLoop)
204+
case closeConnection(Connection, isShutdown:StateMachine.ConnectionAction.IsShutdown)
205+
case cleanupConnections(CleanupContext, isShutdown:StateMachine.ConnectionAction.IsShutdown)
206+
case none
207+
}
208+
209+
enumLocked{
210+
case scheduleBackoffTimer(Connection.ID, backoff:TimeAmount, on:EventLoop)
211+
case cancelBackoffTimers([Connection.ID])
212+
case scheduleTimeoutTimer(Connection.ID, on:EventLoop)
213+
case cancelTimeoutTimer(Connection.ID)
214+
case none
215+
}
216+
}
217+
218+
structLocked{
219+
varconnection:ConnectionAction.Locked
220+
}
221+
222+
structUnlocked{
223+
varconnection:ConnectionAction.Unlocked
224+
varrequest:StateMachine.RequestAction
225+
}
226+
227+
varlocked:Locked
228+
varunlocked:Unlocked
229+
230+
init(from stateMachineAction:StateMachine.Action){
231+
self.locked =Locked(connection:.none)
232+
self.unlocked =Unlocked(connection:.none, request: stateMachineAction.request)
233+
234+
switch stateMachineAction.connection {
235+
case.createConnection(let connectionID, on:let eventLoop):
236+
self.unlocked.connection =.createConnection(connectionID, on: eventLoop)
237+
case.scheduleBackoffTimer(let connectionID, backoff:let backoff, on:let eventLoop):
238+
self.locked.connection =.scheduleBackoffTimer(connectionID, backoff: backoff, on: eventLoop)
239+
case.scheduleTimeoutTimer(let connectionID, on:let eventLoop):
240+
self.locked.connection =.scheduleTimeoutTimer(connectionID, on: eventLoop)
241+
case.cancelTimeoutTimer(let connectionID):
242+
self.locked.connection =.cancelTimeoutTimer(connectionID)
243+
case.closeConnection(let connection, isShutdown:let isShutdown):
244+
self.unlocked.connection =.closeConnection(connection, isShutdown: isShutdown)
245+
case.cleanupConnections(var cleanupContext, isShutdown:let isShutdown):
246+
//
247+
self.locked.connection =.cancelBackoffTimers(cleanupContext.connectBackoff)
248+
cleanupContext.connectBackoff =[]
249+
self.unlocked.connection =.cleanupConnections(cleanupContext, isShutdown: isShutdown)
250+
case.none:
251+
break
252+
}
195253
}
196-
self.run(action: action)
197254
}
198255

199256
// MARK: Run actions
200257

201-
privatefunc run(action:StateMachine.Action){
202-
self.runConnectionAction(action.connection)
203-
self.runRequestAction(action.request)
258+
privatefunc modifyStateAndRunActions(_ closure:(inoutStateMachine)->StateMachine.Action){
259+
letunlockedActions=self.stateLock.withLock{()->Actions.Unlockedin
260+
letstateMachineAction=closure(&self._state)
261+
letpoolAction=Actions(from: stateMachineAction)
262+
self.runLockedActions(poolAction.locked)
263+
return poolAction.unlocked
264+
}
265+
self.runUnlockedActions(unlockedActions)
204266
}
205267

206-
privatefunc runConnectionAction(_ action:StateMachine.ConnectionAction){
207-
switch action {
208-
case.createConnection(let connectionID,let eventLoop):
209-
self.createConnection(connectionID, on: eventLoop)
210-
211-
case.scheduleBackoffTimer(let connectionID,let backoff, on:let eventLoop):
268+
privatefunc runLockedActions(_ actions:Actions.Locked){
269+
switch actions.connection {
270+
case.scheduleBackoffTimer(let connectionID, backoff:let backoff, on:let eventLoop):
212271
self.scheduleConnectionStartBackoffTimer(connectionID, backoff, on: eventLoop)
213272

214273
case.scheduleTimeoutTimer(let connectionID, on:let eventLoop):
@@ -217,6 +276,26 @@ final class HTTPConnectionPool{
217276
case.cancelTimeoutTimer(let connectionID):
218277
self.cancelIdleTimerForConnection(connectionID)
219278

279+
case.cancelBackoffTimers(let connectionIDs):
280+
forconnectionIDin connectionIDs {
281+
self.cancelConnectionStartBackoffTimer(connectionID)
282+
}
283+
284+
case.none:
285+
break
286+
}
287+
}
288+
289+
privatefunc runUnlockedActions(_ actions:Actions.Unlocked){
290+
self.runUnlockedConnectionAction(actions.connection)
291+
self.runUnlockedRequestAction(actions.request)
292+
}
293+
294+
privatefunc runUnlockedConnectionAction(_ action:Actions.ConnectionAction.Unlocked){
295+
switch action {
296+
case.createConnection(let connectionID,let eventLoop):
297+
self.createConnection(connectionID, on: eventLoop)
298+
220299
case.closeConnection(let connection, isShutdown:let isShutdown):
221300
self.logger.trace("close connection", metadata:[
222301
"ahc-connection-id":"\(connection.id)",
@@ -251,7 +330,7 @@ final class HTTPConnectionPool{
251330
}
252331
}
253332

254-
privatefuncrunRequestAction(_ action:StateMachine.RequestAction){
333+
privatefuncrunUnlockedRequestAction(_ action:StateMachine.RequestAction){
255334
// The order of execution fail/execute request vs cancelling the request timeout timer does
256335
// not matter in the actions here. The actions don't cause any side effects that will be
257336
// reported back to the state machine and are not dependent on each other.
@@ -323,11 +402,9 @@ final class HTTPConnectionPool{
323402
guard timeoutFired else{return}
324403

325404
// 3. Tell the state machine about the timeout
326-
letaction=self.stateLock.withLock{
327-
self._state.timeoutRequest(requestID)
405+
self.modifyStateAndRunActions{
406+
$0.timeoutRequest(requestID)
328407
}
329-
330-
self.run(action: action)
331408
}
332409

333410
self.timerLock.withLockVoid{
@@ -362,34 +439,27 @@ final class HTTPConnectionPool{
362439
letscheduled= eventLoop.scheduleTask(in:self.idleConnectionTimeout){
363440
// there might be a race between a cancelTimer call and the triggering
364441
// of this scheduled task. both want to acquire the lock
365-
lettimerExisted=self.timerLock.withLock{
366-
self._idleTimer.removeValue(forKey: connectionID)!=nil
442+
self.modifyStateAndRunActions{ stateMachine in
443+
ifself._idleTimer.removeValue(forKey: connectionID)!=nil{
444+
// The timer still exists. State Machines assumes it is alive
445+
return stateMachine.connectionIdleTimeout(connectionID)
446+
}
447+
return.none
367448
}
368-
369-
guard timerExisted else{return}
370-
371-
letaction=self.stateLock.withLock{
372-
self._state.connectionIdleTimeout(connectionID)
373-
}
374-
self.run(action: action)
375449
}
376450

377-
self.timerLock.withLock{
378-
assert(self._idleTimer[connectionID]==nil)
379-
self._idleTimer[connectionID]= scheduled
380-
}
451+
assert(self._idleTimer[connectionID]==nil)
452+
self._idleTimer[connectionID]= scheduled
381453
}
382454

383455
privatefunc cancelIdleTimerForConnection(_ connectionID:Connection.ID){
384456
self.logger.trace("Cancel idle connection timeout timer", metadata:[
385457
"ahc-connection-id":"\(connectionID)",
386458
])
387-
388-
letcancelTimer=self.timerLock.withLock{
389-
self._idleTimer.removeValue(forKey: connectionID)
459+
guardlet cancelTimer =self._idleTimer.removeValue(forKey: connectionID)else{
460+
preconditionFailure("Expected to have an idle timer for connection \(connectionID) at this point.")
390461
}
391-
392-
cancelTimer?.cancel()
462+
cancelTimer.cancel()
393463
}
394464

395465
privatefunc scheduleConnectionStartBackoffTimer(
@@ -403,30 +473,24 @@ final class HTTPConnectionPool{
403473

404474
letscheduled= eventLoop.scheduleTask(in: timeAmount){
405475
// there might be a race between a backoffTimer and the pool shutting down.
406-
lettimerExisted=self.timerLock.withLock{
407-
self._backoffTimer.removeValue(forKey: connectionID)!=nil
408-
}
409-
410-
guard timerExisted else{return}
411-
412-
letaction=self.stateLock.withLock{
413-
self._state.connectionCreationBackoffDone(connectionID)
476+
self.modifyStateAndRunActions{ stateMachine in
477+
ifself._backoffTimer.removeValue(forKey: connectionID)!=nil{
478+
// The timer still exists. State Machines assumes it is alive
479+
return stateMachine.connectionCreationBackoffDone(connectionID)
480+
}
481+
return.none
414482
}
415-
self.run(action: action)
416483
}
417484

418-
self.timerLock.withLock{
419-
assert(self._backoffTimer[connectionID]==nil)
420-
self._backoffTimer[connectionID]= scheduled
421-
}
485+
assert(self._backoffTimer[connectionID]==nil)
486+
self._backoffTimer[connectionID]= scheduled
422487
}
423488

424489
privatefunc cancelConnectionStartBackoffTimer(_ connectionID:Connection.ID){
425-
letbackoffTimer=self.timerLock.withLock{
426-
self._backoffTimer[connectionID]
490+
guardlet backoffTimer =self._backoffTimer.removeValue(forKey: connectionID)else{
491+
preconditionFailure("Expected to have a backoff timer for connection \(connectionID) at this point.")
427492
}
428-
429-
backoffTimer?.cancel()
493+
backoffTimer.cancel()
430494
}
431495
}
432496

@@ -438,10 +502,9 @@ extension HTTPConnectionPool: HTTPConnectionRequester{
438502
"ahc-connection-id":"\(connection.id)",
439503
"ahc-http-version":"http/1.1",
440504
])
441-
letaction=self.stateLock.withLock{
442-
self._state.newHTTP1ConnectionCreated(.http1_1(connection))
505+
self.modifyStateAndRunActions{
506+
$0.newHTTP1ConnectionCreated(.http1_1(connection))
443507
}
444-
self.run(action: action)
445508
}
446509

447510
func http2ConnectionCreated(_ connection:HTTP2Connection, maximumStreams:Int){
@@ -464,10 +527,9 @@ extension HTTPConnectionPool: HTTPConnectionRequester{
464527
"ahc-error":"\(error)",
465528
"ahc-connection-id":"\(connectionID)",
466529
])
467-
letaction=self.stateLock.withLock{
468-
self._state.failedToCreateNewConnection(error, connectionID: connectionID)
530+
self.modifyStateAndRunActions{
531+
$0.failedToCreateNewConnection(error, connectionID: connectionID)
469532
}
470-
self.run(action: action)
471533
}
472534
}
473535

@@ -477,21 +539,19 @@ extension HTTPConnectionPool: HTTP1ConnectionDelegate{
477539
"ahc-connection-id":"\(connection.id)",
478540
"ahc-http-version":"http/1.1",
479541
])
480-
letaction=self.stateLock.withLock{
481-
self._state.connectionClosed(connection.id)
542+
self.modifyStateAndRunActions{
543+
$0.connectionClosed(connection.id)
482544
}
483-
self.run(action: action)
484545
}
485546

486547
func http1ConnectionReleased(_ connection:HTTP1Connection){
487548
self.logger.trace("releasing connection", metadata:[
488549
"ahc-connection-id":"\(connection.id)",
489550
"ahc-http-version":"http/1.1",
490551
])
491-
letaction=self.stateLock.withLock{
492-
self._state.http1ConnectionReleased(connection.id)
552+
self.modifyStateAndRunActions{
553+
$0.http1ConnectionReleased(connection.id)
493554
}
494-
self.run(action: action)
495555
}
496556
}
497557

@@ -524,10 +584,9 @@ extension HTTPConnectionPool: HTTP2ConnectionDelegate{
524584
extensionHTTPConnectionPool:HTTPRequestScheduler{
525585
func cancelRequest(_ request:HTTPSchedulableRequest){
526586
letrequestID=Request(request).id
527-
letaction=self.stateLock.withLock{
528-
self._state.cancelRequest(requestID)
587+
self.modifyStateAndRunActions{
588+
$0.cancelRequest(requestID)
529589
}
530-
self.run(action: action)
531590
}
532591
}
533592

0 commit comments

Comments
(0)