Skip to content

Commit 19e2ea7

Browse files
author
Trevör
authored
Add an HTTP/1.1 connection pool (swift-server#105)
motivation: Better performance thanks to connection reuse changes: - Added a connection pool for HTTP/1.1 - All requests automatically use the connection pool - Up to 8 parallel connections per (scheme, host, port) - Multiple additional unit tests
1 parent de74219 commit 19e2ea7

File tree

10 files changed

+1727
-209
lines changed

10 files changed

+1727
-209
lines changed

‎Sources/AsyncHTTPClient/ConnectionPool.swift‎

Lines changed: 653 additions & 0 deletions
Large diffs are not rendered by default.

‎Sources/AsyncHTTPClient/HTTPClient.swift‎

Lines changed: 222 additions & 103 deletions
Large diffs are not rendered by default.

‎Sources/AsyncHTTPClient/HTTPClientProxyHandler.swift‎

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ internal final class HTTPClientProxyHandler: ChannelDuplexHandler, RemovableChan
6969
case awaitingResponse
7070
case connecting
7171
case connected
72+
case failed
7273
}
7374

7475
privatelethost:String
@@ -102,6 +103,7 @@ internal final class HTTPClientProxyHandler: ChannelDuplexHandler, RemovableChan
102103
// blank line that concludes the successful response's header section
103104
break
104105
case407:
106+
self.readState =.failed
105107
context.fireErrorCaught(HTTPClientError.proxyAuthenticationRequired)
106108
default:
107109
// Any response other than a successful response
@@ -119,6 +121,8 @@ internal final class HTTPClientProxyHandler: ChannelDuplexHandler, RemovableChan
119121
self.readBuffer.append(data)
120122
case.connected:
121123
context.fireChannelRead(data)
124+
case.failed:
125+
break
122126
}
123127
}
124128

‎Sources/AsyncHTTPClient/HTTPHandler.swift‎

Lines changed: 98 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import NIO
1717
import NIOConcurrencyHelpers
1818
import NIOFoundationCompat
1919
import NIOHTTP1
20+
import NIOHTTPCompression
2021
import NIOSSL
2122

