Skip to content

Commit 8e4d519

Browse files
authored
Refactor Channel creation (swift-server#377)
- The connection creation logic has been refactored into a number of smaller methods that can be combined - Connection creation now has a logical home. It is moved from `Utils.swift` into a `ConnectionFactory` - There are explicit `ChannelHandlers` that are used for connection creation: - `TLSEventsHandler` got its own file and unit tests - `HTTP1ProxyConnectHandler` got its own file and unit tests - `SOCKSEventsHandler` got its own file and unit tests - Some small things are already part of this pr that will get their context later. For example: - `HTTPConnectionPool` is added as a namespace to not cause major renames in follow up PRs - `HTTPConnectionPool.Connection.ID` and its generator were added now. (This will be used later to identify a connection during its lifetime) - the file `HTTPConnectionPool+Manager` was added to give `HTTPConnectionPool.Connection.ID.Generator` already its final destination.
1 parent af837ed commit 8e4d519

23 files changed

+1753
-559
lines changed

‎Sources/AsyncHTTPClient/ConnectionPool.swift‎

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import Logging
1717
import NIO
1818
import NIOConcurrencyHelpers
1919
import NIOHTTP1
20-
import NIOHTTPCompression
2120
import NIOSSL
2221
import NIOTLS
2322
import NIOTransportServices
@@ -86,7 +85,9 @@ final class ConnectionPool{
8685
letprovider=HTTP1ConnectionProvider(key: key,
8786
eventLoop: taskEventLoop,
8887
configuration: key.config(overriding:self.configuration),
88+
tlsConfiguration: request.tlsConfiguration,
8989
pool:self,
90+
sslContextCache:self.sslContextCache,
9091
backgroundActivityLogger:self.backgroundActivityLogger)
9192
letenqueued= provider.enqueue()
9293
assert(enqueued)
@@ -213,6 +214,8 @@ class HTTP1ConnectionProvider{
213214

214215
privateletbackgroundActivityLogger:Logger
215216

217+
privateletfactory:HTTPConnectionPool.ConnectionFactory
218+
216219
/// Creates a new `HTTP1ConnectionProvider`
217220
///
218221
/// - parameters:
@@ -225,7 +228,9 @@ class HTTP1ConnectionProvider{
225228
init(key:ConnectionPool.Key,
226229
eventLoop:EventLoop,
227230
configuration:HTTPClient.Configuration,
231+
tlsConfiguration:TLSConfiguration?,
228232
pool:ConnectionPool,
233+
sslContextCache:SSLContextCache,
229234
backgroundActivityLogger:Logger){
230235
self.eventLoop = eventLoop
231236
self.configuration = configuration
@@ -234,6 +239,13 @@ class HTTP1ConnectionProvider{
234239
self.closePromise = eventLoop.makePromise()
235240
self.state =.init(eventLoop: eventLoop)
236241
self.backgroundActivityLogger = backgroundActivityLogger
242+
243+
self.factory =HTTPConnectionPool.ConnectionFactory(
244+
key:self.key,
245+
tlsConfiguration: tlsConfiguration,
246+
clientConfiguration:self.configuration,
247+
sslContextCache: sslContextCache
248+
)
237249
}
238250

239251
deinit{
@@ -440,12 +452,15 @@ class HTTP1ConnectionProvider{
440452

441453
privatefunc makeChannel(preference:HTTPClient.EventLoopPreference,
442454
logger:Logger)->EventLoopFuture<Channel>{
443-
returnNIOClientTCPBootstrap.makeHTTP1Channel(destination:self.key,
444-
eventLoop:self.eventLoop,
445-
configuration:self.configuration,
446-
sslContextCache:self.pool.sslContextCache,
447-
preference: preference,
448-
logger: logger)
455+
letconnectionID=HTTPConnectionPool.Connection.ID.globalGenerator.next()
456+
leteventLoop= preference.bestEventLoop ??self.eventLoop
457+
letdeadline=.now()+self.configuration.timeout.connectionCreationTimeout
458+
returnself.factory.makeHTTP1Channel(
459+
connectionID: connectionID,
460+
deadline: deadline,
461+
eventLoop: eventLoop,
462+
logger: logger
463+
)
449464
}
450465

451466
/// A `Waiter` represents a request that waits for a connection when none is
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import NIO
16+
import NIOHTTP1
17+
18+
finalclassHTTP1ProxyConnectHandler:ChannelDuplexHandler,RemovableChannelHandler{
19+
typealiasOutboundIn=Never
20+
typealiasOutboundOut=HTTPClientRequestPart
21+
typealiasInboundIn=HTTPClientResponsePart
22+
23+
enumState{
24+
// transitions to `.connectSent` or `.failed`
25+
case initialized
26+
// transitions to `.headReceived` or `.failed`
27+
case connectSent(Scheduled<Void>)
28+
// transitions to `.completed` or `.failed`
29+
case headReceived(Scheduled<Void>)
30+
// final error state
31+
case failed(Error)
32+
// final success state
33+
case completed
34+
}
35+
36+
privatevarstate:State=.initialized
37+
38+
privatelettargetHost:String
39+
privatelettargetPort:Int
40+
privateletproxyAuthorization:HTTPClient.Authorization?
41+
privateletdeadline:NIODeadline
42+
43+
privatevarproxyEstablishedPromise:EventLoopPromise<Void>?
44+
varproxyEstablishedFuture:EventLoopFuture<Void>?{
45+
returnself.proxyEstablishedPromise?.futureResult
46+
}
47+
48+
init(targetHost:String,
49+
targetPort:Int,
50+
proxyAuthorization:HTTPClient.Authorization?,
51+
deadline:NIODeadline){
52+
self.targetHost = targetHost
53+
self.targetPort = targetPort
54+
self.proxyAuthorization = proxyAuthorization
55+
self.deadline = deadline
56+
}
57+
58+
func handlerAdded(context:ChannelHandlerContext){
59+
self.proxyEstablishedPromise = context.eventLoop.makePromise(of:Void.self)
60+
61+
self.sendConnect(context: context)
62+
}
63+
64+
func handlerRemoved(context:ChannelHandlerContext){
65+
switchself.state {
66+
case.failed,.completed:
67+
break
68+
case.initialized,.connectSent,.headReceived:
69+
structNoResult:Error{}
70+
self.state =.failed(NoResult())
71+
self.proxyEstablishedPromise?.fail(NoResult())
72+
}
73+
}
74+
75+
func channelActive(context:ChannelHandlerContext){
76+
self.sendConnect(context: context)
77+
}
78+
79+
func channelInactive(context:ChannelHandlerContext){
80+
switchself.state {
81+
case.initialized:
82+
preconditionFailure("How can we receive a channelInactive before a channelActive?")
83+
case.connectSent(let timeout),.headReceived(let timeout):
84+
timeout.cancel()
85+
self.failWithError(HTTPClientError.remoteConnectionClosed, context: context, closeConnection:false)
86+
87+
case.failed,.completed:
88+
break
89+
}
90+
}
91+
92+
func write(context:ChannelHandlerContext, data:NIOAny, promise:EventLoopPromise<Void>?){
93+
preconditionFailure("We don't support outgoing traffic during HTTP Proxy update.")
94+
}
95+
96+
func channelRead(context:ChannelHandlerContext, data:NIOAny){
97+
switchself.unwrapInboundIn(data){
98+
case.head(let head):
99+
self.handleHTTPHeadReceived(head, context: context)
100+
case.body:
101+
self.handleHTTPBodyReceived(context: context)
102+
case.end:
103+
self.handleHTTPEndReceived(context: context)
104+
}
105+
}
106+
107+
privatefunc sendConnect(context:ChannelHandlerContext){
108+
guard case .initialized =self.state else{
109+
// we might run into this handler twice, once in handlerAdded and once in channelActive.
110+
return
111+
}
112+
113+
lettimeout= context.eventLoop.scheduleTask(deadline:self.deadline){
114+
switchself.state {
115+
case.initialized:
116+
preconditionFailure("How can we have a scheduled timeout, if the connection is not even up?")
117+
118+
case.connectSent,.headReceived:
119+
self.failWithError(HTTPClientError.httpProxyHandshakeTimeout, context: context)
120+
121+
case.failed,.completed:
122+
break
123+
}
124+
}
125+
126+
self.state =.connectSent(timeout)
127+
128+
varhead=HTTPRequestHead(
129+
version:.init(major:1, minor:1),
130+
method:.CONNECT,
131+
uri:"\(self.targetHost):\(self.targetPort)"
132+
)
133+
iflet authorization =self.proxyAuthorization {
134+
head.headers.replaceOrAdd(name:"proxy-authorization", value: authorization.headerValue)
135+
}
136+
context.write(self.wrapOutboundOut(.head(head)), promise:nil)
137+
context.write(self.wrapOutboundOut(.end(nil)), promise:nil)
138+
context.flush()
139+
}
140+
141+
privatefunc handleHTTPHeadReceived(_ head:HTTPResponseHead, context:ChannelHandlerContext){
142+
guard case .connectSent(let scheduled)=self.state else{
143+
preconditionFailure("HTTPDecoder should throw an error, if we have not send a request")
144+
}
145+
146+
switch head.status.code {
147+
case200..<300:
148+
// Any 2xx (Successful) response indicates that the sender (and all
149+
// inbound proxies) will switch to tunnel mode immediately after the
150+
// blank line that concludes the successful response's header section
151+
self.state =.headReceived(scheduled)
152+
case407:
153+
self.failWithError(HTTPClientError.proxyAuthenticationRequired, context: context)
154+
155+
default:
156+
// Any response other than a successful response indicates that the tunnel
157+
// has not yet been formed and that the connection remains governed by HTTP.
158+
self.failWithError(HTTPClientError.invalidProxyResponse, context: context)
159+
}
160+
}
161+
162+
privatefunc handleHTTPBodyReceived(context:ChannelHandlerContext){
163+
switchself.state {
164+
case.headReceived(let timeout):
165+
timeout.cancel()
166+
// we don't expect a body
167+
self.failWithError(HTTPClientError.invalidProxyResponse, context: context)
168+
case.failed:
169+
// ran into an error before... ignore this one
170+
break
171+
case.completed,.connectSent,.initialized:
172+
preconditionFailure("Invalid state: \(self.state)")
173+
}
174+
}
175+
176+
privatefunc handleHTTPEndReceived(context:ChannelHandlerContext){
177+
switchself.state {
178+
case.headReceived(let timeout):
179+
timeout.cancel()
180+
self.state =.completed
181+
self.proxyEstablishedPromise?.succeed(())
182+
183+
case.failed:
184+
// ran into an error before... ignore this one
185+
break
186+
case.initialized,.connectSent,.completed:
187+
preconditionFailure("Invalid state: \(self.state)")
188+
}
189+
}
190+
191+
privatefunc failWithError(_ error:Error, context:ChannelHandlerContext, closeConnection:Bool=true){
192+
self.state =.failed(error)
193+
self.proxyEstablishedPromise?.fail(error)
194+
context.fireErrorCaught(error)
195+
if closeConnection {
196+
context.close(mode:.all, promise:nil)
197+
}
198+
}
199+
}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import NIO
16+
import NIOSOCKS
17+
18+
finalclassSOCKSEventsHandler:ChannelInboundHandler,RemovableChannelHandler{
19+
typealiasInboundIn=NIOAny
20+
21+
enumState{
22+
// transitions to channelActive or failed
23+
case initialized
24+
// transitions to socksEstablished or failed
25+
case channelActive(Scheduled<Void>)
26+
// final success state
27+
case socksEstablished
28+
// final success state
29+
case failed(Error)
30+
}
31+
32+
privatevarsocksEstablishedPromise:EventLoopPromise<Void>?
33+
varsocksEstablishedFuture:EventLoopFuture<Void>?{
34+
returnself.socksEstablishedPromise?.futureResult
35+
}
36+
37+
privateletdeadline:NIODeadline
38+
privatevarstate:State=.initialized
39+
40+
init(deadline:NIODeadline){
41+
self.deadline = deadline
42+
}
43+
44+
func handlerAdded(context:ChannelHandlerContext){
45+
self.socksEstablishedPromise = context.eventLoop.makePromise(of:Void.self)
46+
47+
if context.channel.isActive {
48+
self.connectionStarted(context: context)
49+
}
50+
}
51+
52+
func handlerRemoved(context:ChannelHandlerContext){
53+
structNoResult:Error{}
54+
self.socksEstablishedPromise!.fail(NoResult())
55+
}
56+
57+
func channelActive(context:ChannelHandlerContext){
58+
self.connectionStarted(context: context)
59+
}
60+
61+
func userInboundEventTriggered(context:ChannelHandlerContext, event:Any){
62+
guard event is SOCKSProxyEstablishedEventelse{
63+
return context.fireUserInboundEventTriggered(event)
64+
}
65+
66+
switchself.state {
67+
case.initialized:
68+
preconditionFailure("How can we establish a SOCKS connection, if we are not connected?")
69+
case.socksEstablished:
70+
preconditionFailure("`SOCKSProxyEstablishedEvent` must only be fired once.")
71+
case.channelActive(let scheduled):
72+
self.state =.socksEstablished
73+
scheduled.cancel()
74+
self.socksEstablishedPromise?.succeed(())
75+
context.fireUserInboundEventTriggered(event)
76+
case.failed:
77+
// potentially a race with the timeout...
78+
break
79+
}
80+
}
81+
82+
func errorCaught(context:ChannelHandlerContext, error:Error){
83+
switchself.state {
84+
case.initialized:
85+
self.state =.failed(error)
86+
self.socksEstablishedPromise?.fail(error)
87+
case.channelActive(let scheduled):
88+
scheduled.cancel()
89+
self.state =.failed(error)
90+
self.socksEstablishedPromise?.fail(error)
91+
case.socksEstablished,.failed:
92+
break
93+
}
94+
context.fireErrorCaught(error)
95+
}
96+
97+
privatefunc connectionStarted(context:ChannelHandlerContext){
98+
guard case .initialized =self.state else{
99+
return
100+
}
101+
102+
letscheduled= context.eventLoop.scheduleTask(deadline:self.deadline){
103+
switchself.state {
104+
case.initialized,.channelActive:
105+
// close the connection, if the handshake timed out
106+
context.close(mode:.all, promise:nil)
107+
leterror=HTTPClientError.socksHandshakeTimeout
108+
self.state =.failed(error)
109+
self.socksEstablishedPromise?.fail(error)
110+
case.failed,.socksEstablished:
111+
break
112+
}
113+
}
114+
115+
self.state =.channelActive(scheduled)
116+
}
117+
}

0 commit comments

Comments
(0)