diff --git a/Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift b/Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift index c0c22bfee..a25c92e80 100644 --- a/Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift +++ b/Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift @@ -105,26 +105,43 @@ final class Transaction: struct BreakTheWriteLoopError: Swift.Error {} - // FIXME: Refactor this to not use `self.state.unsafe`. private func writeRequestBodyPart(_ part: ByteBuffer) async throws { - self.state.unsafe.lock() - switch self.state.unsafe.withValueAssumingLockIsAcquired({ state in state.writeNextRequestPart() }) { + let action = self.state.withLockedValue { state in + state.writeNextRequestPart() + } + + switch action { case .writeAndContinue(let executor): - self.state.unsafe.unlock() executor.writeRequestBodyPart(.byteBuffer(part), request: self, promise: nil) - - case .writeAndWait(let executor): + case .writeAndWait: + // Holding the lock here *should* be safe but because of a bug in the runtime + // it isn't, so drop the lock, create the continuation and try again. + // + // See https://github.com/swiftlang/swift/issues/85668 try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - self.state.unsafe.withValueAssumingLockIsAcquired({ state in - state.waitForRequestBodyDemand(continuation: continuation) - }) - self.state.unsafe.unlock() + let action = self.state.withLockedValue { state in + // Check that nothing has changed between dropping and re-acquiring the lock. + let action = state.writeNextRequestPart() + switch action { + case .writeAndContinue, .fail: + () + case .writeAndWait: + state.waitForRequestBodyDemand(continuation: continuation) + } + return action + } - executor.writeRequestBodyPart(.byteBuffer(part), request: self, promise: nil) + switch action { + case .writeAndContinue(let executor): + executor.writeRequestBodyPart(.byteBuffer(part), request: self, promise: nil) + continuation.resume() + case .writeAndWait(let executor): + executor.writeRequestBodyPart(.byteBuffer(part), request: self, promise: nil) + case .fail: + continuation.resume(throwing: BreakTheWriteLoopError()) + } } - case .fail: - self.state.unsafe.unlock() throw BreakTheWriteLoopError() } } diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift index 87dcc03b6..3cdf51869 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift @@ -166,10 +166,22 @@ extension HTTPConnectionPool { } } - mutating func fail() { + enum FailAction { + case removeConnection + case none + } + + mutating func fail() -> FailAction { switch self.state { - case .starting, .backingOff, .idle, .leased: + case .starting: + // If the connection fails while we are starting it, the fail call raced with + // `failedToConnect` (promises are succeeded or failed before channel handler + // callbacks). let's keep the state in `starting`, so that `failedToConnect` can + // create a backoff timer. + return .none + case .backingOff, .idle, .leased: self.state = .closed + return .removeConnection case .closed: preconditionFailure("Invalid state: \(self.state)") } @@ -559,23 +571,28 @@ extension HTTPConnectionPool { } let use: ConnectionUse - self.connections[index].fail() - let eventLoop = self.connections[index].eventLoop - let starting: Int - if index < self.overflowIndex { - use = .generalPurpose - starting = self.startingGeneralPurposeConnections - } else { - use = .eventLoop(eventLoop) - starting = self.startingEventLoopConnections(on: eventLoop) - } + switch self.connections[index].fail() { + case .removeConnection: + let eventLoop = self.connections[index].eventLoop + let starting: Int + if index < self.overflowIndex { + use = .generalPurpose + starting = self.startingGeneralPurposeConnections + } else { + use = .eventLoop(eventLoop) + starting = self.startingEventLoopConnections(on: eventLoop) + } - let context = FailedConnectionContext( - eventLoop: eventLoop, - use: use, - connectionsStartingForUseCase: starting - ) - return (index, context) + let context = FailedConnectionContext( + eventLoop: eventLoop, + use: use, + connectionsStartingForUseCase: starting + ) + return (index, context) + + case .none: + return nil + } } // MARK: Migration diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift index 34c8027e9..395064377 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift @@ -250,6 +250,11 @@ extension HTTPConnectionPool { self.failedConsecutiveConnectionAttempts += 1 self.lastConnectFailure = error + // We don't care how many waiting requests we have at this point, we will schedule a + // retry. More tasks, may appear until the backoff has completed. The final + // decision about the retry will be made in `connectionCreationBackoffDone(_:)` + let eventLoop = self.connections.backoffNextConnectionAttempt(connectionID) + switch self.lifecycleState { case .running: guard self.retryConnectionEstablishment else { @@ -265,10 +270,6 @@ extension HTTPConnectionPool { connection: .none ) } - // We don't care how many waiting requests we have at this point, we will schedule a - // retry. More tasks, may appear until the backoff has completed. The final - // decision about the retry will be made in `connectionCreationBackoffDone(_:)` - let eventLoop = self.connections.backoffNextConnectionAttempt(connectionID) let backoff = calculateBackoff(failedAttempt: self.failedConsecutiveConnectionAttempts) return .init( diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift index dbb6b2d30..2a0e0cc80 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift @@ -187,10 +187,22 @@ extension HTTPConnectionPool { } } - mutating func fail() { + enum FailAction { + case removeConnection + case none + } + + mutating func fail() -> FailAction { switch self.state { - case .starting, .active, .backingOff, .draining: + case .starting: + // If the connection fails while we are starting it, the fail call raced with + // `failedToConnect` (promises are succeeded or failed before channel handler + // callbacks). let's keep the state in `starting`, so that `failedToConnect` can + // create a backoff timer. + return .none + case .active, .backingOff, .draining: self.state = .closed + return .removeConnection case .closed: preconditionFailure("Invalid state: \(self.state)") } @@ -749,10 +761,16 @@ extension HTTPConnectionPool { // must ignore the event. return nil } - self.connections[index].fail() - let eventLoop = self.connections[index].eventLoop - let context = FailedConnectionContext(eventLoop: eventLoop) - return (index, context) + + switch self.connections[index].fail() { + case .none: + return nil + + case .removeConnection: + let eventLoop = self.connections[index].eventLoop + let context = FailedConnectionContext(eventLoop: eventLoop) + return (index, context) + } } mutating func shutdown() -> CleanupContext { diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift index 2372cab4b..67a07e6dd 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift @@ -226,20 +226,18 @@ extension HTTPConnectionPool { ) -> EstablishedAction { self.failedConsecutiveConnectionAttempts = 0 self.lastConnectFailure = nil - if self.connections.hasActiveConnection(for: connection.eventLoop) { - guard let (index, _) = self.connections.failConnection(connection.id) else { - preconditionFailure("we have established a new connection that we know nothing about?") - } - self.connections.removeConnection(at: index) + let doesConnectionExistsForEL = self.connections.hasActiveConnection(for: connection.eventLoop) + let (index, context) = self.connections.newHTTP2ConnectionEstablished( + connection, + maxConcurrentStreams: maxConcurrentStreams + ) + if doesConnectionExistsForEL { + let connection = self.connections.closeConnection(at: index) return .init( request: .none, connection: .closeConnection(connection, isShutdown: .no) ) } else { - let (index, context) = self.connections.newHTTP2ConnectionEstablished( - connection, - maxConcurrentStreams: maxConcurrentStreams - ) return self.nextActionForAvailableConnection(at: index, context: context) } } @@ -424,6 +422,8 @@ extension HTTPConnectionPool { self.failedConsecutiveConnectionAttempts += 1 self.lastConnectFailure = error + let eventLoop = self.connections.backoffNextConnectionAttempt(connectionID) + switch self.lifecycleState { case .running: guard self.retryConnectionEstablishment else { @@ -440,7 +440,6 @@ extension HTTPConnectionPool { ) } - let eventLoop = self.connections.backoffNextConnectionAttempt(connectionID) let backoff = calculateBackoff(failedAttempt: self.failedConsecutiveConnectionAttempts) return .init( request: .none, diff --git a/Tests/AsyncHTTPClientTests/AsyncTestHelpers.swift b/Tests/AsyncHTTPClientTests/AsyncTestHelpers.swift index 4a5c8d486..5e063be81 100644 --- a/Tests/AsyncHTTPClientTests/AsyncTestHelpers.swift +++ b/Tests/AsyncHTTPClientTests/AsyncTestHelpers.swift @@ -43,12 +43,11 @@ final class AsyncSequenceWriter: AsyncSequence, @unchecked Se case failed(Error, CheckedContinuation?) } - private var _state = State.buffering(.init(), nil) - private let lock = NIOLock() + private let state = NIOLockedValueBox(.buffering([], nil)) public var hasDemand: Bool { - self.lock.withLock { - switch self._state { + self.state.withLockedValue { state in + switch state { case .failed, .finished, .buffering: return false case .waiting: @@ -59,67 +58,132 @@ final class AsyncSequenceWriter: AsyncSequence, @unchecked Se /// Wait until a downstream consumer has issued more demand by calling `next`. public func demand() async { - self.lock.lock() + let shouldBuffer = self.state.withLockedValue { state in + switch state { + case .buffering(_, .none): + return true + case .waiting: + return false + case .buffering(_, .some), .failed(_, .some): + preconditionFailure("Already waiting for demand. Invalid state: \(state)") + case .finished, .failed: + preconditionFailure("Invalid state: \(state)") + } + } - switch self._state { - case .buffering(let buffer, .none): + if shouldBuffer { await withCheckedContinuation { (continuation: CheckedContinuation) in - self._state = .buffering(buffer, continuation) - self.lock.unlock() + let shouldResumeContinuation = self.state.withLockedValue { state in + switch state { + case .buffering(let buffer, .none): + state = .buffering(buffer, continuation) + return false + case .waiting: + return true + case .buffering(_, .some), .failed(_, .some): + preconditionFailure("Already waiting for demand. Invalid state: \(state)") + case .finished, .failed: + preconditionFailure("Invalid state: \(state)") + } + } + + if shouldResumeContinuation { + continuation.resume() + } } - - case .waiting: - self.lock.unlock() - return - - case .buffering(_, .some), .failed(_, .some): - let state = self._state - self.lock.unlock() - preconditionFailure("Already waiting for demand. Invalid state: \(state)") - - case .finished, .failed: - let state = self._state - self.lock.unlock() - preconditionFailure("Invalid state: \(state)") } } + private enum NextAction { + /// Resume the continuation if present, and return the result if present. + case resumeAndReturn(CheckedContinuation?, Result?) + /// Suspend the current task and wait for the next value. + case suspend + } + private func next() async throws -> Element? { - self.lock.lock() - switch self._state { - case .buffering(let buffer, let demandContinuation) where buffer.isEmpty: - return try await withCheckedThrowingContinuation { continuation in - self._state = .waiting(continuation) - self.lock.unlock() - demandContinuation?.resume(returning: ()) - } + let action: NextAction = self.state.withLockedValue { state in + switch state { + case .buffering(var buffer, let demandContinuation): + if buffer.isEmpty { + return .suspend + } else { + let first = buffer.removeFirst() + if first != nil { + state = .buffering(buffer, demandContinuation) + } else { + state = .finished + } + return .resumeAndReturn(nil, .success(first)) + } + + case .failed(let error, let demandContinuation): + state = .finished + return .resumeAndReturn(demandContinuation, .failure(error)) + + case .finished: + return .resumeAndReturn(nil, .success(nil)) - case .buffering(var buffer, let demandContinuation): - let first = buffer.removeFirst() - if first != nil { - self._state = .buffering(buffer, demandContinuation) - } else { - self._state = .finished + case .waiting: + preconditionFailure( + "Expected that there is always only one concurrent call to next. Invalid state: \(state)" + ) } - self.lock.unlock() - return first + } - case .failed(let error, let demandContinuation): - self._state = .finished - self.lock.unlock() + switch action { + case .resumeAndReturn(let demandContinuation, let result): demandContinuation?.resume() - throw error - - case .finished: - self.lock.unlock() - return nil - - case .waiting: - let state = self._state - self.lock.unlock() - preconditionFailure( - "Expected that there is always only one concurrent call to next. Invalid state: \(state)" - ) + return try result?.get() + + case .suspend: + // Holding the lock here *should* be safe but because of a bug in the runtime + // it isn't, so drop the lock, create the continuation and then try again. + // + // See https://github.com/swiftlang/swift/issues/85668 + return try await withCheckedThrowingContinuation { + (continuation: CheckedContinuation) in + let action: NextAction = self.state.withLockedValue { state in + switch state { + case .buffering(var buffer, let demandContinuation): + if buffer.isEmpty { + state = .waiting(continuation) + return .resumeAndReturn(demandContinuation, nil) + } else { + let first = buffer.removeFirst() + if first != nil { + state = .buffering(buffer, demandContinuation) + } else { + state = .finished + } + return .resumeAndReturn(nil, .success(first)) + } + + case .failed(let error, let demandContinuation): + state = .finished + return .resumeAndReturn(demandContinuation, .failure(error)) + + case .finished: + return .resumeAndReturn(nil, .success(nil)) + + case .waiting: + preconditionFailure( + "Expected that there is always only one concurrent call to next. Invalid state: \(state)" + ) + } + } + + switch action { + case .resumeAndReturn(let demandContinuation, let result): + demandContinuation?.resume() + // Resume the continuation rather than returning th result. + if let result { + continuation.resume(with: result) + } + case .suspend: + preconditionFailure() // Not returned from the code above. + } + } } } @@ -137,19 +201,19 @@ final class AsyncSequenceWriter: AsyncSequence, @unchecked Se } private func writeBufferOrEnd(_ element: Element?) { - let writeAction = self.lock.withLock { () -> WriteAction in - switch self._state { + let writeAction = self.state.withLockedValue { state -> WriteAction in + switch state { case .buffering(var buffer, let continuation): buffer.append(element) - self._state = .buffering(buffer, continuation) + state = .buffering(buffer, continuation) return .none case .waiting(let continuation): - self._state = .buffering(.init(), nil) + state = .buffering(.init(), nil) return .succeedContinuation(continuation, element) case .finished, .failed: - preconditionFailure("Invalid state: \(self._state)") + preconditionFailure("Invalid state: \(state)") } } @@ -170,17 +234,17 @@ final class AsyncSequenceWriter: AsyncSequence, @unchecked Se /// Drops all buffered writes and emits an error on the waiting `next`. If there is no call to `next` /// waiting, will emit the error on the next call to `next`. public func fail(_ error: Error) { - let errorAction = self.lock.withLock { () -> ErrorAction in - switch self._state { + let errorAction = self.state.withLockedValue { state -> ErrorAction in + switch state { case .buffering(_, let demandContinuation): - self._state = .failed(error, demandContinuation) + state = .failed(error, demandContinuation) return .none case .failed, .finished: return .none case .waiting(let continuation): - self._state = .finished + state = .finished return .failContinuation(continuation, error) } } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1ConnectionsTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1ConnectionsTest.swift index f225307ea..89f3bf7b5 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1ConnectionsTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1ConnectionsTest.swift @@ -384,6 +384,8 @@ class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase { XCTAssertEqual(connections.closeConnection(at: releaseIndex), lease) XCTAssertFalse(connections.isEmpty) + let backoffEL = connections.backoffNextConnectionAttempt(startingID) + XCTAssertIdentical(backoffEL, el2) guard let (failIndex, _) = connections.failConnection(startingID) else { return XCTFail("Expected that the connection is remembered") } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1StateTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1StateTests.swift index 8dd59baaf..9146f0593 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1StateTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1StateTests.swift @@ -1493,4 +1493,48 @@ class HTTPConnectionPool_HTTP1StateMachineTests: XCTestCase { // We won't bother doing it though, it's enough that it asked. } + + func testFailConnectionRacesAgainstConnectionCreationFailed() { + let elg = EmbeddedEventLoopGroup(loops: 4) + defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } + + var state = HTTPConnectionPool.StateMachine( + idGenerator: .init(), + maximumConcurrentHTTP1Connections: 2, + retryConnectionEstablishment: true, + preferHTTP1: true, + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 + ) + + let mockRequest = MockHTTPScheduableRequest(eventLoop: elg.next()) + let request = HTTPConnectionPool.Request(mockRequest) + + let executeAction = state.executeRequest(request) + XCTAssertEqual(.scheduleRequestTimeout(for: request, on: mockRequest.eventLoop), executeAction.request) + + // 1. connection attempt + guard case .createConnection(let connectionID, on: let connectionEL) = executeAction.connection else { + return XCTFail("Unexpected connection action: \(executeAction.connection)") + } + XCTAssert(connectionEL === mockRequest.eventLoop) // XCTAssertIdentical not available on Linux + + // 2. connection fails – first with closed callback + + XCTAssertEqual(state.http1ConnectionClosed(connectionID), .none) + + // 3. connection fails – with make connection callback + + let action = state.failedToCreateNewConnection( + IOError(errnoCode: -1, reason: "Test failure"), + connectionID: connectionID + ) + XCTAssertEqual(action.request, .none) + guard case .scheduleBackoffTimer(connectionID, _, on: let backoffTimerEL) = action.connection else { + XCTFail("Unexpected connection action: \(action.connection)") + return + } + XCTAssertIdentical(connectionEL, backoffTimerEL) + + } } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest.swift index dd56a9102..dbfe90ff9 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest.swift @@ -331,6 +331,8 @@ class HTTPConnectionPool_HTTP2ConnectionsTests: XCTestCase { XCTAssertEqual(connections.closeConnection(at: releaseIndex), leasedConn) XCTAssertFalse(connections.isEmpty) + let backoffEL = connections.backoffNextConnectionAttempt(startingID) + XCTAssertIdentical(el6, backoffEL) guard let (failIndex, _) = connections.failConnection(startingID) else { return XCTFail("Expected that the connection is remembered") } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift index d59dae796..8fead4f4d 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift @@ -1527,6 +1527,50 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { XCTAssertEqual(state.http2ConnectionClosed(connection.id), .none) } + + func testFailConnectionRacesAgainstConnectionCreationFailed() { + let elg = EmbeddedEventLoopGroup(loops: 4) + defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } + + var state = HTTPConnectionPool.StateMachine( + idGenerator: .init(), + maximumConcurrentHTTP1Connections: 2, + retryConnectionEstablishment: true, + preferHTTP1: false, + maximumConnectionUses: nil, + preWarmedHTTP1ConnectionCount: 0 + ) + + let mockRequest = MockHTTPScheduableRequest(eventLoop: elg.next()) + let request = HTTPConnectionPool.Request(mockRequest) + + let executeAction = state.executeRequest(request) + XCTAssertEqual(.scheduleRequestTimeout(for: request, on: mockRequest.eventLoop), executeAction.request) + + // 1. connection attempt + guard case .createConnection(let connectionID, on: let connectionEL) = executeAction.connection else { + return XCTFail("Unexpected connection action: \(executeAction.connection)") + } + XCTAssert(connectionEL === mockRequest.eventLoop) // XCTAssertIdentical not available on Linux + + // 2. connection fails – first with closed callback + + XCTAssertEqual(state.http2ConnectionClosed(connectionID), .none) + + // 3. connection fails – with make connection callback + + let action = state.failedToCreateNewConnection( + IOError(errnoCode: -1, reason: "Test failure"), + connectionID: connectionID + ) + XCTAssertEqual(action.request, .none) + guard case .scheduleBackoffTimer(connectionID, _, on: let backoffTimerEL) = action.connection else { + XCTFail("Unexpected connection action: \(action.connection)") + return + } + XCTAssertIdentical(connectionEL, backoffTimerEL) + } + } /// Should be used if you have a value of statically unknown type and want to compare its value to another `Equatable` value.