Skip to content

Commit 1fe5485

Browse files
committed
Final implementation switch
1 parent 8bd8605 commit 1fe5485

File tree

6 files changed

+88
-639
lines changed

6 files changed

+88
-639
lines changed

‎Sources/AsyncHTTPClient/HTTPClient.swift‎

Lines changed: 56 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public class HTTPClient{
6868
publicleteventLoopGroup:EventLoopGroup
6969
leteventLoopGroupProvider:EventLoopGroupProvider
7070
letconfiguration:Configuration
71-
letpool:ConnectionPool
71+
letpoolManager:HTTPConnectionPool.Manager
7272
varstate:State
7373
privateletstateLock=Lock()
7474

@@ -110,14 +110,18 @@ public class HTTPClient{
110110
#endif
111111
}
112112
self.configuration = configuration
113-
self.pool =ConnectionPool(configuration: configuration,
114-
backgroundActivityLogger: backgroundActivityLogger)
113+
self.poolManager =HTTPConnectionPool.Manager(
114+
eventLoopGroup:self.eventLoopGroup,
115+
configuration:self.configuration,
116+
backgroundActivityLogger: backgroundActivityLogger
117+
)
115118
self.state =.upAndRunning
116119
}
117120

118121
deinit{
119-
assert(self.pool.count ==0)
120-
assert(self.state ==.shutDown,"Client not shut down before the deinit. Please call client.syncShutdown() when no longer needed.")
122+
guard case .shutDown =self.state else{
123+
preconditionFailure("Client not shut down before the deinit. Please call client.syncShutdown() when no longer needed.")
124+
}
121125
}
122126

