OgHttp2Session refactoring.

* Delegates SubmitRequest/SubmitResponse to internal implementations.
* Factors out some named methods for stream state operations related to message body handling.

These named methods may become more complex in the future, so this is preparatory refactoring.

PiperOrigin-RevId: 624309802
diff --git a/quiche/http2/adapter/oghttp2_session.cc b/quiche/http2/adapter/oghttp2_session.cc
index 235b0fa..e5d8375 100644
--- a/quiche/http2/adapter/oghttp2_session.cc
+++ b/quiche/http2/adapter/oghttp2_session.cc
@@ -407,7 +407,7 @@
 
 bool OgHttp2Session::ResumeStream(Http2StreamId stream_id) {
   auto it = stream_map_.find(stream_id);
-  if (it == stream_map_.end() || it->second.outbound_body == nullptr ||
+  if (it == stream_map_.end() || !HasMoreData(it->second) ||
       !write_scheduler_.StreamRegistered(stream_id)) {
     return false;
   }
@@ -652,6 +652,45 @@
   return write_scheduler_.PopNextReadyStream();
 }
 
+int32_t OgHttp2Session::SubmitRequestInternal(
+    absl::Span<const Header> headers,
+    std::unique_ptr<DataFrameSource> data_source, void* user_data) {
+  // TODO(birenroy): return an error for the incorrect perspective
+  const Http2StreamId stream_id = next_stream_id_;
+  next_stream_id_ += 2;
+  if (!pending_streams_.empty() || !CanCreateStream()) {
+    // TODO(diannahu): There should probably be a limit to the number of allowed
+    // pending streams.
+    pending_streams_.insert(
+        {stream_id, PendingStreamState{ToHeaderBlock(headers),
+                                       std::move(data_source), user_data}});
+    StartPendingStreams();
+  } else {
+    StartRequest(stream_id, ToHeaderBlock(headers), std::move(data_source),
+                 user_data);
+  }
+  return stream_id;
+}
+
+int OgHttp2Session::SubmitResponseInternal(
+    Http2StreamId stream_id, absl::Span<const Header> headers,
+    std::unique_ptr<DataFrameSource> data_source) {
+  // TODO(birenroy): return an error for the incorrect perspective
+  auto iter = stream_map_.find(stream_id);
+  if (iter == stream_map_.end()) {
+    QUICHE_LOG(ERROR) << "Unable to find stream " << stream_id;
+    return -501;  // NGHTTP2_ERR_INVALID_ARGUMENT
+  }
+  const bool end_stream = data_source == nullptr;
+  if (!end_stream) {
+    // Add data source to stream state
+    iter->second.outbound_body = std::move(data_source);
+    write_scheduler_.MarkStreamReady(stream_id, false);
+  }
+  SendHeaders(stream_id, ToHeaderBlock(headers), end_stream);
+  return 0;
+}
+
 OgHttp2Session::SendResult OgHttp2Session::MaybeSendBufferedData() {
   int64_t result = std::numeric_limits<int64_t>::max();
   while (result > 0 && !buffered_data_.empty()) {
@@ -797,19 +836,19 @@
   if (reset_it != streams_reset_.end()) {
     // The stream has been reset; there's no point in sending DATA or trailing
     // HEADERS.
-    state.outbound_body = nullptr;
+    AbandonData(state);
     state.trailers = nullptr;
     return SendResult::SEND_OK;
   }
 
   SendResult connection_can_write = SendResult::SEND_OK;
-  if (state.outbound_body == nullptr || state.data_deferred) {
+  if (!IsReadyToWriteData(state)) {
     // No data to send, but there might be trailers.
     if (state.trailers != nullptr) {
       // Trailers will include END_STREAM, so the data source can be discarded.
       // Since data_deferred is true, there is no data waiting to be flushed for
       // this stream.
-      state.outbound_body = nullptr;
+      AbandonData(state);
       auto block_ptr = std::move(state.trailers);
       if (state.half_closed_local) {
         QUICHE_LOG(ERROR) << "Sent fin; can't send trailers.";
@@ -827,7 +866,7 @@
       std::min({connection_send_window_, state.send_window,
                 static_cast<int32_t>(max_frame_payload_)});
   while (connection_can_write == SendResult::SEND_OK && available_window > 0 &&
-         state.outbound_body != nullptr && !state.data_deferred) {
+         IsReadyToWriteData(state)) {
     DataFrameInfo info = GetDataFrameInfo(stream_id, available_window, state);
     QUICHE_VLOG(2) << "WriteForStream | length: " << info.payload_length
                    << " end_data: " << info.end_data
@@ -900,13 +939,13 @@
           SendTrailers(stream_id, std::move(*block_ptr));
         }
       }
-      state.outbound_body = nullptr;
+      AbandonData(state);
     }
   }
   // If the stream still exists and has data to send, it should be marked as
   // ready in the write scheduler.
   if (stream_map_.contains(stream_id) && !state.data_deferred &&
-      state.send_window > 0 && state.outbound_body != nullptr) {
+      state.send_window > 0 && HasMoreData(state)) {
     write_scheduler_.MarkStreamReady(stream_id, false);
   }
   // Streams can continue writing as long as the connection is not write-blocked
@@ -946,40 +985,13 @@
 int32_t OgHttp2Session::SubmitRequest(
     absl::Span<const Header> headers,
     std::unique_ptr<DataFrameSource> data_source, void* user_data) {
-  // TODO(birenroy): return an error for the incorrect perspective
-  const Http2StreamId stream_id = next_stream_id_;
-  next_stream_id_ += 2;
-  if (!pending_streams_.empty() || !CanCreateStream()) {
-    // TODO(diannahu): There should probably be a limit to the number of allowed
-    // pending streams.
-    pending_streams_.insert(
-        {stream_id, PendingStreamState{ToHeaderBlock(headers),
-                                       std::move(data_source), user_data}});
-    StartPendingStreams();
-  } else {
-    StartRequest(stream_id, ToHeaderBlock(headers), std::move(data_source),
-                 user_data);
-  }
-  return stream_id;
+  return SubmitRequestInternal(headers, std::move(data_source), user_data);
 }
 
 int OgHttp2Session::SubmitResponse(
     Http2StreamId stream_id, absl::Span<const Header> headers,
     std::unique_ptr<DataFrameSource> data_source) {
-  // TODO(birenroy): return an error for the incorrect perspective
-  auto iter = stream_map_.find(stream_id);
-  if (iter == stream_map_.end()) {
-    QUICHE_LOG(ERROR) << "Unable to find stream " << stream_id;
-    return -501;  // NGHTTP2_ERR_INVALID_ARGUMENT
-  }
-  const bool end_stream = data_source == nullptr;
-  if (!end_stream) {
-    // Add data source to stream state
-    iter->second.outbound_body = std::move(data_source);
-    write_scheduler_.MarkStreamReady(stream_id, false);
-  }
-  SendHeaders(stream_id, ToHeaderBlock(headers), end_stream);
-  return 0;
+  return SubmitResponseInternal(stream_id, headers, std::move(data_source));
 }
 
 int OgHttp2Session::SubmitTrailer(Http2StreamId stream_id,
@@ -1000,7 +1012,7 @@
                       << " already has trailers queued";
     return -514;  // NGHTTP2_ERR_INVALID_STREAM_STATE
   }
-  if (state.outbound_body == nullptr) {
+  if (!HasMoreData(state)) {
     // Enqueue trailers immediately.
     SendTrailers(stream_id, ToHeaderBlock(trailers));
   } else {
@@ -1215,7 +1227,7 @@
   auto iter = stream_map_.find(stream_id);
   if (iter != stream_map_.end()) {
     iter->second.half_closed_remote = true;
-    iter->second.outbound_body = nullptr;
+    AbandonData(iter->second);
   } else if (static_cast<Http2StreamId>(stream_id) >
              highest_processed_stream_id_) {
     // Receiving RST_STREAM before HEADERS is a connection error.
@@ -2039,6 +2051,18 @@
   }
 }
 
+bool OgHttp2Session::HasMoreData(const StreamState& stream_state) const {
+  return stream_state.outbound_body != nullptr;
+}
+
+bool OgHttp2Session::IsReadyToWriteData(const StreamState& stream_state) const {
+  return stream_state.outbound_body != nullptr && !stream_state.data_deferred;
+}
+
+void OgHttp2Session::AbandonData(StreamState& stream_state) {
+  stream_state.outbound_body = nullptr;
+}
+
 OgHttp2Session::DataFrameInfo OgHttp2Session::GetDataFrameInfo(
     Http2StreamId /*stream_id*/, size_t flow_control_available,
     StreamState& stream_state) {
diff --git a/quiche/http2/adapter/oghttp2_session.h b/quiche/http2/adapter/oghttp2_session.h
index 580bef7..006383f 100644
--- a/quiche/http2/adapter/oghttp2_session.h
+++ b/quiche/http2/adapter/oghttp2_session.h
@@ -365,6 +365,13 @@
   // streams, returns zero.
   Http2StreamId GetNextReadyStream();
 
+  int32_t SubmitRequestInternal(absl::Span<const Header> headers,
+                                std::unique_ptr<DataFrameSource> data_source,
+                                void* user_data);
+  int SubmitResponseInternal(Http2StreamId stream_id,
+                             absl::Span<const Header> headers,
+                             std::unique_ptr<DataFrameSource> data_source);
+
   // Sends the buffered connection preface or serialized frame data, if any.
   SendResult MaybeSendBufferedData();
 
@@ -447,6 +454,17 @@
   // initial window.
   void UpdateStreamReceiveWindowSizes(uint32_t new_value);
 
+  // Returns true if the given stream has additional data to write before
+  // trailers or the end of the stream.
+  bool HasMoreData(const StreamState& stream_state) const;
+
+  // Returns true if the given stream has data ready to write. Trailers are
+  // considered separately.
+  bool IsReadyToWriteData(const StreamState& stream_state) const;
+
+  // Abandons any remaining data, e.g. on stream reset.
+  void AbandonData(StreamState& stream_state);
+
   // Gathers information required to construct a DATA frame header.
   struct DataFrameInfo {
     int64_t payload_length;