Factors out `MaybeSendBufferedData()`, and moves the functionality into `SendQueuedFrames()`. Also defines a `SendResult` enum to represent the effect of a send operation. This required more plumbing than I expected. PiperOrigin-RevId: 409452834
diff --git a/http2/adapter/oghttp2_session.cc b/http2/adapter/oghttp2_session.cc index 0a037c1..d398b1b 100644 --- a/http2/adapter/oghttp2_session.cc +++ b/http2/adapter/oghttp2_session.cc
@@ -403,41 +403,49 @@ RunOnExit r{[this]() { sending_ = false; }}; MaybeSetupPreface(); - int64_t result = std::numeric_limits<int64_t>::max(); - // Flush any serialized prefix. - while (result > 0 && !serialized_prefix_.empty()) { - result = visitor_.OnReadyToSend(serialized_prefix_); - if (result > 0) { - serialized_prefix_.erase(0, result); - } - } - if (result < 0) { - LatchErrorAndNotify(Http2ErrorCode::INTERNAL_ERROR, - ConnectionError::kSendError); - return result; - } else if (!serialized_prefix_.empty()) { - return 0; - } - bool continue_writing = SendQueuedFrames(); - while (continue_writing && !connection_metadata_.empty()) { + SendResult continue_writing = SendQueuedFrames(); + while (continue_writing == SendResult::SEND_OK && + !connection_metadata_.empty()) { continue_writing = SendMetadata(0, connection_metadata_); } // Wake streams for writes. - while (continue_writing && write_scheduler_.HasReadyStreams() && - connection_send_window_ > 0) { + while (continue_writing == SendResult::SEND_OK && + write_scheduler_.HasReadyStreams() && connection_send_window_ > 0) { const Http2StreamId stream_id = write_scheduler_.PopNextReadyStream(); // TODO(birenroy): Add a return value to indicate write blockage, so streams // aren't woken unnecessarily. continue_writing = WriteForStream(stream_id); } - if (continue_writing) { - SendQueuedFrames(); + if (continue_writing == SendResult::SEND_OK) { + continue_writing = SendQueuedFrames(); } - return 0; + return continue_writing == SendResult::SEND_ERROR ? -1 : 0; } -bool OgHttp2Session::SendQueuedFrames() { +OgHttp2Session::SendResult OgHttp2Session::MaybeSendBufferedData() { + int64_t result = std::numeric_limits<int64_t>::max(); + while (result > 0 && !buffered_data_.empty()) { + result = visitor_.OnReadyToSend(buffered_data_); + if (result > 0) { + buffered_data_.erase(0, result); + } + } + if (result < 0) { + LatchErrorAndNotify(Http2ErrorCode::INTERNAL_ERROR, + ConnectionError::kSendError); + return SendResult::SEND_ERROR; + } + return buffered_data_.empty() ? SendResult::SEND_OK + : SendResult::SEND_BLOCKED; +} + +OgHttp2Session::SendResult OgHttp2Session::SendQueuedFrames() { + // Flush any serialized prefix. + const SendResult result = MaybeSendBufferedData(); + if (result != SendResult::SEND_OK) { + return result; + } // Serialize and send frames in the queue. while (!frames_.empty()) { const auto& frame_ptr = frames_.front(); @@ -454,10 +462,10 @@ if (result < 0) { LatchErrorAndNotify(Http2ErrorCode::INTERNAL_ERROR, ConnectionError::kSendError); - return false; + return SendResult::SEND_ERROR; } else if (result == 0) { // Write blocked. - return false; + return SendResult::SEND_BLOCKED; } else { visitor_.OnFrameSent(c.frame_type(), c.stream_id(), frame_payload_length, c.flags(), c.error_code()); @@ -471,23 +479,24 @@ frames_.pop_front(); if (static_cast<size_t>(result) < frame.size()) { // The frame was partially written, so the rest must be buffered. - serialized_prefix_.append(frame.data() + result, frame.size() - result); - return false; + buffered_data_.append(frame.data() + result, frame.size() - result); + return SendResult::SEND_BLOCKED; } } } - return true; + return SendResult::SEND_OK; } -bool OgHttp2Session::WriteForStream(Http2StreamId stream_id) { +OgHttp2Session::SendResult OgHttp2Session::WriteForStream( + Http2StreamId stream_id) { auto it = stream_map_.find(stream_id); if (it == stream_map_.end()) { QUICHE_LOG(ERROR) << "Can't find stream " << stream_id << " which is ready to write!"; - return true; + return SendResult::SEND_OK; } StreamState& state = it->second; - bool connection_can_write = true; + SendResult connection_can_write = SendResult::SEND_OK; if (!state.outbound_metadata.empty()) { connection_can_write = SendMetadata(stream_id, state.outbound_metadata); } @@ -503,12 +512,12 @@ MaybeCloseWithRstStream(stream_id, state); } } - return true; + return SendResult::SEND_OK; } int32_t available_window = std::min({connection_send_window_, state.send_window, static_cast<int32_t>(max_frame_payload_)}); - while (connection_can_write && available_window > 0 && + while (connection_can_write == SendResult::SEND_OK && available_window > 0 && state.outbound_body != nullptr) { int64_t length; bool end_data; @@ -530,11 +539,11 @@ spdy::SpdySerializedFrame header = spdy::SpdyFramer::SerializeDataFrameHeaderWithPaddingLengthField( data); - QUICHE_DCHECK(serialized_prefix_.empty() && frames_.empty()); + QUICHE_DCHECK(buffered_data_.empty() && frames_.empty()); const bool success = state.outbound_body->Send(absl::string_view(header), length); if (!success) { - connection_can_write = false; + connection_can_write = SendResult::SEND_BLOCKED; break; } visitor_.OnFrameSent(/* DATA */ 0, stream_id, length, fin ? 0x1 : 0x0, 0); @@ -571,11 +580,14 @@ } // Streams can continue writing as long as the connection is not write-blocked // and there is additional flow control quota available. - return connection_can_write && available_window > 0; + if (connection_can_write != SendResult::SEND_OK) { + return connection_can_write; + } + return available_window <= 0 ? SendResult::SEND_BLOCKED : SendResult::SEND_OK; } -bool OgHttp2Session::SendMetadata(Http2StreamId stream_id, - OgHttp2Session::MetadataSequence& sequence) { +OgHttp2Session::SendResult OgHttp2Session::SendMetadata( + Http2StreamId stream_id, OgHttp2Session::MetadataSequence& sequence) { const uint32_t max_payload_size = std::min(kMaxAllowedMetadataFrameSize, max_frame_payload_); auto payload_buffer = absl::make_unique<uint8_t[]>(max_payload_size); @@ -588,7 +600,7 @@ source.Pack(payload_buffer.get(), max_payload_size); if (written < 0) { // Did not touch the connection, so perhaps writes are still possible. - return true; + return SendResult::SEND_OK; } QUICHE_DCHECK_LE(static_cast<size_t>(written), max_payload_size); auto payload = absl::string_view( @@ -1006,8 +1018,8 @@ void OgHttp2Session::MaybeSetupPreface() { if (!queued_preface_) { if (options_.perspective == Perspective::kClient) { - serialized_prefix_.assign(spdy::kHttp2ConnectionHeaderPrefix, - spdy::kHttp2ConnectionHeaderPrefixSize); + buffered_data_.assign(spdy::kHttp2ConnectionHeaderPrefix, + spdy::kHttp2ConnectionHeaderPrefixSize); } // First frame must be a non-ack SETTINGS. if (frames_.empty() ||
diff --git a/http2/adapter/oghttp2_session.h b/http2/adapter/oghttp2_session.h index 6c844d8..88cdb7d 100644 --- a/http2/adapter/oghttp2_session.h +++ b/http2/adapter/oghttp2_session.h
@@ -101,7 +101,7 @@ return !received_goaway_ && !decoder_.HasError(); } bool want_write() const override { - return !frames_.empty() || !serialized_prefix_.empty() || + return !frames_.empty() || !buffered_data_.empty() || write_scheduler_.HasReadyStreams() || !connection_metadata_.empty(); } int GetRemoteWindowSize() const override { return connection_send_window_; } @@ -246,15 +246,25 @@ void SendWindowUpdate(Http2StreamId stream_id, size_t update_delta); - // Sends queued frames, returning true if all frames were flushed - // successfully. - bool SendQueuedFrames(); + enum class SendResult { + // All data was flushed. + SEND_OK, + // Not all data was flushed (due to flow control or TCP back pressure). + SEND_BLOCKED, + // An error occurred while sending data. + SEND_ERROR, + }; - // Returns false if the connection is write-blocked (due to flow control or - // some other reason). - bool WriteForStream(Http2StreamId stream_id); + // Sends the buffered connection preface or serialized frame data, if any. + SendResult MaybeSendBufferedData(); - bool SendMetadata(Http2StreamId stream_id, MetadataSequence& sequence); + // Serializes and sends queued frames. + SendResult SendQueuedFrames(); + + // Writes DATA frames for stream `stream_id`. + SendResult WriteForStream(Http2StreamId stream_id); + + SendResult SendMetadata(Http2StreamId stream_id, MetadataSequence& sequence); void SendHeaders(Http2StreamId stream_id, spdy::SpdyHeaderBlock headers, bool end_stream); @@ -317,10 +327,11 @@ // `max_outbound_concurrent_streams_`. std::list<PendingStreamState> pending_streams_; - // Maintains the queue of outbound frames, and any serialized bytes that have - // not yet been consumed. + // The queue of outbound frames. std::list<std::unique_ptr<spdy::SpdyFrameIR>> frames_; - std::string serialized_prefix_; + // Buffered data (connection preface, serialized frames) that has not yet been + // sent. + std::string buffered_data_; // Maintains the set of streams ready to write data to the peer. using WriteScheduler = PriorityWriteScheduler<Http2StreamId>;