Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ final class Transaction:
break

case .forwardStreamFinished(let executor):
executor.finishRequestBodyStream(self, promise: nil)
executor.finishRequestBodyStream(trailers: nil, request: self, promise: nil)
}
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
case .sendBodyPart(let part, let writePromise):
context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: writePromise)

case .sendRequestEnd(let writePromise, let finalAction):
case .sendRequestEnd(let trailers, let writePromise, let finalAction):

let writePromise = writePromise ?? context.eventLoop.makePromise(of: Void.self)
// We need to defer succeeding the old request to avoid ordering issues
Expand Down Expand Up @@ -282,7 +282,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
}
}

context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: writePromise)
context.writeAndFlush(self.wrapOutboundOut(.end(trailers)), promise: writePromise)

if let readTimeoutAction = self.idleReadTimeoutStateMachine?.requestEndSent() {
self.runTimeoutAction(readTimeoutAction, context: context)
Expand Down Expand Up @@ -339,7 +339,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
// that the request is neither failed nor finished yet
self.request!.receiveResponseBodyParts(buffer)

case .forwardResponseEnd(let finalAction, let buffer):
case .forwardResponseEnd(let finalAction, let buffer, let trailers):
// We can force unwrap the request here, as we have just validated in the state machine,
// that the request is neither failed nor finished yet

Expand All @@ -358,15 +358,15 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
case .close:
self.request = nil
context.close(promise: nil)
oldRequest.receiveResponseEnd(buffer, trailers: nil)
oldRequest.receiveResponseEnd(buffer, trailers: trailers)

case .none:
oldRequest.receiveResponseEnd(buffer, trailers: nil)
oldRequest.receiveResponseEnd(buffer, trailers: trailers)

case .informConnectionIsIdle:
self.request = nil
self.onConnectionIdle()
oldRequest.receiveResponseEnd(buffer, trailers: nil)
oldRequest.receiveResponseEnd(buffer, trailers: trailers)
}

case .failRequest(let error, let finalAction):
Expand Down Expand Up @@ -504,14 +504,18 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
self.run(action, context: context)
}

fileprivate func finishRequestBodyStream0(_ request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
fileprivate func finishRequestBodyStream0(
trailers: HTTPHeaders?,
request: HTTPExecutableRequest,
promise: EventLoopPromise<Void>?
) {
guard self.request === request, let context = self.channelContext else {
// See code comment in `writeRequestBodyPart0`
promise?.fail(HTTPClientError.requestStreamCancelled)
return
}

let action = self.state.requestStreamFinished(promise: promise)
let action = self.state.requestStreamFinished(trailers: trailers, promise: promise)
self.run(action, context: context)
}

Expand Down Expand Up @@ -565,9 +569,13 @@ extension HTTP1ClientChannelHandler {
}
}

