Introduce a stream ID watermark for OgHttp2Session.

This CL introduces OgHttp2Session::highest_processed_stream_id_, updated when a
new stream is added to the stream map. Then OgHttp2Session uses the watermark
to validate new stream IDs; if a client sends a new stream ID not greater than
the watermark, OgHttp2Session latches an error and notifies the visitor (based
on go/http2spec#StreamIdentifiers).

PiperOrigin-RevId: 403102758
diff --git a/http2/adapter/http2_util.cc b/http2/adapter/http2_util.cc
index 17c3ce8..e450490 100644
--- a/http2/adapter/http2_util.cc
+++ b/http2/adapter/http2_util.cc
@@ -84,6 +84,8 @@
       return "ParseError";
     case ConnectionError::kHeaderError:
       return "HeaderError";
+    case ConnectionError::kInvalidNewStreamId:
+      return "InvalidNewStreamId";
   }
 }
 
diff --git a/http2/adapter/http2_visitor_interface.h b/http2/adapter/http2_visitor_interface.h
index f955bfb..4900a6a 100644
--- a/http2/adapter/http2_visitor_interface.h
+++ b/http2/adapter/http2_visitor_interface.h
@@ -70,6 +70,8 @@
     kParseError,
     // The visitor considered a received header to be a connection error.
     kHeaderError,
+    // The peer attempted to open a stream with an invalid stream ID.
+    kInvalidNewStreamId,
   };
   virtual void OnConnectionError(ConnectionError error) = 0;
 
diff --git a/http2/adapter/nghttp2_adapter_test.cc b/http2/adapter/nghttp2_adapter_test.cc
index 8fc772d..f698d8d 100644
--- a/http2/adapter/nghttp2_adapter_test.cc
+++ b/http2/adapter/nghttp2_adapter_test.cc
@@ -2366,6 +2366,69 @@
   EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::HEADERS}));
 }
 
+TEST(NgHttp2AdapterTest, ServerDropsNewStreamBelowWatermark) {
+  DataSavingVisitor visitor;
+  auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
+
+  EXPECT_EQ(0, adapter->GetHighestReceivedStreamId());
+
+  const std::string frames = TestFrameSequence()
+                                 .ClientPreface()
+                                 .Headers(3,
+                                          {{":method", "POST"},
+                                           {":scheme", "https"},
+                                           {":authority", "example.com"},
+                                           {":path", "/this/is/request/one"}},
+                                          /*fin=*/false)
+                                 .Data(3, "This is the request body.")
+                                 .Headers(1,
+                                          {{":method", "GET"},
+                                           {":scheme", "http"},
+                                           {":authority", "example.com"},
+                                           {":path", "/this/is/request/two"}},
+                                          /*fin=*/true)
+                                 .Serialize();
+  testing::InSequence s;
+
+  // Client preface (empty SETTINGS)
+  EXPECT_CALL(visitor, OnFrameHeader(0, 0, SETTINGS, 0));
+  EXPECT_CALL(visitor, OnSettingsStart());
+  EXPECT_CALL(visitor, OnSettingsEnd());
+
+  EXPECT_CALL(visitor, OnFrameHeader(3, _, HEADERS, 4));
+  EXPECT_CALL(visitor, OnBeginHeadersForStream(3));
+  EXPECT_CALL(visitor, OnHeaderForStream(3, ":method", "POST"));
+  EXPECT_CALL(visitor, OnHeaderForStream(3, ":scheme", "https"));
+  EXPECT_CALL(visitor, OnHeaderForStream(3, ":authority", "example.com"));
+  EXPECT_CALL(visitor, OnHeaderForStream(3, ":path", "/this/is/request/one"));
+  EXPECT_CALL(visitor, OnEndHeadersForStream(3));
+  EXPECT_CALL(visitor, OnFrameHeader(3, 25, DATA, 0));
+  EXPECT_CALL(visitor, OnBeginDataForStream(3, 25));
+  EXPECT_CALL(visitor, OnDataForStream(3, "This is the request body."));
+
+  // It looks like nghttp2 delivers the under-watermark frame header but
+  // otherwise silently drops the rest of the frame without error.
+  EXPECT_CALL(visitor, OnFrameHeader(1, _, HEADERS, 5));
+  EXPECT_CALL(visitor, OnInvalidFrame).Times(0);
+  EXPECT_CALL(visitor, OnConnectionError).Times(0);
+
+  const int64_t result = adapter->ProcessBytes(frames);
+  EXPECT_EQ(frames.size(), result);
+
+  EXPECT_EQ(3, adapter->GetHighestReceivedStreamId());
+
+  EXPECT_TRUE(adapter->want_write());
+
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, 0, 0x1));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, 0, 0x1, 0));
+
+  int send_result = adapter->Send();
+  // Some bytes should have been serialized.
+  EXPECT_EQ(0, send_result);
+  // SETTINGS ack
+  EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::SETTINGS}));
+}
+
 }  // namespace
 }  // namespace test
 }  // namespace adapter
diff --git a/http2/adapter/oghttp2_adapter_test.cc b/http2/adapter/oghttp2_adapter_test.cc
index 48baba1..e1c540d 100644
--- a/http2/adapter/oghttp2_adapter_test.cc
+++ b/http2/adapter/oghttp2_adapter_test.cc
@@ -1678,6 +1678,69 @@
   client_adapter->Send();
 }
 
