Factors out `MaybeSendBufferedData()`, and moves the functionality into `SendQueuedFrames()`.
Also defines a `SendResult` enum to represent the effect of a send operation. This required more plumbing than I expected.
PiperOrigin-RevId: 409452834
diff --git a/http2/adapter/oghttp2_session.cc b/http2/adapter/oghttp2_session.cc
index 0a037c1..d398b1b 100644
--- a/http2/adapter/oghttp2_session.cc
+++ b/http2/adapter/oghttp2_session.cc
@@ -403,41 +403,49 @@
RunOnExit r{[this]() { sending_ = false; }};
MaybeSetupPreface();
- int64_t result = std::numeric_limits<int64_t>::max();
- // Flush any serialized prefix.
- while (result > 0 && !serialized_prefix_.empty()) {
- result = visitor_.OnReadyToSend(serialized_prefix_);
- if (result > 0) {
- serialized_prefix_.erase(0, result);
- }
- }
- if (result < 0) {
- LatchErrorAndNotify(Http2ErrorCode::INTERNAL_ERROR,
- ConnectionError::kSendError);
- return result;
- } else if (!serialized_prefix_.empty()) {
- return 0;
- }
- bool continue_writing = SendQueuedFrames();
- while (continue_writing && !connection_metadata_.empty()) {
+ SendResult continue_writing = SendQueuedFrames();
+ while (continue_writing == SendResult::SEND_OK &&
+ !connection_metadata_.empty()) {
continue_writing = SendMetadata(0, connection_metadata_);
}
// Wake streams for writes.
- while (continue_writing && write_scheduler_.HasReadyStreams() &&
- connection_send_window_ > 0) {
+ while (continue_writing == SendResult::SEND_OK &&
+ write_scheduler_.HasReadyStreams() && connection_send_window_ > 0) {
const Http2StreamId stream_id = write_scheduler_.PopNextReadyStream();
// TODO(birenroy): Add a return value to indicate write blockage, so streams
// aren't woken unnecessarily.
continue_writing = WriteForStream(stream_id);
}
- if (continue_writing) {
- SendQueuedFrames();
+ if (continue_writing == SendResult::SEND_OK) {
+ continue_writing = SendQueuedFrames();
}
- return 0;
+ return continue_writing == SendResult::SEND_ERROR ? -1 : 0;
}
-bool OgHttp2Session::SendQueuedFrames() {
+OgHttp2Session::SendResult OgHttp2Session::MaybeSendBufferedData() {
+ int64_t result = std::numeric_limits<int64_t>::max();
+ while (result > 0 && !buffered_data_.empty()) {
+ result = visitor_.OnReadyToSend(buffered_data_);
+ if (result > 0) {
+ buffered_data_.erase(0, result);
+ }
+ }
+ if (result < 0) {
+ LatchErrorAndNotify(Http2ErrorCode::INTERNAL_ERROR,
+ ConnectionError::kSendError);
+ return SendResult::SEND_ERROR;
+ }
+ return buffered_data_.empty() ? SendResult::SEND_OK
+ : SendResult::SEND_BLOCKED;
+}
+
+OgHttp2Session::SendResult OgHttp2Session::SendQueuedFrames() {
+ // Flush any serialized prefix.
+ const SendResult result = MaybeSendBufferedData();
+ if (result != SendResult::SEND_OK) {
+ return result;
+ }
// Serialize and send frames in the queue.
while (!frames_.empty()) {
const auto& frame_ptr = frames_.front();
@@ -454,10 +462,10 @@
if (result < 0) {
LatchErrorAndNotify(Http2ErrorCode::INTERNAL_ERROR,
ConnectionError::kSendError);
- return false;
+ return SendResult::SEND_ERROR;
} else if (result == 0) {
// Write blocked.
- return false;
+ return SendResult::SEND_BLOCKED;
} else {
visitor_.OnFrameSent(c.frame_type(), c.stream_id(), frame_payload_length,
c.flags(), c.error_code());
@@ -471,23 +479,24 @@
frames_.pop_front();
if (static_cast<size_t>(result) < frame.size()) {
// The frame was partially written, so the rest must be buffered.
- serialized_prefix_.append(frame.data() + result, frame.size() - result);
- return false;
+ buffered_data_.append(frame.data() + result, frame.size() - result);
+ return SendResult::SEND_BLOCKED;
}
}
}
- return true;
+ return SendResult::SEND_OK;
}
-bool OgHttp2Session::WriteForStream(Http2StreamId stream_id) {
+OgHttp2Session::SendResult OgHttp2Session::WriteForStream(
+ Http2StreamId stream_id) {
auto it = stream_map_.find(stream_id);
if (it == stream_map_.end()) {
QUICHE_LOG(ERROR) << "Can't find stream " << stream_id
<< " which is ready to write!";
- return true;
+ return SendResult::SEND_OK;
}
StreamState& state = it->second;
- bool connection_can_write = true;
+ SendResult connection_can_write = SendResult::SEND_OK;
if (!state.outbound_metadata.empty()) {
connection_can_write = SendMetadata(stream_id, state.outbound_metadata);
}
@@ -503,12 +512,12 @@
MaybeCloseWithRstStream(stream_id, state);
}
}
- return true;
+ return SendResult::SEND_OK;
}
int32_t available_window =
std::min({connection_send_window_, state.send_window,
static_cast<int32_t>(max_frame_payload_)});
- while (connection_can_write && available_window > 0 &&
+ while (connection_can_write == SendResult::SEND_OK && available_window > 0 &&
state.outbound_body != nullptr) {
int64_t length;
bool end_data;
@@ -530,11 +539,11 @@
spdy::SpdySerializedFrame header =
spdy::SpdyFramer::SerializeDataFrameHeaderWithPaddingLengthField(
data);
- QUICHE_DCHECK(serialized_prefix_.empty() && frames_.empty());
+ QUICHE_DCHECK(buffered_data_.empty() && frames_.empty());
const bool success =
state.outbound_body->Send(absl::string_view(header), length);
if (!success) {
- connection_can_write = false;
+ connection_can_write = SendResult::SEND_BLOCKED;
break;
}
visitor_.OnFrameSent(/* DATA */ 0, stream_id, length, fin ? 0x1 : 0x0, 0);
@@ -571,11 +580,14 @@
}
// Streams can continue writing as long as the connection is not write-blocked
// and there is additional flow control quota available.
- return connection_can_write && available_window > 0;
+ if (connection_can_write != SendResult::SEND_OK) {
+ return connection_can_write;
+ }
+ return available_window <= 0 ? SendResult::SEND_BLOCKED : SendResult::SEND_OK;
}
-bool OgHttp2Session::SendMetadata(Http2StreamId stream_id,
- OgHttp2Session::MetadataSequence& sequence) {
+OgHttp2Session::SendResult OgHttp2Session::SendMetadata(
+ Http2StreamId stream_id, OgHttp2Session::MetadataSequence& sequence) {
const uint32_t max_payload_size =
std::min(kMaxAllowedMetadataFrameSize, max_frame_payload_);
auto payload_buffer = absl::make_unique<uint8_t[]>(max_payload_size);
@@ -588,7 +600,7 @@
source.Pack(payload_buffer.get(), max_payload_size);
if (written < 0) {
// Did not touch the connection, so perhaps writes are still possible.
- return true;
+ return SendResult::SEND_OK;
}
QUICHE_DCHECK_LE(static_cast<size_t>(written), max_payload_size);
auto payload = absl::string_view(
@@ -1006,8 +1018,8 @@
void OgHttp2Session::MaybeSetupPreface() {
if (!queued_preface_) {
if (options_.perspective == Perspective::kClient) {
- serialized_prefix_.assign(spdy::kHttp2ConnectionHeaderPrefix,
- spdy::kHttp2ConnectionHeaderPrefixSize);
+ buffered_data_.assign(spdy::kHttp2ConnectionHeaderPrefix,
+ spdy::kHttp2ConnectionHeaderPrefixSize);
}
// First frame must be a non-ack SETTINGS.
if (frames_.empty() ||
diff --git a/http2/adapter/oghttp2_session.h b/http2/adapter/oghttp2_session.h
index 6c844d8..88cdb7d 100644
--- a/http2/adapter/oghttp2_session.h
+++ b/http2/adapter/oghttp2_session.h
@@ -101,7 +101,7 @@
return !received_goaway_ && !decoder_.HasError();
}
bool want_write() const override {
- return !frames_.empty() || !serialized_prefix_.empty() ||
+ return !frames_.empty() || !buffered_data_.empty() ||
write_scheduler_.HasReadyStreams() || !connection_metadata_.empty();
}
int GetRemoteWindowSize() const override { return connection_send_window_; }
@@ -246,15 +246,25 @@
void SendWindowUpdate(Http2StreamId stream_id, size_t update_delta);
- // Sends queued frames, returning true if all frames were flushed
- // successfully.
- bool SendQueuedFrames();
+ enum class SendResult {
+ // All data was flushed.
+ SEND_OK,
+ // Not all data was flushed (due to flow control or TCP back pressure).
+ SEND_BLOCKED,
+ // An error occurred while sending data.
+ SEND_ERROR,
+ };
- // Returns false if the connection is write-blocked (due to flow control or
- // some other reason).
- bool WriteForStream(Http2StreamId stream_id);
+ // Sends the buffered connection preface or serialized frame data, if any.
+ SendResult MaybeSendBufferedData();
- bool SendMetadata(Http2StreamId stream_id, MetadataSequence& sequence);
+ // Serializes and sends queued frames.
+ SendResult SendQueuedFrames();
+
+ // Writes DATA frames for stream `stream_id`.
+ SendResult WriteForStream(Http2StreamId stream_id);
+
+ SendResult SendMetadata(Http2StreamId stream_id, MetadataSequence& sequence);
void SendHeaders(Http2StreamId stream_id, spdy::SpdyHeaderBlock headers,
bool end_stream);
@@ -317,10 +327,11 @@
// `max_outbound_concurrent_streams_`.
std::list<PendingStreamState> pending_streams_;
- // Maintains the queue of outbound frames, and any serialized bytes that have
- // not yet been consumed.
+ // The queue of outbound frames.
std::list<std::unique_ptr<spdy::SpdyFrameIR>> frames_;
- std::string serialized_prefix_;
+ // Buffered data (connection preface, serialized frames) that has not yet been
+ // sent.
+ std::string buffered_data_;
// Maintains the set of streams ready to write data to the peer.
using WriteScheduler = PriorityWriteScheduler<Http2StreamId>;