123127
/// Shuts down the client and `EventLoopGroup` if it was created by the client.
@@ -175,14 +179,16 @@ public class HTTPClient{
175179
switchself.eventLoopGroupProvider {
176180
case.shared:
177181
self.state =.shutDown
178-
callback(nil)
182+
queue.async{
183+
callback(nil)
184+
}
179185
case.createNew:
180186
switchself.state {
181187
case.shuttingDown:
182188
self.state =.shutDown
183189
self.eventLoopGroup.shutdownGracefully(queue: queue, callback)
184190
case.shutDown,.upAndRunning:
185-
assertionFailure("The only valid state at this point is \(State.shutDown)")
191+
assertionFailure("The only valid state at this point is \(String(describing:State.shuttingDown))")
186192
}
187193
}
188194
}
@@ -191,33 +197,35 @@ public class HTTPClient{
191197
privatefunc shutdown(requiresCleanClose:Bool, queue:DispatchQueue, _ callback:@escaping(Error?)->Void){
192198
do{
193199
tryself.stateLock.withLock{
194-
ifself.state !=.upAndRunning{
200+
guard case .upAndRunning =self.state else{
195201
throwHTTPClientError.alreadyShutdown
196202
}
197-
self.state =.shuttingDown
203+
self.state =.shuttingDown(requiresCleanClose: requiresCleanClose, callback: callback)
198204
}
199205
}catch{
200206
callback(error)
201207
return
202208
}
203209

204-
self.pool.close(on:self.eventLoopGroup.next()).whenComplete{ result in
205-
varcloseError:Error?
210+
letpromise=self.eventLoopGroup.next().makePromise(of:Bool.self)
211+
self.poolManager.shutdown(promise: promise)
212+
promise.futureResult.whenComplete{ result in
206213
switch result {
207-
case.failure(let error):
208-
closeError = error
209-
case.success(let cleanShutdown):
210-
if !cleanShutdown, requiresCleanClose {
211-
closeError =HTTPClientError.uncleanShutdown
214+
case.failure:
215+
preconditionFailure("Shutting down the connection pool must not fail, ever.")
216+
case.success(let unclean):
217+
let(callback, uncleanError)=self.stateLock.withLock{()->((Error?)->Void,Error?)in
218+
guard case .shuttingDown(let requiresClean, callback:let callback)=self.state else{
219+
preconditionFailure("Why did the pool manager shut down, if it was not instructed to")
220+
}
221+
222+
leterror:Error?=(requiresClean && unclean)?HTTPClientError.uncleanShutdown :nil
223+
return(callback, error)
212224
}
213225

214-
self.shutdownEventLoop(queue: queue){ eventLoopError in
215-
// we prioritise .uncleanShutdown here
216-
iflet error = closeError {
217-
callback(error)
218-
}else{
219-
callback(eventLoopError)
220-
}
226+
self.shutdownEventLoop(queue: queue){ error in
227+
letreportedError= error ?? uncleanError
228+
callback(reportedError)
221229
}
222230
}
223231
}
@@ -492,7 +500,7 @@ public class HTTPClient{
492500
lettaskEL:EventLoop
493501
switch eventLoopPreference.preference {
494502
case.indifferent:
495-
taskEL =self.pool.associatedEventLoop(for:ConnectionPool.Key(request))??self.eventLoopGroup.next()
503+
taskEL =self.eventLoopGroup.next()
496504
case.delegate(on:let eventLoop):
497505
precondition(self.eventLoopGroup.makeIterator().contains{ $0 === eventLoop },"Provided EventLoop must be part of clients EventLoopGroup.")
498506
taskEL = eventLoop
@@ -540,75 +548,31 @@ public class HTTPClient{
540548
}
541549

542550
lettask=Task<Delegate.Response>(eventLoop: taskEL, logger: logger)
543-
letsetupComplete= taskEL.makePromise(of:Void.self)
544-
letconnection=self.pool.getConnection(request,
545-
preference: eventLoopPreference,
546-
taskEventLoop: taskEL,
547-
deadline: deadline,
548-
setupComplete: setupComplete.futureResult,
549-
logger: logger)
550-
551-
lettaskHandler=TaskHandler(task: task,
552-
kind: request.kind,
553-
delegate: delegate,
554-
redirectHandler: redirectHandler,
555-
ignoreUncleanSSLShutdown:self.configuration.ignoreUncleanSSLShutdown,
556-
logger: logger)
557-
558-
connection.flatMap{ connection ->EventLoopFuture<Void>in
559-
logger.debug("got connection for request",
560-
metadata:["ahc-connection":"\(connection)",
561-
"ahc-request":"\(request.method)\(request.url)",
562-
"ahc-channel-el":"\(connection.channel.eventLoop)",
563-
"ahc-task-el":"\(taskEL)"])
564-
565-
letchannel= connection.channel
566-
567-
func prepareChannelForTask0()->EventLoopFuture<Void>{
568-
do{
569-
letsyncPipelineOperations= channel.pipeline.syncOperations
570-
571-
iflet timeout =self.resolve(timeout:self.configuration.timeout.read, deadline: deadline){
572-
try syncPipelineOperations.addHandler(IdleStateHandler(readTimeout: timeout))
573-
}
574-
575-
try syncPipelineOperations.addHandler(taskHandler)
576-
}catch{
577-
connection.release(closing:true, logger: logger)
578-
return channel.eventLoop.makeFailedFuture(error)
579-
}
580-
581-
task.setConnection(connection)
551+
do{
552+
letrequestBag=tryRequestBag(
553+
request: request,
554+
eventLoopPreference: eventLoopPreference,
555+
task: task,
556+
redirectHandler: redirectHandler,
557+
connectionDeadline:.now()+(self.configuration.timeout.connect ??.seconds(10)),
558+
idleReadTimeout:self.configuration.timeout.read,
559+
delegate: delegate
560+
)
582561

583-
letisCancelled= task.lock.withLock{
584-
task.cancelled
562+
vardeadlineSchedule:Scheduled<Void>?
563+
iflet deadline = deadline {
564+
deadlineSchedule = taskEL.scheduleTask(deadline: deadline){
565+
requestBag.fail(HTTPClientError.deadlineExceeded)
585566
}
586567

587-
if !isCancelled {
588-
return channel.writeAndFlush(request).flatMapError{ _ in
589-
// At this point the `TaskHandler` will already be present
590-
// to handle the failure and pass it to the `promise`
591-
channel.eventLoop.makeSucceededVoidFuture()
592-
}
593-
}else{
594-
return channel.eventLoop.makeSucceededVoidFuture()
568+
task.promise.futureResult.whenComplete{ _ in
569+
deadlineSchedule?.cancel()
595570
}
596571
}
597572

598-
if channel.eventLoop.inEventLoop {
599-
returnprepareChannelForTask0()
600-
}else{
601-
return channel.eventLoop.flatSubmit{
602-
returnprepareChannelForTask0()
603-
}
604-
}
605-
}.always{ _ in
606-
setupComplete.succeed(())
607-
}.whenFailure{ error in
608-
taskHandler.callOutToDelegateFireAndForget{ task in
609-
delegate.didReceiveError(task: task, error)
610-
}
611-
task.promise.fail(error)
573+
self.poolManager.executeRequest(requestBag)
574+
}catch{
575+
task.fail(with: error, delegateType:Delegate.self)
612576
}
613577

614578
return task
@@ -821,7 +785,7 @@ public class HTTPClient{
821785

822786
enumState{
823787
case upAndRunning
824-
case shuttingDown
788+
case shuttingDown(requiresCleanClose:Bool, callback:(Error?)->Void)
825789
case shutDown
826790
}
827791
}
@@ -926,6 +890,7 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible{
926890
case serverOfferedUnsupportedApplicationProtocol(String)
927891
case requestStreamCancelled
928892
case getConnectionFromPoolTimeout
893+
case deadlineExceeded
929894
}
930895

931896
privatevarcode:Code
@@ -995,6 +960,9 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible{
995960
returnHTTPClientError(code:.serverOfferedUnsupportedApplicationProtocol(proto))
996961
}
997962

963+
/// The request deadline was exceeded. The request was cancelled because of this.
964+
publicstaticletdeadlineExceeded=HTTPClientError(code:.deadlineExceeded)
965+
998966
/// The remote server responded with a status code >= 300, before the full request was sent. The request stream
999967
/// was therefore cancelled
1000968
publicstaticletrequestStreamCancelled=HTTPClientError(code:.requestStreamCancelled)

‎Sources/AsyncHTTPClient/HTTPHandler.swift‎

Lines changed: 24 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -658,18 +658,28 @@ extension HTTPClient{
658658
publicleteventLoop:EventLoop
659659

660660
letpromise:EventLoopPromise<Response>
661-
varcompletion:EventLoopFuture<Void>
662-
varconnection:Connection?
663-
varcancelled:Bool
664-
letlock:Lock
665661
letlogger:Logger // We are okay to store the logger here because a Task is for only one request.
666662

663+
varisCancelled:Bool{
664+
self.lock.withLock{self._isCancelled }
665+
}
666+
667+
vartaskDelegate:HTTPClientTaskDelegate?{
668+
get{
669+
self.lock.withLock{self._taskDelegate }
670+
}
671+
set{
672+
self.lock.withLock{self._taskDelegate = newValue }
673+
}
674+
}
675+
676+
privatevar_isCancelled:Bool=false
677+
privatevar_taskDelegate:HTTPClientTaskDelegate?
678+
privateletlock=Lock()
679+
667680
init(eventLoop:EventLoop, logger:Logger){
668681
self.eventLoop = eventLoop
669682
self.promise = eventLoop.makePromise()
670-
self.completion =self.promise.futureResult.map{ _ in}
671-
self.cancelled =false
672-
self.lock =Lock()
673683
self.logger = logger
674684
}
675685

@@ -694,69 +704,24 @@ extension HTTPClient{
694704

695705
/// Cancels the request execution.
696706
publicfunc cancel(){
697-
letchannel:Channel?=self.lock.withLock{
698-
if !self.cancelled {
699-
self.cancelled =true
700-
returnself.connection?.channel
701-
}else{
702-
returnnil
703-
}
707+
lettaskDelegate=self.lock.withLock{()->HTTPClientTaskDelegate?in
708+
self._isCancelled =true
709+
returnself._taskDelegate
704710
}
705-
channel?.triggerUserOutboundEvent(TaskCancelEvent(), promise:nil)
706-
}
707711

708-
@discardableResult
709-
func setConnection(_ connection:Connection)->Connection{
710-
returnself.lock.withLock{
711-
self.connection = connection
712-
ifself.cancelled {
713-
connection.channel.triggerUserOutboundEvent(TaskCancelEvent(), promise:nil)
714-
}
715-
return connection
716-
}
712+
taskDelegate?.cancel()
717713
}
718714

719715
func succeed<Delegate:HTTPClientResponseDelegate>(promise:EventLoopPromise<Response>?,
720716
with value:Response,
721717
delegateType:Delegate.Type,
722718
closing:Bool){
723-
self.releaseAssociatedConnection(delegateType: delegateType,
724-
closing: closing).whenSuccess{
725-
promise?.succeed(value)
726-
}
719+
promise?.succeed(value)
727720
}
728721

729722
func fail<Delegate:HTTPClientResponseDelegate>(with error:Error,
730723
delegateType:Delegate.Type){
731-
iflet connection =self.connection {
732-
self.releaseAssociatedConnection(delegateType: delegateType, closing:true)
733-
.whenSuccess{
734-
self.promise.fail(error)
735-
connection.channel.close(promise:nil)
736-
}
737-
}else{
738-
// this is used in tests where we don't want to bootstrap the whole connection pool
739-
self.promise.fail(error)
740-
}
741-
}
742-
743-
func releaseAssociatedConnection<Delegate:HTTPClientResponseDelegate>(delegateType:Delegate.Type,
744-
closing:Bool)->EventLoopFuture<Void>{
745-
iflet connection =self.connection {
746-
// remove read timeout handler
747-
return connection.removeHandler(IdleStateHandler.self).flatMap{
748-
connection.removeHandler(TaskHandler<Delegate>.self)
749-
}.map{
750-
connection.release(closing: closing, logger:self.logger)
751-
}.flatMapError{ error in
752-
fatalError("Couldn't remove taskHandler: \(error)")
753-
}
754-
}else{
755-
// TODO: This seems only reached in some internal unit test
756-
// Maybe there could be a better handling in the future to make
757-
// it an error outside of testing contexts
758-
returnself.eventLoop.makeSucceededFuture(())
759-
}
724+
self.promise.fail(error)
760725
}
761726
}
762727
}
@@ -1076,9 +1041,7 @@ extension TaskHandler: ChannelDuplexHandler{
10761041
break
10771042
case.redirected(let head,let redirectURL):
10781043
self.state =.endOrError
1079-
self.task.releaseAssociatedConnection(delegateType:Delegate.self, closing:self.closing).whenSuccess{
1080-
self.redirectHandler?.redirect(status: head.status, to: redirectURL, promise:self.task.promise)
1081-
}
1044+
self.redirectHandler?.redirect(status: head.status, to: redirectURL, promise:self.task.promise)
10821045
default:
10831046
self.state =.bufferedEnd
10841047
self.handleReadForDelegate(response, context: context)

‎Sources/AsyncHTTPClient/RequestBag.swift‎

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,10 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate>{
6666
self.requestFramingMetadata = metadata
6767

6868
// TODO: comment in once we switch to using the Request bag in AHC
69-
// self.task.taskDelegate = self
70-
// self.task.futureResult.whenComplete{_ in
71-
// self.task.taskDelegate = nil
72-
// }
69+
self.task.taskDelegate =self
70+
self.task.futureResult.whenComplete{ _ in
71+
self.task.taskDelegate =nil
72+
}
7373
}
7474

7575
privatefunc requestWasQueued0(_ scheduler:HTTPRequestScheduler){
@@ -111,7 +111,7 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate>{
111111
self.writeNextRequestPart($0)
112112
}
113113

114-
body.stream(writer).whenComplete{
114+
body.stream(writer).hop(to:self.eventLoop).whenComplete{
115115
self.finishRequestBodyStream($0)
116116
}
117117

@@ -140,7 +140,7 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate>{
140140
}
141141

142142
privatefunc writeNextRequestPart0(_ part:IOData)->EventLoopFuture<Void>{
143-
self.task.eventLoop.assertInEventLoop()
143+
self.eventLoop.assertInEventLoop()
144144

145145
letaction=self.state.writeNextRequestPart(part, taskEventLoop:self.task.eventLoop)
146146

0 commit comments

Comments
(0)