Skip to content

Commit c464bf9

Browse files
authored
Don't hold a lock over a continuation in test helpers (#872)
Motivation: The various 'withMumbleContinuation' APIs are supposed to be invoked synchronously with the caller. This assumption allows a lock to be acquired before the call and released from the body of the 'withMumbleContinuation' after e.g. storing the continuation. However this isn't the case and the job may be re-enqueued on the executor meaning that this is pattern is vulnerable to deadlocks. Modifications: - Rework the test helpers to avoid holding a lock when a continuation is created. - Switch to using NIOLockedValue box Result: Lower chance of deadlock
1 parent 3c45dbd commit c464bf9

File tree

1 file changed

+127
-63
lines changed

1 file changed

+127
-63
lines changed

‎Tests/AsyncHTTPClientTests/AsyncTestHelpers.swift‎

Lines changed: 127 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,11 @@ final class AsyncSequenceWriter<Element: Sendable>: AsyncSequence, @unchecked Se
4343
case failed(Error,CheckedContinuation<Void,Never>?)
4444
}
4545

46-
privatevar_state=State.buffering(.init(),nil)
47-
privateletlock=NIOLock()
46+
privateletstate=NIOLockedValueBox<State>(.buffering([],nil))
4847

4948
publicvarhasDemand:Bool{
50-
self.lock.withLock{
51-
switchself._state{
49+
self.state.withLockedValue{ state in
50+
switchstate{
5251
case.failed,.finished,.buffering:
5352
returnfalse
5453
case.waiting:
@@ -59,67 +58,132 @@ final class AsyncSequenceWriter<Element: Sendable>: AsyncSequence, @unchecked Se
5958

6059
/// Wait until a downstream consumer has issued more demand by calling `next`.
6160
publicfunc demand()async{
62-
self.lock.lock()
61+
letshouldBuffer=self.state.withLockedValue{ state in
62+
switch state {
63+
case.buffering(_,.none):
64+
returntrue
65+
case.waiting:
66+
returnfalse
67+
case.buffering(_,.some),.failed(_,.some):
68+
preconditionFailure("Already waiting for demand. Invalid state: \(state)")
69+
case.finished,.failed:
70+
preconditionFailure("Invalid state: \(state)")
71+
}
72+
}
6373

64-
switchself._state {
65-
case.buffering(let buffer,.none):
74+
if shouldBuffer {
6675
awaitwithCheckedContinuation{(continuation:CheckedContinuation<Void,Never>)in
67-
self._state =.buffering(buffer, continuation)
68-
self.lock.unlock()
76+
letshouldResumeContinuation=self.state.withLockedValue{ state in
77+
switch state {
78+
case.buffering(let buffer,.none):
79+
state =.buffering(buffer, continuation)
80+
returnfalse
81+
case.waiting:
82+
returntrue
83+
case.buffering(_,.some),.failed(_,.some):
84+
preconditionFailure("Already waiting for demand. Invalid state: \(state)")
85+
case.finished,.failed:
86+
preconditionFailure("Invalid state: \(state)")
87+
}
88+
}
89+
90+
if shouldResumeContinuation {
91+
continuation.resume()
92+
}
6993
}
70-
71-
case.waiting:
72-
self.lock.unlock()
73-
return
74-
75-
case.buffering(_,.some),.failed(_,.some):
76-
letstate=self._state
77-
self.lock.unlock()
78-
preconditionFailure("Already waiting for demand. Invalid state: \(state)")
79-
80-
case.finished,.failed:
81-
letstate=self._state
82-
self.lock.unlock()
83-
preconditionFailure("Invalid state: \(state)")
8494
}
8595
}
8696

97+
privateenumNextAction{
98+
/// Resume the continuation if present, and return the result if present.
99+
case resumeAndReturn(CheckedContinuation<Void,Never>?,Result<Element?,Error>?)
100+
/// Suspend the current task and wait for the next value.
101+
case suspend
102+
}
103+
87104
privatefunc next()asyncthrows->Element?{
88-
self.lock.lock()
89-
switchself._state {
90-
case.buffering(let buffer,let demandContinuation)where buffer.isEmpty:
91-
returntryawaitwithCheckedThrowingContinuation{ continuation in
92-
self._state =.waiting(continuation)
93-
self.lock.unlock()
94-
demandContinuation?.resume(returning:())
95-
}
105+
letaction:NextAction=self.state.withLockedValue{ state in
106+
switch state {
107+
case.buffering(var buffer,let demandContinuation):
108+
if buffer.isEmpty {
109+
return.suspend
110+
}else{
111+
letfirst= buffer.removeFirst()
112+
if first !=nil{
113+
state =.buffering(buffer, demandContinuation)
114+
}else{
115+
state =.finished
116+
}
117+
return.resumeAndReturn(nil,.success(first))
118+
}
119+
120+
case.failed(let error,let demandContinuation):
121+
state =.finished
122+
return.resumeAndReturn(demandContinuation,.failure(error))
123+
124+
case.finished:
125+
return.resumeAndReturn(nil,.success(nil))
96126

97-
case.buffering(var buffer,let demandContinuation):
98-
letfirst= buffer.removeFirst()
99-
if first !=nil{
100-
self._state =.buffering(buffer, demandContinuation)
101-
}else{
102-
self._state =.finished
127+
case.waiting:
128+
preconditionFailure(
129+
"Expected that there is always only one concurrent call to next. Invalid state: \(state)"
130+
)
103131
}
104-
self.lock.unlock()
105-
return first
132+
}
106133

107-
case.failed(let error,let demandContinuation):
108-
self._state =.finished
109-
self.lock.unlock()
134+
switch action {
135+
case.resumeAndReturn(let demandContinuation,let result):
110136
demandContinuation?.resume()
111-
throw error
112-
113-
case.finished:
114-
self.lock.unlock()
115-
returnnil
116-
117-
case.waiting:
118-
letstate=self._state
119-
self.lock.unlock()
120-
preconditionFailure(
121-
"Expected that there is always only one concurrent call to next. Invalid state: \(state)"
122-
)
137+
returntry result?.get()
138+
139+
case.suspend:
140+
// Holding the lock here *should* be safe but because of a bug in the runtime
141+
// it isn't, so drop the lock, create the continuation and then try again.
142+
//
143+
// See https://github.com/swiftlang/swift/issues/85668
144+
returntryawaitwithCheckedThrowingContinuation{
145+
(continuation:CheckedContinuation<Element?,anyError>)in
146+
letaction:NextAction=self.state.withLockedValue{ state in
147+
switch state {
148+
case.buffering(var buffer,let demandContinuation):
149+
if buffer.isEmpty {
150+
state =.waiting(continuation)
151+
return.resumeAndReturn(demandContinuation,nil)
152+
}else{
153+
letfirst= buffer.removeFirst()
154+
if first !=nil{
155+
state =.buffering(buffer, demandContinuation)
156+
}else{
157+
state =.finished
158+
}
159+
return.resumeAndReturn(nil,.success(first))
160+
}
161+
162+
case.failed(let error,let demandContinuation):
163+
state =.finished
164+
return.resumeAndReturn(demandContinuation,.failure(error))
165+
166+
case.finished:
167+
return.resumeAndReturn(nil,.success(nil))
168+
169+
case.waiting:
170+
preconditionFailure(
171+
"Expected that there is always only one concurrent call to next. Invalid state: \(state)"
172+
)
173+
}
174+
}
175+
176+
switch action {
177+
case.resumeAndReturn(let demandContinuation,let result):
178+
demandContinuation?.resume()
179+
// Resume the continuation rather than returning th result.
180+
iflet result {
181+
continuation.resume(with: result)
182+
}
183+
case.suspend:
184+
preconditionFailure() // Not returned from the code above.
185+
}
186+
}
123187
}
124188
}
125189

@@ -137,19 +201,19 @@ final class AsyncSequenceWriter<Element: Sendable>: AsyncSequence, @unchecked Se
137201
}
138202

139203
privatefunc writeBufferOrEnd(_ element:Element?){
140-
letwriteAction=self.lock.withLock{()->WriteActionin
141-
switchself._state{
204+
letwriteAction=self.state.withLockedValue{state->WriteActionin
205+
switchstate{
142206
case.buffering(var buffer,let continuation):
143207
buffer.append(element)
144-
self._state=.buffering(buffer, continuation)
208+
state=.buffering(buffer, continuation)
145209
return.none
146210

147211
case.waiting(let continuation):
148-
self._state=.buffering(.init(),nil)
212+
state=.buffering(.init(),nil)
149213
return.succeedContinuation(continuation, element)
150214

151215
case.finished,.failed:
152-
preconditionFailure("Invalid state: \(self._state)")
216+
preconditionFailure("Invalid state: \(state)")
153217
}
154218
}
155219

@@ -170,17 +234,17 @@ final class AsyncSequenceWriter<Element: Sendable>: AsyncSequence, @unchecked Se
170234
/// Drops all buffered writes and emits an error on the waiting `next`. If there is no call to `next`
171235
/// waiting, will emit the error on the next call to `next`.
172236
publicfunc fail(_ error:Error){
173-
leterrorAction=self.lock.withLock{()->ErrorActionin
174-
switchself._state{
237+
leterrorAction=self.state.withLockedValue{state->ErrorActionin
238+
switchstate{
175239
case.buffering(_,let demandContinuation):
176-
self._state=.failed(error, demandContinuation)
240+
state=.failed(error, demandContinuation)
177241
return.none
178242

179243
case.failed,.finished:
180244
return.none
181245

182246
case.waiting(let continuation):
183-
self._state=.finished
247+
state=.finished
184248
return.failContinuation(continuation, error)
185249
}
186250
}

0 commit comments

Comments
(0)