+TEST(OgHttp2AdapterServerTest, ServerForbidsNewStreamBelowWatermark) {
+  DataSavingVisitor visitor;
+  OgHttp2Adapter::Options options{.perspective = Perspective::kServer};
+  auto adapter = OgHttp2Adapter::Create(visitor, options);
+
+  EXPECT_EQ(0, adapter->GetHighestReceivedStreamId());
+
+  const std::string frames = TestFrameSequence()
+                                 .ClientPreface()
+                                 .Headers(3,
+                                          {{":method", "POST"},
+                                           {":scheme", "https"},
+                                           {":authority", "example.com"},
+                                           {":path", "/this/is/request/one"}},
+                                          /*fin=*/false)
+                                 .Data(3, "This is the request body.")
+                                 .Headers(1,
+                                          {{":method", "GET"},
+                                           {":scheme", "http"},
+                                           {":authority", "example.com"},
+                                           {":path", "/this/is/request/two"}},
+                                          /*fin=*/true)
+                                 .Serialize();
+  testing::InSequence s;
+
+  // Client preface (empty SETTINGS)
+  EXPECT_CALL(visitor, OnFrameHeader(0, 0, SETTINGS, 0));
+  EXPECT_CALL(visitor, OnSettingsStart());
+  EXPECT_CALL(visitor, OnSettingsEnd());
+
+  EXPECT_CALL(visitor, OnFrameHeader(3, _, HEADERS, 4));
+  EXPECT_CALL(visitor, OnBeginHeadersForStream(3));
+  EXPECT_CALL(visitor, OnHeaderForStream(3, ":method", "POST"));
+  EXPECT_CALL(visitor, OnHeaderForStream(3, ":scheme", "https"));
+  EXPECT_CALL(visitor, OnHeaderForStream(3, ":authority", "example.com"));
+  EXPECT_CALL(visitor, OnHeaderForStream(3, ":path", "/this/is/request/one"));
+  EXPECT_CALL(visitor, OnEndHeadersForStream(3));
+  EXPECT_CALL(visitor, OnFrameHeader(3, 25, DATA, 0));
+  EXPECT_CALL(visitor, OnBeginDataForStream(3, 25));
+  EXPECT_CALL(visitor, OnDataForStream(3, "This is the request body."));
+  EXPECT_CALL(visitor, OnFrameHeader(1, _, HEADERS, 5));
+  EXPECT_CALL(visitor, OnConnectionError(ConnectionError::kInvalidNewStreamId));
+
+  const int64_t result = adapter->ProcessBytes(frames);
+  EXPECT_LT(result, 0);
+
+  EXPECT_EQ(3, adapter->GetHighestReceivedStreamId());
+
+  EXPECT_TRUE(adapter->want_write());
+
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, 0, 0x0));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, 0, 0x0, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, 0, 0x1));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, 0, 0x1, 0));
+
+  int send_result = adapter->Send();
+  // Some bytes should have been serialized.
+  EXPECT_EQ(0, send_result);
+  // SETTINGS and SETTINGS ack.
+  EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::SETTINGS,
+                                            spdy::SpdyFrameType::SETTINGS}));
+}
+
 }  // namespace
 }  // namespace test
 }  // namespace adapter
diff --git a/http2/adapter/oghttp2_session.cc b/http2/adapter/oghttp2_session.cc
index 2784bef..1f8fcf2 100644
--- a/http2/adapter/oghttp2_session.cc
+++ b/http2/adapter/oghttp2_session.cc
@@ -6,6 +6,7 @@
 #include "absl/memory/memory.h"
 #include "absl/strings/escaping.h"
 #include "http2/adapter/http2_protocol.h"
+#include "http2/adapter/http2_util.h"
 #include "http2/adapter/http2_visitor_interface.h"
 #include "http2/adapter/oghttp2_util.h"
 #include "spdy/core/spdy_protocol.h"
@@ -807,6 +808,12 @@
                                spdy::SpdyStreamId /*parent_stream_id*/,
                                bool /*exclusive*/, bool /*fin*/, bool /*end*/) {
   if (options_.perspective == Perspective::kServer) {
+    const auto new_stream_id = static_cast<Http2StreamId>(stream_id);
+    if (new_stream_id <= highest_processed_stream_id_) {
+      // A new stream ID lower than the watermark is a connection error.
+      LatchErrorAndNotify(ConnectionError::kInvalidNewStreamId);
+      return;
+    }
     CreateStream(stream_id);
   }
 }
@@ -988,6 +995,9 @@
     // Add the stream to the write scheduler.
     const WriteScheduler::StreamPrecedenceType precedence(3);
     write_scheduler_.RegisterStream(stream_id, precedence);
+
+    highest_processed_stream_id_ =
+        std::max(highest_processed_stream_id_, stream_id);
   }
   return iter;
 }
diff --git a/http2/adapter/oghttp2_session.h b/http2/adapter/oghttp2_session.h
index e4e376a..af17230 100644
--- a/http2/adapter/oghttp2_session.h
+++ b/http2/adapter/oghttp2_session.h
@@ -4,6 +4,7 @@
 #include <cstdint>
 #include <list>
 
+#include "absl/strings/string_view.h"
 #include "http2/adapter/data_source.h"
 #include "http2/adapter/header_validator.h"
 #include "http2/adapter/http2_protocol.h"
@@ -326,7 +327,11 @@
   MetadataSequence connection_metadata_;
 
   Http2StreamId next_stream_id_ = 1;
+  // The highest received stream ID is the highest stream ID in any frame read
+  // from the peer. The highest processed stream ID is the highest stream ID for
+  // which this endpoint created a stream in the stream map.
   Http2StreamId highest_received_stream_id_ = 0;
+  Http2StreamId highest_processed_stream_id_ = 0;
   Http2StreamId metadata_stream_id_ = 0;
   size_t metadata_length_ = 0;
   int32_t connection_send_window_ = kInitialFlowControlWindowSize;