From 77809cc051dcd9e0ff096869df0aabf352ecf24b Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Tue, 31 Aug 2021 22:58:51 +0200 Subject: [PATCH 1/3] Add a RequestQueue --- .../HTTPConnectionPool+Waiter.swift | 57 --------- .../ConnectionPool/HTTPConnectionPool.swift | 40 ++++++ .../HTTPConnectionPool+RequestQueue.swift | 115 +++++++++++++++++ ...ectionPool+RequestQueueTests+XCTest.swift} | 9 +- ...HTTPConnectionPool+RequestQueueTests.swift | 121 ++++++++++++++++++ .../HTTPConnectionPool+StateTestUtils.swift | 53 ++++++++ .../HTTPConnectionPool+WaiterTests.swift | 99 -------------- Tests/LinuxMain.swift | 2 +- 8 files changed, 334 insertions(+), 162 deletions(-) delete mode 100644 Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool+Waiter.swift create mode 100644 Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift rename Tests/AsyncHTTPClientTests/{HTTPConnectionPool+WaiterTests+XCTest.swift => HTTPConnectionPool+RequestQueueTests+XCTest.swift} (66%) create mode 100644 Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests.swift create mode 100644 Tests/AsyncHTTPClientTests/HTTPConnectionPool+StateTestUtils.swift delete mode 100644 Tests/AsyncHTTPClientTests/HTTPConnectionPool+WaiterTests.swift diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool+Waiter.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool+Waiter.swift deleted file mode 100644 index d21d06d8d..000000000 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool+Waiter.swift +++ /dev/null @@ -1,57 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the AsyncHTTPClient open source project -// -// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import NIOCore - -extension HTTPConnectionPool { - struct RequestID: Hashable { - private let objectIdentifier: ObjectIdentifier - - init(_ request: HTTPSchedulableRequest) { - self.objectIdentifier = ObjectIdentifier(request) - } - } - - struct Waiter { - var requestID: RequestID { - RequestID(self.request) - } - - var request: HTTPSchedulableRequest - - private var eventLoopRequirement: EventLoop? { - switch self.request.eventLoopPreference.preference { - case .delegateAndChannel(on: let eventLoop), - .testOnly_exact(channelOn: let eventLoop, delegateOn: _): - return eventLoop - case .delegate(on: _), - .indifferent: - return nil - } - } - - init(request: HTTPSchedulableRequest) { - self.request = request - } - - func canBeRun(on option: EventLoop) -> Bool { - guard let requirement = self.eventLoopRequirement else { - // if no requirement exists we can run on any EventLoop - return true - } - - return requirement === option - } - } -} diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift index 5d15f6b16..aede73f50 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift @@ -123,6 +123,46 @@ enum HTTPConnectionPool { } } +extension HTTPConnectionPool { + /// This is a wrapper that we use inside the connection pool state machine to ensure that + /// the actual request can not be accessed at any time. Further it exposes all that is needed within + /// the state machine. A request ID and the `EventLoop` requirement. + struct Request { + struct ID: Hashable { + let objectIdentifier: ObjectIdentifier + let eventLoopID: EventLoopID? + + fileprivate init(_ request: HTTPSchedulableRequest, eventLoopRequirement eventLoopID: EventLoopID?) { + self.objectIdentifier = ObjectIdentifier(request) + self.eventLoopID = eventLoopID + } + } + + fileprivate let req: HTTPSchedulableRequest + + init(_ request: HTTPSchedulableRequest) { + self.req = request + } + + var id: HTTPConnectionPool.Request.ID { + HTTPConnectionPool.Request.ID(self.req, eventLoopRequirement: self.requiredEventLoop?.id) + } + + var requiredEventLoop: EventLoop? { + switch self.req.eventLoopPreference.preference { + case .indifferent, .delegate: + return nil + case .delegateAndChannel(on: let eventLoop), .testOnly_exact(channelOn: let eventLoop, delegateOn: _): + return eventLoop + } + } + + func __testOnly_internal_value() -> HTTPSchedulableRequest { + self.req + } + } +} + struct EventLoopID: Hashable { private var id: Identifier diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift new file mode 100644 index 000000000..a698aa8ea --- /dev/null +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift @@ -0,0 +1,115 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIOCore + +extension HTTPConnectionPool { + /// A struct to store all queued requests. + struct RequestQueue { + private var generalPurposeQueue: CircularBuffer + private var eventLoopQueues: [EventLoopID: CircularBuffer] + + init() { + self.generalPurposeQueue = CircularBuffer(initialCapacity: 32) + self.eventLoopQueues = [:] + } + + var count: Int { + self.generalPurposeQueue.count + } + + var isEmpty: Bool { + self.generalPurposeQueue.isEmpty + } + + mutating func count(for eventLoop: EventLoop?) -> Int { + if let eventLoop = eventLoop { + return self.withEventLoopQueue(for: eventLoop.id) { $0.count } + } + return self.generalPurposeQueue.count + } + + mutating func isEmpty(for eventLoop: EventLoop?) -> Bool { + if let eventLoop = eventLoop { + return self.withEventLoopQueue(for: eventLoop.id) { $0.isEmpty } + } + return self.generalPurposeQueue.isEmpty + } + + @discardableResult + mutating func push(_ request: Request) -> Request.ID { + if let eventLoop = request.requiredEventLoop { + self.withEventLoopQueue(for: eventLoop.id) { queue in + queue.append(request) + } + } else { + self.generalPurposeQueue.append(request) + } + return request.id + } + + mutating func popFirst(for eventLoop: EventLoop? = nil) -> Request? { + if let eventLoop = eventLoop { + return self.withEventLoopQueue(for: eventLoop.id) { queue in + queue.popFirst() + } + } + return self.generalPurposeQueue.popFirst() + } + + mutating func remove(_ requestID: Request.ID) -> Request? { + if let eventLoopID = requestID.eventLoopID { + return self.withEventLoopQueue(for: eventLoopID) { queue in + guard let index = queue.firstIndex(where: { $0.id == requestID }) else { + return nil + } + return queue.remove(at: index) + } + } else { + if let index = self.generalPurposeQueue.firstIndex(where: { $0.id == requestID }) { + // TBD: This is slow. Do we maybe want something more sophisticated here? + return self.generalPurposeQueue.remove(at: index) + } + return nil + } + } + + mutating func removeAll() -> [Request] { + var result = [Request]() + result = self.eventLoopQueues.reduce(into: result) { partialResult, element in + element.value.forEach { request in + partialResult.append(request) + } + } + + self.generalPurposeQueue.forEach { request in + result.append(request) + } + + self.eventLoopQueues.removeAll() + self.generalPurposeQueue.removeAll() + return result + } + + private mutating func withEventLoopQueue( + for eventLoopID: EventLoopID, + _ closure: (inout CircularBuffer) -> Result + ) -> Result { + if self.eventLoopQueues[eventLoopID] == nil { + self.eventLoopQueues[eventLoopID] = CircularBuffer(initialCapacity: 32) + } + return closure(&self.eventLoopQueues[eventLoopID]!) + } + } +} diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+WaiterTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests+XCTest.swift similarity index 66% rename from Tests/AsyncHTTPClientTests/HTTPConnectionPool+WaiterTests+XCTest.swift rename to Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests+XCTest.swift index 22ac2329d..2511ba267 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+WaiterTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests+XCTest.swift @@ -12,7 +12,7 @@ // //===----------------------------------------------------------------------===// // -// HTTPConnectionPool+WaiterTests+XCTest.swift +// HTTPConnectionPool+RequestQueueTests+XCTest.swift // import XCTest @@ -22,11 +22,10 @@ import XCTest /// Do NOT edit this file directly as it will be regenerated automatically when needed. /// -extension HTTPConnectionPool_WaiterTests { - static var allTests: [(String, (HTTPConnectionPool_WaiterTests) -> () throws -> Void)] { +extension HTTPConnectionPool_RequestQueueTests { + static var allTests: [(String, (HTTPConnectionPool_RequestQueueTests) -> () throws -> Void)] { return [ - ("testCanBeRunIfEventLoopIsSpecified", testCanBeRunIfEventLoopIsSpecified), - ("testCanBeRunIfNoEventLoopIsSpecified", testCanBeRunIfNoEventLoopIsSpecified), + ("testCountAndIsEmptyWorks", testCountAndIsEmptyWorks), ] } } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests.swift new file mode 100644 index 000000000..9509bdc94 --- /dev/null +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests.swift @@ -0,0 +1,121 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@testable import AsyncHTTPClient +import Logging +import NIOCore +import NIOEmbedded +import NIOHTTP1 +import XCTest + +class HTTPConnectionPool_RequestQueueTests: XCTestCase { + func testCountAndIsEmptyWorks() { + var queue = HTTPConnectionPool.RequestQueue() + XCTAssertTrue(queue.isEmpty) + XCTAssertEqual(queue.count, 0) + let req1 = MockScheduledRequest(eventLoopPreference: .indifferent) + let req1ID = queue.push(.init(req1)) + XCTAssertFalse(queue.isEmpty) + XCTAssertFalse(queue.isEmpty(for: nil)) + XCTAssertEqual(queue.count, 1) + XCTAssertEqual(queue.count(for: nil), 1) + + let req2 = MockScheduledRequest(eventLoopPreference: .indifferent) + let req2ID = queue.push(.init(req2)) + XCTAssertEqual(queue.count, 2) + + XCTAssert(queue.popFirst()?.__testOnly_internal_value() === req1) + XCTAssertEqual(queue.count, 1) + XCTAssert(queue.remove(req2ID)?.__testOnly_internal_value() === req2) + XCTAssertNil(queue.remove(req1ID)) + + let eventLoop = EmbeddedEventLoop() + + XCTAssertTrue(queue.isEmpty(for: eventLoop)) + XCTAssertEqual(queue.count(for: eventLoop), 0) + let req3 = MockScheduledRequest(eventLoopPreference: .delegateAndChannel(on: eventLoop)) + let req3ID = queue.push(.init(req3)) + XCTAssertFalse(queue.isEmpty(for: eventLoop)) + XCTAssertEqual(queue.count(for: eventLoop), 1) + XCTAssert(queue.popFirst(for: eventLoop)?.__testOnly_internal_value() === req3) + XCTAssertNil(queue.remove(req3ID)) + + let req4 = MockScheduledRequest(eventLoopPreference: .delegateAndChannel(on: eventLoop)) + let req4ID = queue.push(.init(req4)) + XCTAssert(queue.remove(req4ID)?.__testOnly_internal_value() === req4) + + let req5 = MockScheduledRequest(eventLoopPreference: .indifferent) + queue.push(.init(req5)) + let req6 = MockScheduledRequest(eventLoopPreference: .delegateAndChannel(on: eventLoop)) + queue.push(.init(req6)) + let all = queue.removeAll() + let testSet = all.map { $0.__testOnly_internal_value() } + XCTAssertEqual(testSet.count, 2) + XCTAssertTrue(testSet.contains(where: { $0 === req5 })) + XCTAssertTrue(testSet.contains(where: { $0 === req6 })) + XCTAssertFalse(testSet.contains(where: { $0 === req4 })) + } +} + +private class MockScheduledRequest: HTTPSchedulableRequest { + init(eventLoopPreference: HTTPClient.EventLoopPreference) { + self.eventLoopPreference = eventLoopPreference + } + + var logger: Logger { preconditionFailure("Unimplemented") } + var connectionDeadline: NIODeadline { preconditionFailure("Unimplemented") } + let eventLoopPreference: HTTPClient.EventLoopPreference + + func requestWasQueued(_: HTTPRequestScheduler) { + preconditionFailure("Unimplemented") + } + + func fail(_: Error) { + preconditionFailure("Unimplemented") + } + + // MARK: HTTPExecutableRequest + + var requestHead: HTTPRequestHead { preconditionFailure("Unimplemented") } + var requestFramingMetadata: RequestFramingMetadata { preconditionFailure("Unimplemented") } + var idleReadTimeout: TimeAmount? { preconditionFailure("Unimplemented") } + + func willExecuteRequest(_: HTTPRequestExecutor) { + preconditionFailure("Unimplemented") + } + + func requestHeadSent() { + preconditionFailure("Unimplemented") + } + + func resumeRequestBodyStream() { + preconditionFailure("Unimplemented") + } + + func pauseRequestBodyStream() { + preconditionFailure("Unimplemented") + } + + func receiveResponseHead(_: HTTPResponseHead) { + preconditionFailure("Unimplemented") + } + + func receiveResponseBodyParts(_: CircularBuffer) { + preconditionFailure("Unimplemented") + } + + func succeedRequest(_: CircularBuffer?) { + preconditionFailure("Unimplemented") + } +} diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+StateTestUtils.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+StateTestUtils.swift new file mode 100644 index 000000000..0b1a1b8a4 --- /dev/null +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+StateTestUtils.swift @@ -0,0 +1,53 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Dispatch +import NIOConcurrencyHelpers +import NIOCore +import NIOEmbedded + +/// An `EventLoopGroup` of `EmbeddedEventLoop`s. +final class EmbeddedEventLoopGroup: EventLoopGroup { + private let loops: [EmbeddedEventLoop] + private let index = NIOAtomic.makeAtomic(value: 0) + + internal init(loops: Int) { + self.loops = (0.. EventLoop { + let index: Int = self.index.add(1) + return self.loops[index % self.loops.count] + } + + internal func makeIterator() -> EventLoopIterator { + return EventLoopIterator(self.loops) + } + + internal func shutdownGracefully(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) { + var shutdownError: Error? + + for loop in self.loops { + loop.shutdownGracefully(queue: queue) { error in + if let error = error { + shutdownError = error + } + } + } + + queue.sync { + callback(shutdownError) + } + } +} diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+WaiterTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+WaiterTests.swift deleted file mode 100644 index 69f43dfb7..000000000 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+WaiterTests.swift +++ /dev/null @@ -1,99 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the AsyncHTTPClient open source project -// -// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -@testable import AsyncHTTPClient -import Logging -import NIOCore -import NIOHTTP1 -import NIOPosix -import XCTest - -class HTTPConnectionPool_WaiterTests: XCTestCase { - func testCanBeRunIfEventLoopIsSpecified() { - let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2) - - let theRightEL = eventLoopGroup.next() - let theFalseEL = eventLoopGroup.next() - - let mockRequest = MockScheduledRequest(eventLoopPreference: .init(.testOnly_exact(channelOn: theRightEL, delegateOn: theFalseEL))) - - let waiter = HTTPConnectionPool.Waiter(request: mockRequest) - - XCTAssertTrue(waiter.canBeRun(on: theRightEL)) - XCTAssertFalse(waiter.canBeRun(on: theFalseEL)) - } - - func testCanBeRunIfNoEventLoopIsSpecified() { - let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2) - - let mockRequest = MockScheduledRequest(eventLoopPreference: .indifferent) - let waiter = HTTPConnectionPool.Waiter(request: mockRequest) - - for el in eventLoopGroup.makeIterator() { - XCTAssertTrue(waiter.canBeRun(on: el)) - } - } -} - -private class MockScheduledRequest: HTTPSchedulableRequest { - init(eventLoopPreference: HTTPClient.EventLoopPreference) { - self.eventLoopPreference = eventLoopPreference - } - - var logger: Logger { preconditionFailure("Unimplemented") } - var connectionDeadline: NIODeadline { preconditionFailure("Unimplemented") } - let eventLoopPreference: HTTPClient.EventLoopPreference - - func requestWasQueued(_: HTTPRequestScheduler) { - preconditionFailure("Unimplemented") - } - - func fail(_: Error) { - preconditionFailure("Unimplemented") - } - - // MARK: HTTPExecutableRequest - - var requestHead: HTTPRequestHead { preconditionFailure("Unimplemented") } - var requestFramingMetadata: RequestFramingMetadata { preconditionFailure("Unimplemented") } - var idleReadTimeout: TimeAmount? { preconditionFailure("Unimplemented") } - - func willExecuteRequest(_: HTTPRequestExecutor) { - preconditionFailure("Unimplemented") - } - - func requestHeadSent() { - preconditionFailure("Unimplemented") - } - - func resumeRequestBodyStream() { - preconditionFailure("Unimplemented") - } - - func pauseRequestBodyStream() { - preconditionFailure("Unimplemented") - } - - func receiveResponseHead(_: HTTPResponseHead) { - preconditionFailure("Unimplemented") - } - - func receiveResponseBodyParts(_: CircularBuffer) { - preconditionFailure("Unimplemented") - } - - func succeedRequest(_: CircularBuffer?) { - preconditionFailure("Unimplemented") - } -} diff --git a/Tests/LinuxMain.swift b/Tests/LinuxMain.swift index b31e0d29d..8c189d15a 100644 --- a/Tests/LinuxMain.swift +++ b/Tests/LinuxMain.swift @@ -41,7 +41,7 @@ import XCTest testCase(HTTPClientSOCKSTests.allTests), testCase(HTTPClientTests.allTests), testCase(HTTPConnectionPool_FactoryTests.allTests), - testCase(HTTPConnectionPool_WaiterTests.allTests), + testCase(HTTPConnectionPool_RequestQueueTests.allTests), testCase(HTTPRequestStateMachineTests.allTests), testCase(LRUCacheTests.allTests), testCase(RequestBagTests.allTests), From cd39b0f0cc6742eb583305a0b51ca551449550d7 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Wed, 1 Sep 2021 11:08:18 +0200 Subject: [PATCH 2/3] Code review --- .../ConnectionPool/HTTPConnectionPool.swift | 2 +- .../HTTPConnectionPool+RequestQueue.swift | 60 ++++++++++++------- ...HTTPConnectionPool+RequestQueueTests.swift | 23 +++++-- 3 files changed, 57 insertions(+), 28 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift index aede73f50..c9206765c 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift @@ -157,7 +157,7 @@ extension HTTPConnectionPool { } } - func __testOnly_internal_value() -> HTTPSchedulableRequest { + func __testOnly_wrapped_request() -> HTTPSchedulableRequest { self.req } } diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift index a698aa8ea..8c0af1b5e 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift @@ -23,32 +23,32 @@ extension HTTPConnectionPool { init() { self.generalPurposeQueue = CircularBuffer(initialCapacity: 32) self.eventLoopQueues = [:] + self.count = 0 } - var count: Int { - self.generalPurposeQueue.count - } + private(set) var count: Int var isEmpty: Bool { - self.generalPurposeQueue.isEmpty + self.count == 0 } - mutating func count(for eventLoop: EventLoop?) -> Int { + func count(for eventLoop: EventLoop?) -> Int { if let eventLoop = eventLoop { - return self.withEventLoopQueue(for: eventLoop.id) { $0.count } + return self.withEventLoopQueueIfAvailable(for: eventLoop.id) { $0.count } ?? 0 } return self.generalPurposeQueue.count } - mutating func isEmpty(for eventLoop: EventLoop?) -> Bool { + func isEmpty(for eventLoop: EventLoop?) -> Bool { if let eventLoop = eventLoop { - return self.withEventLoopQueue(for: eventLoop.id) { $0.isEmpty } + return self.withEventLoopQueueIfAvailable(for: eventLoop.id) { $0.isEmpty } ?? true } return self.generalPurposeQueue.isEmpty } @discardableResult mutating func push(_ request: Request) -> Request.ID { + self.count += 1 if let eventLoop = request.requiredEventLoop { self.withEventLoopQueue(for: eventLoop.id) { queue in queue.append(request) @@ -60,17 +60,24 @@ extension HTTPConnectionPool { } mutating func popFirst(for eventLoop: EventLoop? = nil) -> Request? { + let request: Request? if let eventLoop = eventLoop { - return self.withEventLoopQueue(for: eventLoop.id) { queue in + request = self.withEventLoopQueue(for: eventLoop.id) { queue in queue.popFirst() } + } else { + request = self.generalPurposeQueue.popFirst() + } + if request != nil { + self.count -= 1 } - return self.generalPurposeQueue.popFirst() + return request } mutating func remove(_ requestID: Request.ID) -> Request? { + let request: Request? if let eventLoopID = requestID.eventLoopID { - return self.withEventLoopQueue(for: eventLoopID) { queue in + request = self.withEventLoopQueue(for: eventLoopID) { queue in guard let index = queue.firstIndex(where: { $0.id == requestID }) else { return nil } @@ -79,26 +86,25 @@ extension HTTPConnectionPool { } else { if let index = self.generalPurposeQueue.firstIndex(where: { $0.id == requestID }) { // TBD: This is slow. Do we maybe want something more sophisticated here? - return self.generalPurposeQueue.remove(at: index) + request = self.generalPurposeQueue.remove(at: index) + } else { + request = nil } - return nil } + if request != nil { + self.count -= 1 + } + return request } mutating func removeAll() -> [Request] { var result = [Request]() - result = self.eventLoopQueues.reduce(into: result) { partialResult, element in - element.value.forEach { request in - partialResult.append(request) - } - } - - self.generalPurposeQueue.forEach { request in - result.append(request) - } + result = self.eventLoopQueues.flatMap { $0.value } + result.append(contentsOf: self.generalPurposeQueue) self.eventLoopQueues.removeAll() self.generalPurposeQueue.removeAll() + self.count = 0 return result } @@ -111,5 +117,15 @@ extension HTTPConnectionPool { } return closure(&self.eventLoopQueues[eventLoopID]!) } + + private func withEventLoopQueueIfAvailable( + for eventLoopID: EventLoopID, + _ closure: (CircularBuffer) -> Result + ) -> Result? { + if self.eventLoopQueues[eventLoopID] != nil { + return closure(self.eventLoopQueues[eventLoopID]!) + } + return nil + } } } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests.swift index 9509bdc94..500bcf296 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests.swift @@ -35,10 +35,13 @@ class HTTPConnectionPool_RequestQueueTests: XCTestCase { let req2ID = queue.push(.init(req2)) XCTAssertEqual(queue.count, 2) - XCTAssert(queue.popFirst()?.__testOnly_internal_value() === req1) + XCTAssert(queue.popFirst()?.__testOnly_wrapped_request() === req1) XCTAssertEqual(queue.count, 1) - XCTAssert(queue.remove(req2ID)?.__testOnly_internal_value() === req2) + XCTAssertFalse(queue.isEmpty) + XCTAssert(queue.remove(req2ID)?.__testOnly_wrapped_request() === req2) XCTAssertNil(queue.remove(req1ID)) + XCTAssertEqual(queue.count, 0) + XCTAssertTrue(queue.isEmpty) let eventLoop = EmbeddedEventLoop() @@ -48,23 +51,33 @@ class HTTPConnectionPool_RequestQueueTests: XCTestCase { let req3ID = queue.push(.init(req3)) XCTAssertFalse(queue.isEmpty(for: eventLoop)) XCTAssertEqual(queue.count(for: eventLoop), 1) - XCTAssert(queue.popFirst(for: eventLoop)?.__testOnly_internal_value() === req3) + XCTAssertFalse(queue.isEmpty) + XCTAssertEqual(queue.count, 1) + XCTAssert(queue.popFirst(for: eventLoop)?.__testOnly_wrapped_request() === req3) XCTAssertNil(queue.remove(req3ID)) + XCTAssertTrue(queue.isEmpty(for: eventLoop)) + XCTAssertEqual(queue.count(for: eventLoop), 0) + XCTAssertTrue(queue.isEmpty) + XCTAssertEqual(queue.count, 0) let req4 = MockScheduledRequest(eventLoopPreference: .delegateAndChannel(on: eventLoop)) let req4ID = queue.push(.init(req4)) - XCTAssert(queue.remove(req4ID)?.__testOnly_internal_value() === req4) + XCTAssert(queue.remove(req4ID)?.__testOnly_wrapped_request() === req4) let req5 = MockScheduledRequest(eventLoopPreference: .indifferent) queue.push(.init(req5)) let req6 = MockScheduledRequest(eventLoopPreference: .delegateAndChannel(on: eventLoop)) queue.push(.init(req6)) let all = queue.removeAll() - let testSet = all.map { $0.__testOnly_internal_value() } + let testSet = all.map { $0.__testOnly_wrapped_request() } XCTAssertEqual(testSet.count, 2) XCTAssertTrue(testSet.contains(where: { $0 === req5 })) XCTAssertTrue(testSet.contains(where: { $0 === req6 })) XCTAssertFalse(testSet.contains(where: { $0 === req4 })) + XCTAssertTrue(queue.isEmpty(for: eventLoop)) + XCTAssertEqual(queue.count(for: eventLoop), 0) + XCTAssertTrue(queue.isEmpty) + XCTAssertEqual(queue.count, 0) } } From 62b52affb588ac7a6d1ba6982f8c8aca49306370 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Wed, 1 Sep 2021 13:05:21 +0200 Subject: [PATCH 3/3] Core review 2 --- .../HTTPConnectionPool+RequestQueue.swift | 32 ++++++------------- 1 file changed, 10 insertions(+), 22 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift index 8c0af1b5e..ca7af66fb 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift @@ -23,10 +23,11 @@ extension HTTPConnectionPool { init() { self.generalPurposeQueue = CircularBuffer(initialCapacity: 32) self.eventLoopQueues = [:] - self.count = 0 } - private(set) var count: Int + var count: Int { + self.generalPurposeQueue.count + self.eventLoopQueues.reduce(0) { $0 + $1.value.count } + } var isEmpty: Bool { self.count == 0 @@ -48,7 +49,6 @@ extension HTTPConnectionPool { @discardableResult mutating func push(_ request: Request) -> Request.ID { - self.count += 1 if let eventLoop = request.requiredEventLoop { self.withEventLoopQueue(for: eventLoop.id) { queue in queue.append(request) @@ -60,24 +60,18 @@ extension HTTPConnectionPool { } mutating func popFirst(for eventLoop: EventLoop? = nil) -> Request? { - let request: Request? if let eventLoop = eventLoop { - request = self.withEventLoopQueue(for: eventLoop.id) { queue in + return self.withEventLoopQueue(for: eventLoop.id) { queue in queue.popFirst() } } else { - request = self.generalPurposeQueue.popFirst() - } - if request != nil { - self.count -= 1 + return self.generalPurposeQueue.popFirst() } - return request } mutating func remove(_ requestID: Request.ID) -> Request? { - let request: Request? if let eventLoopID = requestID.eventLoopID { - request = self.withEventLoopQueue(for: eventLoopID) { queue in + return self.withEventLoopQueue(for: eventLoopID) { queue in guard let index = queue.firstIndex(where: { $0.id == requestID }) else { return nil } @@ -86,15 +80,10 @@ extension HTTPConnectionPool { } else { if let index = self.generalPurposeQueue.firstIndex(where: { $0.id == requestID }) { // TBD: This is slow. Do we maybe want something more sophisticated here? - request = self.generalPurposeQueue.remove(at: index) - } else { - request = nil + return self.generalPurposeQueue.remove(at: index) } + return nil } - if request != nil { - self.count -= 1 - } - return request } mutating func removeAll() -> [Request] { @@ -104,7 +93,6 @@ extension HTTPConnectionPool { self.eventLoopQueues.removeAll() self.generalPurposeQueue.removeAll() - self.count = 0 return result } @@ -122,8 +110,8 @@ extension HTTPConnectionPool { for eventLoopID: EventLoopID, _ closure: (CircularBuffer) -> Result ) -> Result? { - if self.eventLoopQueues[eventLoopID] != nil { - return closure(self.eventLoopQueues[eventLoopID]!) + if let queue = self.eventLoopQueues[eventLoopID] { + return closure(queue) } return nil }