Skip to content

Commit 1f5b633

Browse files
authored
Add a RequestQueue for the ConnectionPool (#412)
1 parent 6af7c8c commit 1f5b633

File tree

8 files changed

+351
-162
lines changed

8 files changed

+351
-162
lines changed

‎Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool+Waiter.swift‎

Lines changed: 0 additions & 57 deletions
This file was deleted.

‎Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift‎

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,46 @@ enum HTTPConnectionPool{
123123
}
124124
}
125125

126+
extensionHTTPConnectionPool{
127+
/// This is a wrapper that we use inside the connection pool state machine to ensure that
128+
/// the actual request can not be accessed at any time. Further it exposes all that is needed within
129+
/// the state machine. A request ID and the `EventLoop` requirement.
130+
structRequest{
131+
structID:Hashable{
132+
letobjectIdentifier:ObjectIdentifier
133+
leteventLoopID:EventLoopID?
134+
135+
fileprivateinit(_ request:HTTPSchedulableRequest, eventLoopRequirement eventLoopID:EventLoopID?){
136+
self.objectIdentifier =ObjectIdentifier(request)
137+
self.eventLoopID = eventLoopID
138+
}
139+
}
140+
141+
fileprivateletreq:HTTPSchedulableRequest
142+
143+
init(_ request:HTTPSchedulableRequest){
144+
self.req = request
145+
}
146+
147+
varid:HTTPConnectionPool.Request.ID{
148+
HTTPConnectionPool.Request.ID(self.req, eventLoopRequirement:self.requiredEventLoop?.id)
149+
}
150+
151+
varrequiredEventLoop:EventLoop?{
152+
switchself.req.eventLoopPreference.preference {
153+
case.indifferent,.delegate:
154+
returnnil
155+
case.delegateAndChannel(on:let eventLoop),.testOnly_exact(channelOn:let eventLoop, delegateOn: _):
156+
return eventLoop
157+
}
158+
}
159+
160+
func __testOnly_wrapped_request()->HTTPSchedulableRequest{
161+
self.req
162+
}
163+
}
164+
}
165+
126166
structEventLoopID:Hashable{
127167
privatevarid:Identifier
128168

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import NIOCore
16+
17+
extensionHTTPConnectionPool{
18+
/// A struct to store all queued requests.
19+
structRequestQueue{
20+
privatevargeneralPurposeQueue:CircularBuffer<Request>
21+
privatevareventLoopQueues:[EventLoopID:CircularBuffer<Request>]
22+
23+
init(){
24+
self.generalPurposeQueue =CircularBuffer(initialCapacity:32)
25+
self.eventLoopQueues =[:]
26+
}
27+
28+
varcount:Int{
29+
self.generalPurposeQueue.count +self.eventLoopQueues.reduce(0){ $0 + $1.value.count }
30+
}
31+
32+
varisEmpty:Bool{
33+
self.count ==0
34+
}
35+
36+
func count(for eventLoop:EventLoop?)->Int{
37+
iflet eventLoop = eventLoop {
38+
returnself.withEventLoopQueueIfAvailable(for: eventLoop.id){ $0.count }??0
39+
}
40+
returnself.generalPurposeQueue.count
41+
}
42+
43+
func isEmpty(for eventLoop:EventLoop?)->Bool{
44+
iflet eventLoop = eventLoop {
45+
returnself.withEventLoopQueueIfAvailable(for: eventLoop.id){ $0.isEmpty }??true
46+
}
47+
returnself.generalPurposeQueue.isEmpty
48+
}
49+
50+
@discardableResult
51+
mutatingfunc push(_ request:Request)->Request.ID{
52+
iflet eventLoop = request.requiredEventLoop {
53+
self.withEventLoopQueue(for: eventLoop.id){ queue in
54+
queue.append(request)
55+
}
56+
}else{
57+
self.generalPurposeQueue.append(request)
58+
}
59+
return request.id
60+
}
61+
62+
mutatingfunc popFirst(for eventLoop:EventLoop?=nil)->Request?{
63+
iflet eventLoop = eventLoop {
64+
returnself.withEventLoopQueue(for: eventLoop.id){ queue in
65+
queue.popFirst()
66+
}
67+
}else{
68+
returnself.generalPurposeQueue.popFirst()
69+
}
70+
}
71+
72+
mutatingfunc remove(_ requestID:Request.ID)->Request?{
73+
iflet eventLoopID = requestID.eventLoopID {
74+
returnself.withEventLoopQueue(for: eventLoopID){ queue in
75+
guardlet index = queue.firstIndex(where:{ $0.id == requestID })else{
76+
returnnil
77+
}
78+
return queue.remove(at: index)
79+
}
80+
}else{
81+
iflet index =self.generalPurposeQueue.firstIndex(where:{ $0.id == requestID }){
82+
// TBD: This is slow. Do we maybe want something more sophisticated here?
83+
returnself.generalPurposeQueue.remove(at: index)
84+
}
85+
returnnil
86+
}
87+
}
88+
89+
mutatingfunc removeAll()->[Request]{
90+
varresult=[Request]()
91+
result =self.eventLoopQueues.flatMap{ $0.value }
92+
result.append(contentsOf:self.generalPurposeQueue)
93+
94+
self.eventLoopQueues.removeAll()
95+
self.generalPurposeQueue.removeAll()
96+
return result
97+
}
98+
99+
privatemutatingfunc withEventLoopQueue<Result>(
100+
for eventLoopID:EventLoopID,
101+
_ closure:(inoutCircularBuffer<Request>)->Result
102+
)->Result{
103+
ifself.eventLoopQueues[eventLoopID]==nil{
104+
self.eventLoopQueues[eventLoopID]=CircularBuffer(initialCapacity:32)
105+
}
106+
returnclosure(&self.eventLoopQueues[eventLoopID]!)
107+
}
108+
109+
privatefunc withEventLoopQueueIfAvailable<Result>(
110+
for eventLoopID:EventLoopID,
111+
_ closure:(CircularBuffer<Request>)->Result
112+
)->Result?{
113+
iflet queue =self.eventLoopQueues[eventLoopID]{
114+
returnclosure(queue)
115+
}
116+
returnnil
117+
}
118+
}
119+
}

Tests/AsyncHTTPClientTests/HTTPConnectionPool+WaiterTests+XCTest.swift renamed to Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests+XCTest.swift

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
//
1313
//===----------------------------------------------------------------------===//
1414
//
15-
// HTTPConnectionPool+WaiterTests+XCTest.swift
15+
// HTTPConnectionPool+RequestQueueTests+XCTest.swift
1616
//
1717
import XCTest
1818

@@ -22,11 +22,10 @@ import XCTest
2222
/// Do NOT edit this file directly as it will be regenerated automatically when needed.
2323
///
2424

25-
extensionHTTPConnectionPool_WaiterTests{
26-
staticvarallTests:[(String,(HTTPConnectionPool_WaiterTests)->()throws->Void)]{
25+
extensionHTTPConnectionPool_RequestQueueTests{
26+
staticvarallTests:[(String,(HTTPConnectionPool_RequestQueueTests)->()throws->Void)]{
2727
return[
28-
("testCanBeRunIfEventLoopIsSpecified", testCanBeRunIfEventLoopIsSpecified),
29-
("testCanBeRunIfNoEventLoopIsSpecified", testCanBeRunIfNoEventLoopIsSpecified),
28+
("testCountAndIsEmptyWorks", testCountAndIsEmptyWorks),
3029
]
3130
}
3231
}
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
@testableimport AsyncHTTPClient
16+
import Logging
17+
import NIOCore
18+
import NIOEmbedded
19+
import NIOHTTP1
20+
import XCTest
21+
22+
classHTTPConnectionPool_RequestQueueTests:XCTestCase{
23+
func testCountAndIsEmptyWorks(){
24+
varqueue=HTTPConnectionPool.RequestQueue()
25+
XCTAssertTrue(queue.isEmpty)
26+
XCTAssertEqual(queue.count,0)
27+
letreq1=MockScheduledRequest(eventLoopPreference:.indifferent)
28+
letreq1ID= queue.push(.init(req1))
29+
XCTAssertFalse(queue.isEmpty)
30+
XCTAssertFalse(queue.isEmpty(for:nil))
31+
XCTAssertEqual(queue.count,1)
32+
XCTAssertEqual(queue.count(for:nil),1)
33+
34+
letreq2=MockScheduledRequest(eventLoopPreference:.indifferent)
35+
letreq2ID= queue.push(.init(req2))
36+
XCTAssertEqual(queue.count,2)
37+
38+
XCTAssert(queue.popFirst()?.__testOnly_wrapped_request()=== req1)
39+
XCTAssertEqual(queue.count,1)
40+
XCTAssertFalse(queue.isEmpty)
41+
XCTAssert(queue.remove(req2ID)?.__testOnly_wrapped_request()=== req2)
42+
XCTAssertNil(queue.remove(req1ID))
43+
XCTAssertEqual(queue.count,0)
44+
XCTAssertTrue(queue.isEmpty)
45+
46+
leteventLoop=EmbeddedEventLoop()
47+
48+
XCTAssertTrue(queue.isEmpty(for: eventLoop))
49+
XCTAssertEqual(queue.count(for: eventLoop),0)
50+
letreq3=MockScheduledRequest(eventLoopPreference:.delegateAndChannel(on: eventLoop))
51+
letreq3ID= queue.push(.init(req3))
52+
XCTAssertFalse(queue.isEmpty(for: eventLoop))
53+
XCTAssertEqual(queue.count(for: eventLoop),1)
54+
XCTAssertFalse(queue.isEmpty)
55+
XCTAssertEqual(queue.count,1)
56+
XCTAssert(queue.popFirst(for: eventLoop)?.__testOnly_wrapped_request()=== req3)
57+
XCTAssertNil(queue.remove(req3ID))
58+
XCTAssertTrue(queue.isEmpty(for: eventLoop))
59+
XCTAssertEqual(queue.count(for: eventLoop),0)
60+
XCTAssertTrue(queue.isEmpty)
61+
XCTAssertEqual(queue.count,0)
62+
63+
letreq4=MockScheduledRequest(eventLoopPreference:.delegateAndChannel(on: eventLoop))
64+
letreq4ID= queue.push(.init(req4))
65+
XCTAssert(queue.remove(req4ID)?.__testOnly_wrapped_request()=== req4)
66+
67+
letreq5=MockScheduledRequest(eventLoopPreference:.indifferent)
68+
queue.push(.init(req5))
69+
letreq6=MockScheduledRequest(eventLoopPreference:.delegateAndChannel(on: eventLoop))
70+
queue.push(.init(req6))
71+
letall= queue.removeAll()
72+
lettestSet= all.map{ $0.__testOnly_wrapped_request()}
73+
XCTAssertEqual(testSet.count,2)
74+
XCTAssertTrue(testSet.contains(where:{ $0 === req5 }))
75+
XCTAssertTrue(testSet.contains(where:{ $0 === req6 }))
76+
XCTAssertFalse(testSet.contains(where:{ $0 === req4 }))
77+
XCTAssertTrue(queue.isEmpty(for: eventLoop))
78+
XCTAssertEqual(queue.count(for: eventLoop),0)
79+
XCTAssertTrue(queue.isEmpty)
80+
XCTAssertEqual(queue.count,0)
81+
}
82+
}
83+
84+
privateclassMockScheduledRequest:HTTPSchedulableRequest{
85+
init(eventLoopPreference:HTTPClient.EventLoopPreference){
86+
self.eventLoopPreference = eventLoopPreference
87+
}
88+
89+
varlogger:Logger{preconditionFailure("Unimplemented")}
90+
varconnectionDeadline:NIODeadline{preconditionFailure("Unimplemented")}
91+
leteventLoopPreference:HTTPClient.EventLoopPreference
92+
93+
func requestWasQueued(_:HTTPRequestScheduler){
94+
preconditionFailure("Unimplemented")
95+
}
96+
97+
func fail(_:Error){
98+
preconditionFailure("Unimplemented")
99+
}
100+
101+
// MARK: HTTPExecutableRequest
102+
103+
varrequestHead:HTTPRequestHead{preconditionFailure("Unimplemented")}
104+
varrequestFramingMetadata:RequestFramingMetadata{preconditionFailure("Unimplemented")}
105+
varidleReadTimeout:TimeAmount?{preconditionFailure("Unimplemented")}
106+
107+
func willExecuteRequest(_:HTTPRequestExecutor){
108+
preconditionFailure("Unimplemented")
109+
}
110+
111+
func requestHeadSent(){
112+
preconditionFailure("Unimplemented")
113+
}
114+
115+
func resumeRequestBodyStream(){
116+
preconditionFailure("Unimplemented")
117+
}
118+
119+
func pauseRequestBodyStream(){
120+
preconditionFailure("Unimplemented")
121+
}
122+
123+
func receiveResponseHead(_:HTTPResponseHead){
124+
preconditionFailure("Unimplemented")
125+
}
126+
127+
func receiveResponseBodyParts(_:CircularBuffer<ByteBuffer>){
128+
preconditionFailure("Unimplemented")
129+
}
130+
131+
func succeedRequest(_:CircularBuffer<ByteBuffer>?){
132+
preconditionFailure("Unimplemented")
133+
}
134+
}

0 commit comments

Comments
(0)