Factors out `MaybeSendBufferedData()`, and moves the functionality into `SendQueuedFrames()`.

Also defines a `SendResult` enum to represent the effect of a send operation. This required more plumbing than I expected.

PiperOrigin-RevId: 409452834
diff --git a/http2/adapter/oghttp2_session.cc b/http2/adapter/oghttp2_session.cc
index 0a037c1..d398b1b 100644
--- a/http2/adapter/oghttp2_session.cc
+++ b/http2/adapter/oghttp2_session.cc
@@ -403,41 +403,49 @@
   RunOnExit r{[this]() { sending_ = false; }};
 
   MaybeSetupPreface();
-  int64_t result = std::numeric_limits<int64_t>::max();
-  // Flush any serialized prefix.
-  while (result > 0 && !serialized_prefix_.empty()) {
-    result = visitor_.OnReadyToSend(serialized_prefix_);
-    if (result > 0) {
-      serialized_prefix_.erase(0, result);
-    }
-  }
-  if (result < 0) {
-    LatchErrorAndNotify(Http2ErrorCode::INTERNAL_ERROR,
-                        ConnectionError::kSendError);
-    return result;
-  } else if (!serialized_prefix_.empty()) {
-    return 0;
-  }
 
-  bool continue_writing = SendQueuedFrames();
-  while (continue_writing && !connection_metadata_.empty()) {
+  SendResult continue_writing = SendQueuedFrames();
+  while (continue_writing == SendResult::SEND_OK &&
+         !connection_metadata_.empty()) {
     continue_writing = SendMetadata(0, connection_metadata_);
   }
   // Wake streams for writes.
-  while (continue_writing && write_scheduler_.HasReadyStreams() &&
-         connection_send_window_ > 0) {
+  while (continue_writing == SendResult::SEND_OK &&
+         write_scheduler_.HasReadyStreams() && connection_send_window_ > 0) {
     const Http2StreamId stream_id = write_scheduler_.PopNextReadyStream();
     // TODO(birenroy): Add a return value to indicate write blockage, so streams
     // aren't woken unnecessarily.
     continue_writing = WriteForStream(stream_id);
   }
-  if (continue_writing) {
-    SendQueuedFrames();
+  if (continue_writing == SendResult::SEND_OK) {
+    continue_writing = SendQueuedFrames();
   }
-  return 0;
+  return continue_writing == SendResult::SEND_ERROR ? -1 : 0;
 }
 
-bool OgHttp2Session::SendQueuedFrames() {
+OgHttp2Session::SendResult OgHttp2Session::MaybeSendBufferedData() {
+  int64_t result = std::numeric_limits<int64_t>::max();
+  while (result > 0 && !buffered_data_.empty()) {
+    result = visitor_.OnReadyToSend(buffered_data_);
+    if (result > 0) {
+      buffered_data_.erase(0, result);
+    }
+  }
+  if (result < 0) {
+    LatchErrorAndNotify(Http2ErrorCode::INTERNAL_ERROR,
+                        ConnectionError::kSendError);
+    return SendResult::SEND_ERROR;
+  }
+  return buffered_data_.empty() ? SendResult::SEND_OK
+                                : SendResult::SEND_BLOCKED;
+}
+
+OgHttp2Session::SendResult OgHttp2Session::SendQueuedFrames() {
+  // Flush any serialized prefix.
+  const SendResult result = MaybeSendBufferedData();
+  if (result != SendResult::SEND_OK) {
+    return result;
+  }
   // Serialize and send frames in the queue.
   while (!frames_.empty()) {
     const auto& frame_ptr = frames_.front();
@@ -454,10 +462,10 @@
     if (result < 0) {
       LatchErrorAndNotify(Http2ErrorCode::INTERNAL_ERROR,
                           ConnectionError::kSendError);
-      return false;
+      return SendResult::SEND_ERROR;
     } else if (result == 0) {
       // Write blocked.
-      return false;
+      return SendResult::SEND_BLOCKED;
     } else {
       visitor_.OnFrameSent(c.frame_type(), c.stream_id(), frame_payload_length,
                            c.flags(), c.error_code());
@@ -471,23 +479,24 @@
       frames_.pop_front();
       if (static_cast<size_t>(result) < frame.size()) {
         // The frame was partially written, so the rest must be buffered.
-        serialized_prefix_.append(frame.data() + result, frame.size() - result);
-        return false;
+        buffered_data_.append(frame.data() + result, frame.size() - result);
+        return SendResult::SEND_BLOCKED;
       }
     }
   }
-  return true;
+  return SendResult::SEND_OK;
 }
 
-bool OgHttp2Session::WriteForStream(Http2StreamId stream_id) {
+OgHttp2Session::SendResult OgHttp2Session::WriteForStream(
+    Http2StreamId stream_id) {
   auto it = stream_map_.find(stream_id);
   if (it == stream_map_.end()) {
     QUICHE_LOG(ERROR) << "Can't find stream " << stream_id
                       << " which is ready to write!";
-    return true;
+    return SendResult::SEND_OK;
   }
   StreamState& state = it->second;
-  bool connection_can_write = true;
+  SendResult connection_can_write = SendResult::SEND_OK;
   if (!state.outbound_metadata.empty()) {
     connection_can_write = SendMetadata(stream_id, state.outbound_metadata);
   }
@@ -503,12 +512,12 @@
         MaybeCloseWithRstStream(stream_id, state);
       }
     }
-    return true;
+    return SendResult::SEND_OK;
   }
   int32_t available_window =
       std::min({connection_send_window_, state.send_window,
                 static_cast<int32_t>(max_frame_payload_)});
-  while (connection_can_write && available_window > 0 &&
+  while (connection_can_write == SendResult::SEND_OK && available_window > 0 &&
          state.outbound_body != nullptr) {
     int64_t length;
     bool end_data;
@@ -530,11 +539,11 @@
       spdy::SpdySerializedFrame header =
           spdy::SpdyFramer::SerializeDataFrameHeaderWithPaddingLengthField(
               data);
-      QUICHE_DCHECK(serialized_prefix_.empty() && frames_.empty());
+      QUICHE_DCHECK(buffered_data_.empty() && frames_.empty());
       const bool success =
           state.outbound_body->Send(absl::string_view(header), length);
       if (!success) {
-        connection_can_write = false;
+        connection_can_write = SendResult::SEND_BLOCKED;
         break;
       }
       visitor_.OnFrameSent(/* DATA */ 0, stream_id, length, fin ? 0x1 : 0x0, 0);
@@ -571,11 +580,14 @@
   }
   // Streams can continue writing as long as the connection is not write-blocked
   // and there is additional flow control quota available.
-  return connection_can_write && available_window > 0;
+  if (connection_can_write != SendResult::SEND_OK) {
+    return connection_can_write;
+  }
+  return available_window <= 0 ? SendResult::SEND_BLOCKED : SendResult::SEND_OK;
 }
 
-bool OgHttp2Session::SendMetadata(Http2StreamId stream_id,
-                                  OgHttp2Session::MetadataSequence& sequence) {
+OgHttp2Session::SendResult OgHttp2Session::SendMetadata(
+    Http2StreamId stream_id, OgHttp2Session::MetadataSequence& sequence) {
   const uint32_t max_payload_size =
       std::min(kMaxAllowedMetadataFrameSize, max_frame_payload_);
   auto payload_buffer = absl::make_unique<uint8_t[]>(max_payload_size);
@@ -588,7 +600,7 @@
         source.Pack(payload_buffer.get(), max_payload_size);
     if (written < 0) {
       // Did not touch the connection, so perhaps writes are still possible.
-      return true;
+      return SendResult::SEND_OK;
     }
     QUICHE_DCHECK_LE(static_cast<size_t>(written), max_payload_size);
     auto payload = absl::string_view(
@@ -1006,8 +1018,8 @@
 void OgHttp2Session::MaybeSetupPreface() {
   if (!queued_preface_) {
     if (options_.perspective == Perspective::kClient) {
-      serialized_prefix_.assign(spdy::kHttp2ConnectionHeaderPrefix,
-                                spdy::kHttp2ConnectionHeaderPrefixSize);
+      buffered_data_.assign(spdy::kHttp2ConnectionHeaderPrefix,
+                            spdy::kHttp2ConnectionHeaderPrefixSize);
     }
     // First frame must be a non-ack SETTINGS.
     if (frames_.empty() ||
diff --git a/http2/adapter/oghttp2_session.h b/http2/adapter/oghttp2_session.h
index 6c844d8..88cdb7d 100644
--- a/http2/adapter/oghttp2_session.h
+++ b/http2/adapter/oghttp2_session.h
@@ -101,7 +101,7 @@
     return !received_goaway_ && !decoder_.HasError();
   }
   bool want_write() const override {
-    return !frames_.empty() || !serialized_prefix_.empty() ||
+    return !frames_.empty() || !buffered_data_.empty() ||
            write_scheduler_.HasReadyStreams() || !connection_metadata_.empty();
   }
   int GetRemoteWindowSize() const override { return connection_send_window_; }
@@ -246,15 +246,25 @@
 
   void SendWindowUpdate(Http2StreamId stream_id, size_t update_delta);
 
-  // Sends queued frames, returning true if all frames were flushed
-  // successfully.
-  bool SendQueuedFrames();
+  enum class SendResult {
+    // All data was flushed.
+    SEND_OK,
+    // Not all data was flushed (due to flow control or TCP back pressure).
+    SEND_BLOCKED,
+    // An error occurred while sending data.
+    SEND_ERROR,
+  };
 
-  // Returns false if the connection is write-blocked (due to flow control or
-  // some other reason).
-  bool WriteForStream(Http2StreamId stream_id);
+  // Sends the buffered connection preface or serialized frame data, if any.
+  SendResult MaybeSendBufferedData();
 
-  bool SendMetadata(Http2StreamId stream_id, MetadataSequence& sequence);
+  // Serializes and sends queued frames.
+  SendResult SendQueuedFrames();
+
+  // Writes DATA frames for stream `stream_id`.
+  SendResult WriteForStream(Http2StreamId stream_id);
+
+  SendResult SendMetadata(Http2StreamId stream_id, MetadataSequence& sequence);
 
   void SendHeaders(Http2StreamId stream_id, spdy::SpdyHeaderBlock headers,
                    bool end_stream);
@@ -317,10 +327,11 @@
   // `max_outbound_concurrent_streams_`.
   std::list<PendingStreamState> pending_streams_;
 
-  // Maintains the queue of outbound frames, and any serialized bytes that have
-  // not yet been consumed.
+  // The queue of outbound frames.
   std::list<std::unique_ptr<spdy::SpdyFrameIR>> frames_;
-  std::string serialized_prefix_;
+  // Buffered data (connection preface, serialized frames) that has not yet been
+  // sent.
+  std::string buffered_data_;
 
   // Maintains the set of streams ready to write data to the peer.
   using WriteScheduler = PriorityWriteScheduler<Http2StreamId>;