Adds recursion guards to OgHttp2Session to prevent re-entrant calls to Send() or ProcessBytes() from doing the wrong thing.

PiperOrigin-RevId: 402923615
diff --git a/http2/adapter/oghttp2_adapter_test.cc b/http2/adapter/oghttp2_adapter_test.cc
index 544ae23..8070994 100644
--- a/http2/adapter/oghttp2_adapter_test.cc
+++ b/http2/adapter/oghttp2_adapter_test.cc
@@ -1626,6 +1626,68 @@
                                             spdy::SpdyFrameType::SETTINGS}));
 }
 
+// Exercises a naive mutually recursive test client and server. This test fails
+// without recursion guards in OgHttp2Session.
+TEST(OgHttp2AdapterInteractionTest, ClientServerInteractionTest) {
+  MockHttp2Visitor client_visitor;
+  auto client_adapter =
+      OgHttp2Adapter::Create(client_visitor, {Perspective::kClient});
+  MockHttp2Visitor server_visitor;
+  auto server_adapter =
+      OgHttp2Adapter::Create(server_visitor, {Perspective::kServer});
+
+  // Feeds bytes sent from the client into the server's ProcessBytes.
+  EXPECT_CALL(client_visitor, OnReadyToSend(_))
+      .WillRepeatedly(
+          testing::Invoke(server_adapter.get(), &OgHttp2Adapter::ProcessBytes));
+  // Feeds bytes sent from the server into the client's ProcessBytes.
+  EXPECT_CALL(server_visitor, OnReadyToSend(_))
+      .WillRepeatedly(
+          testing::Invoke(client_adapter.get(), &OgHttp2Adapter::ProcessBytes));
+  // Sets up the server to respond automatically to a request from a client.
+  EXPECT_CALL(server_visitor, OnEndHeadersForStream(_))
+      .WillRepeatedly([&server_adapter](Http2StreamId stream_id) {
+        server_adapter->SubmitResponse(
+            stream_id, ToHeaders({{":status", "200"}}), nullptr);
+        server_adapter->Send();
+        return true;
+      });
+  // Sets up the client to create a new stream automatically when receiving a
+  // response.
+  EXPECT_CALL(client_visitor, OnEndHeadersForStream(_))
+      .WillRepeatedly([&client_adapter,
+                       &client_visitor](Http2StreamId stream_id) {
+        if (stream_id < 10) {
+          const Http2StreamId new_stream_id = stream_id + 2;
+          auto body =
+              absl::make_unique<TestDataFrameSource>(client_visitor, true);
+          body->AppendPayload("This is an example request body.");
+          body->EndData();
+          const int created_stream_id = client_adapter->SubmitRequest(
+              ToHeaders({{":method", "GET"},
+                         {":scheme", "http"},
+                         {":authority", "example.com"},
+                         {":path",
+                          absl::StrCat("/this/is/request/", new_stream_id)}}),
+              std::move(body), nullptr);
+          EXPECT_EQ(new_stream_id, created_stream_id);
+          client_adapter->Send();
+        }
+        return true;
+      });
+
+  // Submit a request to ensure the first stream is created.
+  int stream_id = client_adapter->SubmitRequest(
+      ToHeaders({{":method", "POST"},
+                 {":scheme", "http"},
+                 {":authority", "example.com"},
+                 {":path", "/this/is/request/one"}}),
+      nullptr, nullptr);
+  EXPECT_EQ(stream_id, 1);
+
+  client_adapter->Send();
+}
+
 }  // namespace
 }  // namespace test
 }  // namespace adapter
diff --git a/http2/adapter/oghttp2_session.cc b/http2/adapter/oghttp2_session.cc
index adcc7b5..607cda4 100644
--- a/http2/adapter/oghttp2_session.cc
+++ b/http2/adapter/oghttp2_session.cc
@@ -131,6 +131,20 @@
   }
 }
 
+class RunOnExit {
+ public:
+  explicit RunOnExit(std::function<void()> f) : f_(std::move(f)) {}
+  ~RunOnExit() {
+    if (f_) {
+      f_();
+    }
+    f_ = {};
+  }
+
+ private:
+  std::function<void()> f_;
+};
+
 }  // namespace
 
 void OgHttp2Session::PassthroughHeadersHandler::OnHeaderBlockStart() {
@@ -267,6 +281,15 @@
 }
 
 int64_t OgHttp2Session::ProcessBytes(absl::string_view bytes) {
+  QUICHE_VLOG(2) << TracePerspectiveAsString(options_.perspective)
+                 << " processing [" << absl::CEscape(bytes) << "]";
+  if (processing_bytes_) {
+    QUICHE_VLOG(1) << "Returning early; already processing bytes.";
+    return 0;
+  }
+  processing_bytes_ = true;
+  RunOnExit r{[this]() { processing_bytes_ = false; }};
+
   int64_t preface_consumed = 0;
   if (!remaining_preface_.empty()) {
     QUICHE_VLOG(2) << "Preface bytes remaining: " << remaining_preface_.size();
@@ -338,6 +361,14 @@
 }
 
 int OgHttp2Session::Send() {
+  if (sending_) {
+    QUICHE_VLOG(1) << TracePerspectiveAsString(options_.perspective)
+                   << " returning early; already sending.";
+    return 0;
+  }
+  sending_ = true;
+  RunOnExit r{[this]() { sending_ = false; }};
+
   MaybeSetupPreface();
   int64_t result = std::numeric_limits<int64_t>::max();
   // Flush any serialized prefix.
diff --git a/http2/adapter/oghttp2_session.h b/http2/adapter/oghttp2_session.h
index 7f60302..e4e376a 100644
--- a/http2/adapter/oghttp2_session.h
+++ b/http2/adapter/oghttp2_session.h
@@ -341,6 +341,11 @@
   bool peer_supports_metadata_ = false;
   bool end_metadata_ = false;
 
+  // Recursion guard for ProcessBytes().
+  bool processing_bytes_ = false;
+  // Recursion guard for Send().
+  bool sending_ = false;
+
   // Replace this with a stream ID, for multiple GOAWAY support.
   bool queued_goaway_ = false;
   bool latched_error_ = false;