Skip to content

Commit b019e08

Browse files
authored
Upload streaming (#37)
* Provide a way to stream Request.body Motivation: Users may want to optimize memory usage when executing request with big bodies, same as streaming download Modifications: Added ChunkProvider typealias in HTTPHandler.swift Added new Body enum case - stream in HTTPHandler.swift Extracted body processing in HTTPHandler.swift to a separate method Added .stream enum processing in HTTPHandler.swift Added upload streaming test to SwiftNIOHTTPTests.swift Result: HTTPClient library now provides methods to stream Request body
1 parent 100b04b commit b019e08

File tree

8 files changed

+357
-92
lines changed

8 files changed

+357
-92
lines changed

‎README.md‎

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -116,26 +116,38 @@ class CountingDelegate: HTTPResponseDelegate{
116116

117117
var count =0
118118

119-
funcdidTransmitRequestBody(){
120-
// this is executed when request is sent, called once
119+
funcdidSendRequestHead(task: HTTPClient.Task<Response>, _head: HTTPRequestHead){
120+
// this is executed right after request head was sent, called once
121121
}
122122

123-
funcdidReceiveHead(_head: HTTPResponseHead){
123+
funcdidSendRequestPart(task: HTTPClient.Task<Response>, _part: IOData){
124+
// this is executed when request body part is sent, could be called zero or more times
125+
}
126+
127+
funcdidSendRequest(task: HTTPClient.Task<Response>){
128+
// this is executed when request is fully sent, called once
129+
}
130+
131+
funcdidReceiveHead(task: HTTPClient.Task<Response>, _head: HTTPResponseHead) -> EventLoopFuture<Void>{
124132
// this is executed when we receive HTTP Reponse head part of the request (it contains response code and headers), called once
133+
// in case backpressure is needed, all reads will be paused until returned future is resolved
134+
return task.eventLoop.makeSucceededFuture(())
125135
}
126136

127-
funcdidReceivePart(_buffer: ByteBuffer){
137+
funcdidReceivePart(task: HTTPClient.Task<Response>, _buffer: ByteBuffer)-> EventLoopFuture<Void>{
128138
// this is executed when we receive parts of the response body, could be called zero or more times
129139
count += buffer.readableBytes
140+
// in case backpressure is needed, all reads will be paused until returned future is resolved
141+
return task.eventLoop.makeSucceededFuture(())
130142
}
131143

132-
funcdidFinishRequest() throws->Int{
144+
funcdidFinishRequest(task: HTTPClient.Task<Response>) throws->Int{
133145
// this is called when the request is fully read, called once
134146
// this is where you return a result or throw any errors you require to propagate to the client
135147
return count
136148
}
137149

138-
funcdidReceiveError(_error: Error){
150+
funcdidReceiveError(task: HTTPClient.Task<Response>, _error: Error){
139151
// this is called when we receive any network-related error, called once
140152
}
141153
}

‎Sources/NIOHTTPClient/HTTPHandler.swift‎

Lines changed: 127 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -18,25 +18,48 @@ import NIOConcurrencyHelpers
1818
import NIOHTTP1
1919
import NIOSSL
2020

21-
publicextensionHTTPClient{
22-
enumBody:Equatable{
23-
case byteBuffer(ByteBuffer)
24-
case data(Data)
25-
case string(String)
26-
27-
varlength:Int{
28-
switchself{
29-
case.byteBuffer(let buffer):
30-
return buffer.readableBytes
31-
case.data(let data):
32-
return data.count
33-
case.string(let string):
34-
return string.utf8.count
21+
extensionHTTPClient{
22+
23+
publicstructBody{
24+
publicstructStreamWriter{
25+
letclosure:(IOData)->EventLoopFuture<Void>
26+
27+
publicfunc write(_ data:IOData)->EventLoopFuture<Void>{
28+
returnself.closure(data)
29+
}
30+
}
31+
32+
publicvarlength:Int?
33+
publicvarstream:(StreamWriter)->EventLoopFuture<Void>
34+
35+
publicstaticfunc byteBuffer(_ buffer:ByteBuffer)->Body{
36+
returnBody(length: buffer.readableBytes){ writer in
37+
writer.write(.byteBuffer(buffer))
38+
}
39+
}
40+
41+
publicstaticfunc stream(length:Int?=nil, _ stream:@escaping(StreamWriter)->EventLoopFuture<Void>)->Body{
42+
returnBody(length: length, stream: stream)
43+
}
44+
45+
publicstaticfunc data(_ data:Data)->Body{
46+
returnBody(length: data.count){ writer in
47+
varbuffer=ByteBufferAllocator().buffer(capacity: data.count)
48+
buffer.writeBytes(data)
49+
return writer.write(.byteBuffer(buffer))
50+
}
51+
}
52+
53+
publicstaticfunc string(_ string:String)->Body{
54+
returnBody(length: string.utf8.count){ writer in
55+
varbuffer=ByteBufferAllocator().buffer(capacity: string.utf8.count)
56+
buffer.writeString(string)
57+
return writer.write(.byteBuffer(buffer))
3558
}
3659
}
3760
}
3861

39-
structRequest:Equatable{
62+
publicstructRequest{
4063
publicvarversion:HTTPVersion
4164
publicvarmethod:HTTPMethod
4265
publicvarurl:URL
@@ -53,7 +76,7 @@ public extension HTTPClient{
5376
tryself.init(url: url, version: version, method: method, headers: headers, body: body)
5477
}
5578

56-
publicinit(url:URL, version:HTTPVersion, method:HTTPMethod=.GET, headers:HTTPHeaders=HTTPHeaders(), body:Body?=nil)throws{
79+
publicinit(url:URL, version:HTTPVersion=HTTPVersion(major:1, minor:1), method:HTTPMethod=.GET, headers:HTTPHeaders=HTTPHeaders(), body:Body?=nil)throws{
5780
guardlet scheme = url.scheme else{
5881
throwHTTPClientError.emptyScheme
5982
}
@@ -88,7 +111,7 @@ public extension HTTPClient{
88111
}
89112
}
90113

91-
structResponse:Equatable{
114+
publicstructResponse{
92115
publicvarhost:String
93116
publicvarstatus:HTTPResponseStatus
94117
publicvarheaders:HTTPHeaders
@@ -114,9 +137,7 @@ internal class ResponseAccumulator: HTTPClientResponseDelegate{
114137
self.request = request
115138
}
116139

117-
func didTransmitRequestBody(task:HTTPClient.Task<Response>){}
118-
119-
func didReceiveHead(task:HTTPClient.Task<Response>, _ head:HTTPResponseHead){
140+
func didReceiveHead(task:HTTPClient.Task<Response>, _ head:HTTPResponseHead)->EventLoopFuture<Void>{
120141
switchself.state {
121142
case.idle:
122143
self.state =.head(head)
@@ -129,9 +150,10 @@ internal class ResponseAccumulator: HTTPClientResponseDelegate{
129150
case.error:
130151
break
131152
}
153+
return task.eventLoop.makeSucceededFuture(())
132154
}
133155

134-
func didReceivePart(task:HTTPClient.Task<Response>, _ part:ByteBuffer){
156+
func didReceivePart(task:HTTPClient.Task<Response>, _ part:ByteBuffer)->EventLoopFuture<Void>{
135157
switchself.state {
136158
case.idle:
137159
preconditionFailure("no head received before body")
@@ -146,6 +168,7 @@ internal class ResponseAccumulator: HTTPClientResponseDelegate{
146168
case.error:
147169
break
148170
}
171+
return task.eventLoop.makeSucceededFuture(())
149172
}
150173

151174
func didReceiveError(task:HTTPClient.Task<Response>, _ error:Error){
@@ -174,25 +197,33 @@ internal class ResponseAccumulator: HTTPClientResponseDelegate{
174197
publicprotocolHTTPClientResponseDelegate:AnyObject{
175198
associatedtypeResponse
176199

177-
func didTransmitRequestBody(task:HTTPClient.Task<Response>)
200+
func didSendRequestHead(task:HTTPClient.Task<Response>, _ head:HTTPRequestHead)
201+
202+
func didSendRequestPart(task:HTTPClient.Task<Response>, _ part:IOData)
203+
204+
func didSendRequest(task:HTTPClient.Task<Response>)
178205

179-
func didReceiveHead(task:HTTPClient.Task<Response>, _ head:HTTPResponseHead)
206+
func didReceiveHead(task:HTTPClient.Task<Response>, _ head:HTTPResponseHead)->EventLoopFuture<Void>
180207

181-
func didReceivePart(task:HTTPClient.Task<Response>, _ buffer:ByteBuffer)
208+
func didReceivePart(task:HTTPClient.Task<Response>, _ buffer:ByteBuffer)->EventLoopFuture<Void>
182209

183210
func didReceiveError(task:HTTPClient.Task<Response>, _ error:Error)
184211

185212
func didFinishRequest(task:HTTPClient.Task<Response>)throws->Response
186213
}
187214

188215
extensionHTTPClientResponseDelegate{
189-
funcdidTransmitRequestBody(task:HTTPClient.Task<Response>){}
216+
publicfuncdidSendRequestHead(task:HTTPClient.Task<Response>, _ head:HTTPRequestHead){}
190217

191-
funcdidReceiveHead(task:HTTPClient.Task<Response>, _:HTTPResponseHead){}
218+
publicfuncdidSendRequestPart(task:HTTPClient.Task<Response>, _ part:IOData){}
192219

193-
funcdidReceivePart(task:HTTPClient.Task<Response>, _:ByteBuffer){}
220+
publicfuncdidSendRequest(task:HTTPClient.Task<Response>){}
194221

195-
func didReceiveError(task:HTTPClient.Task<Response>, _:Error){}
222+
publicfunc didReceiveHead(task:HTTPClient.Task<Response>, _:HTTPResponseHead)->EventLoopFuture<Void>{return task.eventLoop.makeSucceededFuture(())}
223+
224+
publicfunc didReceivePart(task:HTTPClient.Task<Response>, _:ByteBuffer)->EventLoopFuture<Void>{return task.eventLoop.makeSucceededFuture(())}
225+
226+
publicfunc didReceiveError(task:HTTPClient.Task<Response>, _:Error){}
196227
}
197228

198229
internalextensionURL{
@@ -207,13 +238,15 @@ internal extension URL{
207238

208239
publicextensionHTTPClient{
209240
finalclassTask<Response>{
241+
publicleteventLoop:EventLoop
210242
letfuture:EventLoopFuture<Response>
211243

212244
privatevarchannel:Channel?
213245
privatevarcancelled:Bool
214246
privateletlock:Lock
215247

216-
init(future:EventLoopFuture<Response>){
248+
init(eventLoop:EventLoop, future:EventLoopFuture<Response>){
249+
self.eventLoop = eventLoop
217250
self.future = future
218251
self.cancelled =false
219252
self.lock =Lock()
@@ -267,6 +300,8 @@ internal class TaskHandler<T: HTTPClientResponseDelegate>: ChannelInboundHandler
267300
letredirectHandler:RedirectHandler<T.Response>?
268301

269302
varstate:State=.idle
303+
varpendingRead=false
304+
varmayRead=true
270305

271306
init(task:HTTPClient.Task<T.Response>, delegate:T, promise:EventLoopPromise<T.Response>, redirectHandler:RedirectHandler<T.Response>?){
272307
self.task = task
@@ -298,35 +333,52 @@ internal class TaskHandler<T: HTTPClientResponseDelegate>: ChannelInboundHandler
298333

299334
head.headers = headers
300335

301-
context.write(wrapOutboundOut(.head(head)), promise:nil)
336+
context.write(wrapOutboundOut(.head(head))).whenSuccess{
337+
self.delegate.didSendRequestHead(task:self.task, head)
338+
}
302339

303-
iflet body = request.body {
304-
letpart:HTTPClientRequestPart
305-
switch body {
306-
case.byteBuffer(let buffer):
307-
part =HTTPClientRequestPart.body(.byteBuffer(buffer))
308-
case.data(let data):
309-
varbuffer= context.channel.allocator.buffer(capacity: data.count)
310-
buffer.writeBytes(data)
311-
part =HTTPClientRequestPart.body(.byteBuffer(buffer))
312-
case.string(let string):
313-
varbuffer= context.channel.allocator.buffer(capacity: string.utf8.count)
314-
buffer.writeString(string)
315-
part =HTTPClientRequestPart.body(.byteBuffer(buffer))
316-
}
340+
self.writeBody(request: request, context: context).whenComplete{ result in
341+
switch result {
342+
case.success:
343+
context.write(self.wrapOutboundOut(.end(nil)), promise: promise)
344+
context.flush()
317345

318-
context.write(wrapOutboundOut(part), promise:nil)
319-
}
346+
self.state =.sent
347+
self.delegate.didSendRequest(task:self.task)
320348

321-
context.write(wrapOutboundOut(.end(nil)), promise: promise)
322-
context.flush()
349+
letchannel= context.channel
350+
self.promise.futureResult.whenComplete{ _ in
351+
channel.close(promise:nil)
352+
}
353+
case.failure(let error):
354+
self.state =.end
355+
self.delegate.didReceiveError(task:self.task, error)
356+
self.promise.fail(error)
357+
context.close(promise:nil)
358+
}
359+
}
360+
}
323361

324-
self.state =.sent
325-
self.delegate.didTransmitRequestBody(task:self.task)
362+
privatefunc writeBody(request:HTTPClient.Request, context:ChannelHandlerContext)->EventLoopFuture<Void>{
363+
iflet body = request.body {
364+
return body.stream(HTTPClient.Body.StreamWriter{ part in
365+
letfuture= context.writeAndFlush(self.wrapOutboundOut(.body(part)))
366+
future.whenSuccess{ _ in
367+
self.delegate.didSendRequestPart(task:self.task, part)
368+
}
369+
return future
370+
})
371+
}else{
372+
return context.eventLoop.makeSucceededFuture(())
373+
}
374+
}
326375

327-
letchannel= context.channel
328-
self.promise.futureResult.whenComplete{ _ in
329-
channel.close(promise:nil)
376+
publicfunc read(context:ChannelHandlerContext){
377+
ifself.mayRead {
378+
self.pendingRead =false
379+
context.read()
380+
}else{
381+
self.pendingRead =true
330382
}
331383
}
332384

@@ -338,15 +390,21 @@ internal class TaskHandler<T: HTTPClientResponseDelegate>: ChannelInboundHandler
338390
self.state =.redirected(head, redirectURL)
339391
}else{
340392
self.state =.head
341-
self.delegate.didReceiveHead(task:self.task, head)
393+
self.mayRead =false
394+
self.delegate.didReceiveHead(task:self.task, head).whenComplete{ result in
395+
self.handleBackpressureResult(context: context, result: result)
396+
}
342397
}
343398
case.body(let body):
344399
switchself.state {
345400
case.redirected:
346401
break
347402
default:
348403
self.state =.body
349-
self.delegate.didReceivePart(task:self.task, body)
404+
self.mayRead =false
405+
self.delegate.didReceivePart(task:self.task, body).whenComplete{ result in
406+
self.handleBackpressureResult(context: context, result: result)
407+
}
350408
}
351409
case.end:
352410
switchself.state {
@@ -365,6 +423,20 @@ internal class TaskHandler<T: HTTPClientResponseDelegate>: ChannelInboundHandler
365423
}
366424
}
367425

426+
privatefunc handleBackpressureResult(context:ChannelHandlerContext, result:Result<Void,Error>){
427+
switch result {
428+
case.success:
429+
self.mayRead =true
430+
ifself.pendingRead {
431+
context.read()
432+
}
433+
case.failure(let error):
434+
self.state =.end
435+
self.delegate.didReceiveError(task:self.task, error)
436+
self.promise.fail(error)
437+
}
438+
}
439+
368440
func userInboundEventTriggered(context:ChannelHandlerContext, event:Any){
369441
if(event as?IdleStateHandler.IdleStateEvent)==.read {
370442
self.state =.end

‎Sources/NIOHTTPClient/RequestValidation.swift‎

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,17 @@ extension HTTPHeaders{
3535
}
3636

3737
if encodings.isEmpty {
38-
contentLength = body.length
38+
guardlet length = body.length else{
39+
throwHTTPClientError.contentLengthMissing
40+
}
41+
contentLength = length
3942
}else{
4043
transferEncoding = encodings.joined(separator:", ")
4144
if !encodings.contains("chunked"){
42-
contentLength = body.length
45+
guardlet length = body.length else{
46+
throwHTTPClientError.contentLengthMissing
47+
}
48+
contentLength = length
4349
}
4450
}
4551
}else{

‎Sources/NIOHTTPClient/SwiftNIOHTTP.swift‎

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,8 @@ public class HTTPClient{
110110

111111
publicfunc execute<T:HTTPClientResponseDelegate>(request:Request, delegate:T, timeout:Timeout?=nil)->Task<T.Response>{
112112
lettimeout= timeout ?? configuration.timeout
113-
letpromise:EventLoopPromise<T.Response>=self.eventLoopGroup.next().makePromise()
113+
leteventLoop=self.eventLoopGroup.next()
114+
letpromise:EventLoopPromise<T.Response>= eventLoop.makePromise()
114115

115116
letredirectHandler:RedirectHandler<T.Response>?
116117
ifself.configuration.followRedirects {
@@ -121,7 +122,7 @@ public class HTTPClient{
121122
redirectHandler =nil
122123
}
123124

124-
lettask=Task(future: promise.futureResult)
125+
lettask=Task(eventLoop: eventLoop,future: promise.futureResult)
125126

126127
varbootstrap=ClientBootstrap(group:self.eventLoopGroup)
127128
.channelOption(ChannelOptions.socket(SocketOptionLevel(IPPROTO_TCP), TCP_NODELAY), value:1)
@@ -256,6 +257,7 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible{
256257
case identityCodingIncorrectlyPresent
257258
case chunkedSpecifiedMultipleTimes
258259
case invalidProxyResponse
260+
case contentLengthMissing
259261
}
260262

261263
privatevarcode:Code
@@ -279,4 +281,5 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible{
279281
publicstaticletidentityCodingIncorrectlyPresent=HTTPClientError(code:.identityCodingIncorrectlyPresent)
280282
publicstaticletchunkedSpecifiedMultipleTimes=HTTPClientError(code:.chunkedSpecifiedMultipleTimes)
281283
publicstaticletinvalidProxyResponse=HTTPClientError(code:.invalidProxyResponse)
284+
publicstaticletcontentLengthMissing=HTTPClientError(code:.contentLengthMissing)
282285
}

0 commit comments

Comments
(0)