2223
extensionHTTPClient{
@@ -486,22 +487,31 @@ extension URL{
486487
extensionHTTPClient{
487488
/// Response execution context. Will be created by the library and could be used for obtaining
488489
/// `EventLoopFuture<Response>` of the execution or cancellation of the execution.
489-
publicfinalclassTask<Response>{
490+
publicfinalclassTask<Response>:TaskProtocol{
490491
/// The `EventLoop` the delegate will be executed on.
491492
publicleteventLoop:EventLoop
492493

493494
letpromise:EventLoopPromise<Response>
494-
varchannel:Channel?
495-
privatevarcancelled:Bool
496-
privateletlock:Lock
495+
varcompletion:EventLoopFuture<Void>
496+
varconnection:ConnectionPool.Connection?
497+
varcancelled:Bool
498+
letlock:Lock
499+
letid=UUID()
497500

498501
init(eventLoop:EventLoop){
499502
self.eventLoop = eventLoop
500503
self.promise = eventLoop.makePromise()
504+
self.completion =self.promise.futureResult.map{ _ in}
501505
self.cancelled =false
502506
self.lock =Lock()
503507
}
504508

509+
staticfunc failedTask(eventLoop:EventLoop, error:Error)->Task<Response>{
510+
lettask=self.init(eventLoop: eventLoop)
511+
task.promise.fail(error)
512+
return task
513+
}
514+
505515
/// `EventLoopFuture` for the response returned by this request.
506516
publicvarfutureResult:EventLoopFuture<Response>{
507517
returnself.promise.futureResult
@@ -520,28 +530,74 @@ extension HTTPClient{
520530
letchannel:Channel?=self.lock.withLock{
521531
if !cancelled {
522532
cancelled =true
523-
returnself.channel
533+
returnself.connection?.channel
534+
}else{
535+
returnnil
524536
}
525-
returnnil
526537
}
527538
channel?.triggerUserOutboundEvent(TaskCancelEvent(), promise:nil)
528539
}
529540

530541
@discardableResult
531-
funcsetChannel(_ channel:Channel)->Channel{
542+
funcsetConnection(_ connection:ConnectionPool.Connection)->ConnectionPool.Connection{
532543
returnself.lock.withLock{
533-
self.channel = channel
534-
return channel
544+
self.connection = connection
545+
ifself.cancelled {
546+
connection.channel.triggerUserOutboundEvent(TaskCancelEvent(), promise:nil)
547+
}
548+
return connection
549+
}
550+
}
551+
552+
func succeed<Delegate:HTTPClientResponseDelegate>(promise:EventLoopPromise<Response>?, with value:Response, delegateType:Delegate.Type){
553+
self.releaseAssociatedConnection(delegateType: delegateType).whenSuccess{
554+
promise?.succeed(value)
555+
}
556+
}
557+
558+
func fail<Delegate:HTTPClientResponseDelegate>(with error:Error, delegateType:Delegate.Type){
559+
iflet connection =self.connection {
560+
connection.close().whenComplete{ _ in
561+
self.releaseAssociatedConnection(delegateType: delegateType).whenComplete{ _ in
562+
self.promise.fail(error)
563+
}
564+
}
565+
}
566+
}
567+
568+
func releaseAssociatedConnection<Delegate:HTTPClientResponseDelegate>(delegateType:Delegate.Type)->EventLoopFuture<Void>{
569+
iflet connection =self.connection {
570+
return connection.removeHandler(NIOHTTPResponseDecompressor.self).flatMap{
571+
connection.removeHandler(IdleStateHandler.self)
572+
}.flatMap{
573+
connection.removeHandler(TaskHandler<Delegate>.self)
574+
}.map{
575+
connection.release()
576+
}.flatMapError{ error in
577+
fatalError("Couldn't remove taskHandler: \(error)")
578+
}
579+
580+
}else{
581+
// TODO: This seems only reached in some internal unit test
582+
// Maybe there could be a better handling in the future to make
583+
// it an error outside of testing contexts
584+
returnself.eventLoop.makeSucceededFuture(())
535585
}
536586
}
537587
}
538588
}
539589

540590
internalstructTaskCancelEvent{}
541591

592+
internalprotocolTaskProtocol{
593+
func cancel()
594+
varid:UUID{get}
595+
varcompletion:EventLoopFuture<Void>{get}
596+
}
597+
542598
// MARK: - TaskHandler
543599

544-
internalclassTaskHandler<Delegate:HTTPClientResponseDelegate>{
600+
internalclassTaskHandler<Delegate:HTTPClientResponseDelegate>:RemovableChannelHandler{
545601
enumState{
546602
case idle
547603
case sent
@@ -581,7 +637,7 @@ extension TaskHandler{
581637
_ body:@escaping(HTTPClient.Task<Delegate.Response>,Err)->Void){
582638
func doIt(){
583639
body(self.task, error)
584-
self.task.promise.fail(error)
640+
self.task.fail(with:error, delegateType:Delegate.self)
585641
}
586642

587643
ifself.task.eventLoop.inEventLoop {
@@ -621,13 +677,14 @@ extension TaskHandler{
621677
}
622678

623679
func callOutToDelegate<Response>(promise:EventLoopPromise<Response>?=nil,
624-
_ body:@escaping(HTTPClient.Task<Delegate.Response>)throws->Response){
680+
_ body:@escaping(HTTPClient.Task<Delegate.Response>)throws->Response)where Response ==Delegate.Response{
625681
func doIt(){
626682
do{
627683
letresult=trybody(self.task)
628-
promise?.succeed(result)
684+
685+
self.task.succeed(promise: promise, with: result, delegateType:Delegate.self)
629686
}catch{
630-
promise?.fail(error)
687+
self.task.fail(with:error, delegateType:Delegate.self)
631688
}
632689
}
633690

@@ -641,7 +698,7 @@ extension TaskHandler{
641698
}
642699

643700
func callOutToDelegate<Response>(channelEventLoop:EventLoop,
644-
_ body:@escaping(HTTPClient.Task<Delegate.Response>)throws->Response)->EventLoopFuture<Response>{
701+
_ body:@escaping(HTTPClient.Task<Delegate.Response>)throws->Response)->EventLoopFuture<Response>where Response ==Delegate.Response{
645702
letpromise= channelEventLoop.makePromise(of:Response.self)
646703
self.callOutToDelegate(promise: promise, body)
647704
return promise.futureResult
@@ -678,8 +735,6 @@ extension TaskHandler: ChannelDuplexHandler{
678735
headers.add(name:"Host", value: request.host)
679736
}
680737

681-
headers.add(name:"Connection", value:"close")
682-
683738
do{
684739
try headers.validate(body: request.body)
685740
}catch{
@@ -702,16 +757,10 @@ extension TaskHandler: ChannelDuplexHandler{
702757
context.eventLoop.assertInEventLoop()
703758
self.state =.sent
704759
self.callOutToDelegateFireAndForget(self.delegate.didSendRequest)
705-
706-
letchannel= context.channel
707-
self.task.futureResult.whenComplete{ _ in
708-
channel.close(promise:nil)
709-
}
710760
}.flatMapErrorThrowing{ error in
711761
context.eventLoop.assertInEventLoop()
712762
self.state =.end
713763
self.failTaskAndNotifyDelegate(error: error,self.delegate.didReceiveError)
714-
context.close(promise:nil)
715764
throw error
716765
}.cascade(to: promise)
717766
}
@@ -742,6 +791,16 @@ extension TaskHandler: ChannelDuplexHandler{
742791
letresponse=self.unwrapInboundIn(data)
743792
switch response {
744793
case.head(let head):
794+
if !head.isKeepAlive {
795+
self.task.lock.withLock{
796+
iflet connection =self.task.connection {
797+
connection.isClosing =true
798+
}else{
799+
preconditionFailure("There should always be a connection at this point")
800+
}
801+
}
802+
}
803+
745804
iflet redirectURL = redirectHandler?.redirectTarget(status: head.status, headers: head.headers){
746805
self.state =.redirected(head, redirectURL)
747806
}else{
@@ -768,8 +827,9 @@ extension TaskHandler: ChannelDuplexHandler{
768827
switchself.state {
769828
case.redirected(let head,let redirectURL):
770829
self.state =.end
771-
self.redirectHandler?.redirect(status: head.status, to: redirectURL, promise:self.task.promise)
772-
context.close(promise:nil)
830+
self.task.releaseAssociatedConnection(delegateType:Delegate.self).whenSuccess{
831+
self.redirectHandler?.redirect(status: head.status, to: redirectURL, promise:self.task.promise)
832+
}
773833
default:
774834
self.state =.end
775835
self.callOutToDelegate(promise:self.task.promise,self.delegate.didFinishRequest)
@@ -845,6 +905,13 @@ extension TaskHandler: ChannelDuplexHandler{
845905
self.failTaskAndNotifyDelegate(error: error,self.delegate.didReceiveError)
846906
}
847907
}
908+
909+
func handlerAdded(context:ChannelHandlerContext){
910+
guard context.channel.isActive else{
911+
self.failTaskAndNotifyDelegate(error:HTTPClientError.remoteConnectionClosed,self.delegate.didReceiveError)
912+
return
913+
}
914+
}
848915
}
849916

850917
// MARK: - RedirectHandler
@@ -931,9 +998,13 @@ internal struct RedirectHandler<ResponseType>{
931998
do{
932999
varnewRequest=tryHTTPClient.Request(url: redirectURL, method: method, headers: headers, body: body)
9331000
newRequest.redirectState = nextState
934-
returnself.execute(newRequest).futureResult.cascade(to: promise)
1001+
self.execute(newRequest).futureResult.whenComplete{ result in
1002+
promise.futureResult.eventLoop.execute{
1003+
promise.completeWith(result)
1004+
}
1005+
}
9351006
}catch{
936-
returnpromise.fail(error)
1007+
promise.fail(error)
9371008
}
9381009
}
9391010
}

‎Sources/AsyncHTTPClient/Utils.swift‎

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import NIO
1616
import NIOHTTP1
17+
import NIOHTTPCompression
1718

1819
internalextensionString{
1920
varisIPAddress:Bool{
@@ -44,3 +45,53 @@ public final class HTTPClientCopyingDelegate: HTTPClientResponseDelegate{
4445
return()
4546
}
4647
}
48+
49+
extensionClientBootstrap{
50+
staticfunc makeHTTPClientBootstrapBase(group:EventLoopGroup, host:String, port:Int, configuration:HTTPClient.Configuration, channelInitializer:((Channel)->EventLoopFuture<Void>)?=nil)->ClientBootstrap{
51+
returnClientBootstrap(group: group)
52+
.channelOption(ChannelOptions.socket(SocketOptionLevel(IPPROTO_TCP), TCP_NODELAY), value:1)
53+
54+
.channelInitializer{ channel in
55+
letchannelAddedFuture:EventLoopFuture<Void>
56+
switch configuration.proxy {
57+
case.none:
58+
channelAddedFuture = group.next().makeSucceededFuture(())
59+
case.some:
60+
channelAddedFuture = channel.pipeline.addProxyHandler(host: host, port: port, authorization: configuration.proxy?.authorization)
61+
}
62+
return channelAddedFuture.flatMap{(_:Void)->EventLoopFuture<Void>in
63+
channelInitializer?(channel)?? group.next().makeSucceededFuture(())
64+
}
65+
}
66+
}
67+
}
68+
69+
extensionCircularBuffer{
70+
@discardableResult
71+
mutatingfunc swapWithFirstAndRemove(at index:Index)->Element?{
72+
precondition(index >=self.startIndex && index <self.endIndex)
73+
if !self.isEmpty {
74+
self.swapAt(self.startIndex, index)
75+
returnself.removeFirst()
76+
}else{
77+
returnnil
78+
}
79+
}
80+
81+
@discardableResult
82+
mutatingfunc swapWithFirstAndRemove(where predicate:(Element)throws->Bool)rethrows->Element?{
83+
iflet existingIndex =tryself.firstIndex(where: predicate){
84+
returnself.swapWithFirstAndRemove(at: existingIndex)
85+
}else{
86+
returnnil
87+
}
88+
}
89+
}
90+
91+
extensionConnectionPool.Connection{
92+
func removeHandler<Handler:RemovableChannelHandler>(_ type:Handler.Type)->EventLoopFuture<Void>{
93+
returnself.channel.pipeline.handler(type: type).flatMap{ handler in
94+
self.channel.pipeline.removeHandler(handler)
95+
}.recover{ _ in}
96+
}
97+
}

‎Tests/AsyncHTTPClientTests/HTTPClientInternalTests+XCTest.swift‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ extension HTTPClientInternalTests{
3333
("testUploadStreamingBackpressure", testUploadStreamingBackpressure),
3434
("testRequestURITrailingSlash", testRequestURITrailingSlash),
3535
("testChannelAndDelegateOnDifferentEventLoops", testChannelAndDelegateOnDifferentEventLoops),
36+
("testResponseConnectionCloseGet", testResponseConnectionCloseGet),
3637
]
3738
}
3839
}

0 commit comments

Comments
(0)