func finishRequestBodyStream(_ request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
func finishRequestBodyStream(
trailers: HTTPHeaders?,
request: HTTPExecutableRequest,
promise: EventLoopPromise<Void>?
) {
self.loopBound.execute {
$0.finishRequestBodyStream0(request, promise: promise)
$0.finishRequestBodyStream0(trailers: trailers, request: request, promise: promise)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ struct HTTP1ConnectionStateMachine {
startIdleTimer: Bool
)
case sendBodyPart(IOData, EventLoopPromise<Void>?)
case sendRequestEnd(EventLoopPromise<Void>?, FinalSuccessfulStreamAction)
case sendRequestEnd(trailers: HTTPHeaders?, EventLoopPromise<Void>?, FinalSuccessfulStreamAction)
case failSendBodyPart(Error, EventLoopPromise<Void>?)
case failSendStreamFinished(Error, EventLoopPromise<Void>?)

Expand All @@ -66,7 +66,7 @@ struct HTTP1ConnectionStateMachine {

case forwardResponseHead(HTTPResponseHead, pauseRequestBodyStream: Bool)
case forwardResponseBodyParts(CircularBuffer<ByteBuffer>)
case forwardResponseEnd(FinalSuccessfulStreamAction, CircularBuffer<ByteBuffer>)
case forwardResponseEnd(FinalSuccessfulStreamAction, CircularBuffer<ByteBuffer>, HTTPHeaders?)

case failRequest(Error, FinalFailedStreamAction)

Expand Down Expand Up @@ -218,13 +218,13 @@ struct HTTP1ConnectionStateMachine {
}
}

mutating func requestStreamFinished(promise: EventLoopPromise<Void>?) -> Action {
mutating func requestStreamFinished(trailers: HTTPHeaders?, promise: EventLoopPromise<Void>?) -> Action {
guard case .inRequest(var requestStateMachine, let close) = self.state else {
fatalError("Invalid state: \(self.state)")
}

return self.avoidingStateMachineCoW { state -> Action in
let action = requestStateMachine.requestStreamFinished(promise: promise)
let action = requestStateMachine.requestStreamFinished(trailers: trailers, promise: promise)
state = .inRequest(requestStateMachine, close: close)
return state.modify(with: action)
}
Expand Down Expand Up @@ -427,7 +427,7 @@ extension HTTP1ConnectionStateMachine.State {
return .resumeRequestBodyStream
case .sendBodyPart(let part, let writePromise):
return .sendBodyPart(part, writePromise)
case .sendRequestEnd(let writePromise, let finalAction):
case .sendRequestEnd(let trailers, let writePromise, let finalAction):
guard case .inRequest(_, close: let close) = self else {
assertionFailure("Invalid state: \(self)")
self = .closing
Expand All @@ -450,13 +450,13 @@ extension HTTP1ConnectionStateMachine.State {
case .none:
newFinalAction = .none
}
return .sendRequestEnd(writePromise, newFinalAction)
return .sendRequestEnd(trailers: trailers, writePromise, newFinalAction)

case .forwardResponseHead(let head, let pauseRequestBodyStream):
return .forwardResponseHead(head, pauseRequestBodyStream: pauseRequestBodyStream)
case .forwardResponseBodyParts(let parts):
return .forwardResponseBodyParts(parts)
case .forwardResponseEnd(let finalAction, let finalParts):
case .forwardResponseEnd(let finalAction, let finalParts, let trailers):
guard case .inRequest(_, close: let close) = self else {
assertionFailure("Invalid state: \(self)")
self = .closing
Expand All @@ -480,7 +480,7 @@ extension HTTP1ConnectionStateMachine.State {
// request is ongoing. request stream is still alive
newFinalAction = .none
}
return .forwardResponseEnd(newFinalAction, finalParts)
return .forwardResponseEnd(newFinalAction, finalParts, trailers)

case .failRequest(let error, let finalAction):
switch self {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
case .sendBodyPart(let data, let writePromise):
context.writeAndFlush(self.wrapOutboundOut(.body(data)), promise: writePromise)

case .sendRequestEnd(let writePromise, let finalAction):
case .sendRequestEnd(let trailers, let writePromise, let finalAction):
let promise = writePromise ?? context.eventLoop.makePromise(of: Void.self)
// We can force unwrap the request here, as we have just validated in the state machine,
// that the request is neither failed nor finished yet
Expand All @@ -205,7 +205,7 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
request.requestBodyStreamSent()
}

context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: promise)
context.writeAndFlush(self.wrapOutboundOut(.end(trailers)), promise: promise)

if let readTimeoutAction = self.idleReadTimeoutStateMachine?.requestEndSent() {
self.runTimeoutAction(readTimeoutAction, context: context)
Expand Down Expand Up @@ -256,10 +256,10 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
// the right result for HTTP/1). In the h2 case we MUST always close.
self.runFailedFinalAction(finalAction, context: context, error: error)

case .forwardResponseEnd(let finalAction, let finalParts):
case .forwardResponseEnd(let finalAction, let finalParts, let trailers):
// We can force unwrap the request here, as we have just validated in the state machine,
// that the request object is still present.
self.request!.receiveResponseEnd(finalParts, trailers: nil)
self.request!.receiveResponseEnd(finalParts, trailers: trailers)
self.request = nil
self.runTimeoutAction(.clearIdleReadTimeoutTimer, context: context)
self.runTimeoutAction(.clearIdleWriteTimeoutTimer, context: context)
Expand Down Expand Up @@ -405,13 +405,17 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
self.run(action, context: context)
}

private func finishRequestBodyStream0(_ request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
private func finishRequestBodyStream0(
trailers: HTTPHeaders?,
request: HTTPExecutableRequest,
promise: EventLoopPromise<Void>?
) {
guard self.request === request, let context = self.channelContext else {
// See code comment in `writeRequestBodyPart0`
return
}

let action = self.state.requestStreamFinished(promise: promise)
let action = self.state.requestStreamFinished(trailers: trailers, promise: promise)
self.run(action, context: context)
}

Expand Down Expand Up @@ -461,9 +465,13 @@ extension HTTP2ClientRequestHandler {
}
}

func finishRequestBodyStream(_ request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
func finishRequestBodyStream(
trailers: HTTPHeaders?,
request: HTTPExecutableRequest,
promise: EventLoopPromise<Void>?
) {
self.loopBound.execute {
$0.finishRequestBodyStream0(request, promise: promise)
$0.finishRequestBodyStream0(trailers: trailers, request: request, promise: promise)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,11 @@ protocol HTTPRequestExecutor: Sendable {
/// Signals that the request body stream has finished
///
/// This method may be **called on any thread**. The executor needs to ensure thread safety.
func finishRequestBodyStream(_ task: HTTPExecutableRequest, promise: EventLoopPromise<Void>?)
func finishRequestBodyStream(
trailers: HTTPHeaders?,
request: HTTPExecutableRequest,
promise: EventLoopPromise<Void>?
)

/// Signals that more bytes from response body stream can be consumed.
///
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ struct HTTPRequestStateMachine {
startIdleTimer: Bool
)
case sendBodyPart(IOData, EventLoopPromise<Void>?)
case sendRequestEnd(EventLoopPromise<Void>?, FinalSuccessfulRequestAction)
case sendRequestEnd(trailers: HTTPHeaders?, EventLoopPromise<Void>?, FinalSuccessfulRequestAction)
case failSendBodyPart(Error, EventLoopPromise<Void>?)
case failSendStreamFinished(Error, EventLoopPromise<Void>?)

Expand All @@ -111,7 +111,7 @@ struct HTTPRequestStateMachine {

case forwardResponseHead(HTTPResponseHead, pauseRequestBodyStream: Bool)
case forwardResponseBodyParts(CircularBuffer<ByteBuffer>)
case forwardResponseEnd(FinalSuccessfulRequestAction, CircularBuffer<ByteBuffer>)
case forwardResponseEnd(FinalSuccessfulRequestAction, CircularBuffer<ByteBuffer>, HTTPHeaders?)

case failRequest(Error, FinalFailedRequestAction)

Expand Down Expand Up @@ -353,7 +353,7 @@ struct HTTPRequestStateMachine {
}
}

mutating func requestStreamFinished(promise: EventLoopPromise<Void>?) -> Action {
mutating func requestStreamFinished(trailers: HTTPHeaders?, promise: EventLoopPromise<Void>?) -> Action {
switch self.state {
case .initialized,
.waitForChannelToBecomeWritable,
Expand All @@ -370,7 +370,7 @@ struct HTTPRequestStateMachine {
}

self.state = .running(.endSent, .waitingForHead)
return .sendRequestEnd(promise, .none)
return .sendRequestEnd(trailers: trailers, promise, .none)

case .running(
.streaming(let expectedBodyLength, let sentBodyBytes, _),
Expand All @@ -385,7 +385,7 @@ struct HTTPRequestStateMachine {
}

self.state = .running(.endSent, .receivingBody(head, streamState))
return .sendRequestEnd(promise, .none)
return .sendRequestEnd(trailers: trailers, promise, .none)

case .running(.streaming(let expectedBodyLength, let sentBodyBytes, _), .endReceived):
if let expected = expectedBodyLength, expected != sentBodyBytes {
Expand All @@ -395,7 +395,7 @@ struct HTTPRequestStateMachine {
}

self.state = .finished
return .sendRequestEnd(promise, .requestDone)
return .sendRequestEnd(trailers: trailers, promise, .requestDone)

case .failed(let error):
return .failSendStreamFinished(error, promise)
Expand Down Expand Up @@ -497,8 +497,8 @@ struct HTTPRequestStateMachine {
return self.receivedHTTPResponseHead(head)
case .body(let body):
return self.receivedHTTPResponseBodyPart(body)
case .end:
return self.receivedHTTPResponseEnd()
case .end(let trailers):
return self.receivedHTTPResponseEnd(trailers: trailers)
}
}

Expand Down Expand Up @@ -618,7 +618,7 @@ struct HTTPRequestStateMachine {
}
}

private mutating func receivedHTTPResponseEnd() -> Action {
private mutating func receivedHTTPResponseEnd(trailers: HTTPHeaders?) -> Action {
switch self.state {
case .initialized, .waitForChannelToBecomeWritable:
preconditionFailure(
Expand Down Expand Up @@ -648,7 +648,7 @@ struct HTTPRequestStateMachine {
),
.endReceived
)
return .forwardResponseEnd(.none, remainingBuffer)
return .forwardResponseEnd(.none, remainingBuffer, trailers)

case .close:
// If we receive a `.close` as a connectionAction from the responseStreamState
Expand All @@ -672,7 +672,7 @@ struct HTTPRequestStateMachine {
// connection should be closed anyway.
let (remainingBuffer, _) = responseStreamState.end()
state = .finished
return .forwardResponseEnd(.close, remainingBuffer)
return .forwardResponseEnd(.close, remainingBuffer, trailers)
}

case .running(.endSent, .receivingBody(_, var responseStreamState)):
Expand All @@ -681,9 +681,9 @@ struct HTTPRequestStateMachine {
state = .finished
switch action {
case .none:
return .forwardResponseEnd(.requestDone, remainingBuffer)
return .forwardResponseEnd(.requestDone, remainingBuffer, trailers)
case .close:
return .forwardResponseEnd(.close, remainingBuffer)
return .forwardResponseEnd(.close, remainingBuffer, trailers)
}
}

Expand Down
4 changes: 2 additions & 2 deletions Sources/AsyncHTTPClient/RequestBag.swift
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate & Sendable>: Sendabl
promise.futureResult.whenSuccess {
self.delegate.didSendRequest(task: self.task)
}
writer.finishRequestBodyStream(self, promise: promise)
writer.finishRequestBodyStream(trailers: nil, request: self, promise: promise)

case .forwardStreamFinishedAndSucceedTask(let writer, let writerPromise):
let promise = writerPromise ?? self.task.eventLoop.makePromise(of: Void.self)
Expand All @@ -256,7 +256,7 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate & Sendable>: Sendabl
self.task.promise.fail(error)
}
}
writer.finishRequestBodyStream(self, promise: promise)
writer.finishRequestBodyStream(trailers: nil, request: self, promise: promise)

case .forwardStreamFailureAndFailTask(let writer, let error, let promise):
writer.cancelRequest(self)
Expand Down
Loading