Adds methods to retrieve the receive window size for the connection or a particular stream.
Also renames GetPeerConnectionWindow() to GetSendWindowSize(), and implements proper receive-side flow control accounting in OgHttp2Session.
PiperOrigin-RevId: 378752864
diff --git a/http2/adapter/http2_adapter.h b/http2/adapter/http2_adapter.h
index 2dd7d3f..e53918c 100644
--- a/http2/adapter/http2_adapter.h
+++ b/http2/adapter/http2_adapter.h
@@ -69,8 +69,16 @@
// Invokes the visitor's OnReadyToSend() method for serialized frame data.
virtual void Send() = 0;
- // Returns the connection-level flow control window for the peer.
- virtual int GetPeerConnectionWindow() const = 0;
+ // Returns the connection-level flow control window advertised by the peer.
+ virtual int GetSendWindowSize() const = 0;
+
+ // Returns the amount of data a peer could send on a given stream. This is
+ // the outstanding stream receive window.
+ virtual int GetStreamReceiveWindowSize(Http2StreamId stream_id) const = 0;
+
+ // Returns the total amount of data a peer could send on the connection. This
+ // is the outstanding connection receive window.
+ virtual int GetReceiveWindowSize() const = 0;
// Gets the highest stream ID value seen in a frame received by this endpoint.
// This method is only guaranteed to work for server endpoints.
diff --git a/http2/adapter/nghttp2_adapter.cc b/http2/adapter/nghttp2_adapter.cc
index 6ec2a14..d4e3365 100644
--- a/http2/adapter/nghttp2_adapter.cc
+++ b/http2/adapter/nghttp2_adapter.cc
@@ -102,10 +102,19 @@
}
}
-int NgHttp2Adapter::GetPeerConnectionWindow() const {
+int NgHttp2Adapter::GetSendWindowSize() const {
return session_->GetRemoteWindowSize();
}
+int NgHttp2Adapter::GetStreamReceiveWindowSize(Http2StreamId stream_id) const {
+ return nghttp2_session_get_stream_local_window_size(session_->raw_ptr(),
+ stream_id);
+}
+
+int NgHttp2Adapter::GetReceiveWindowSize() const {
+ return nghttp2_session_get_local_window_size(session_->raw_ptr());
+}
+
Http2StreamId NgHttp2Adapter::GetHighestReceivedStreamId() const {
return nghttp2_session_get_last_proc_stream_id(session_->raw_ptr());
}
diff --git a/http2/adapter/nghttp2_adapter.h b/http2/adapter/nghttp2_adapter.h
index a3f32df..2fdf3ee 100644
--- a/http2/adapter/nghttp2_adapter.h
+++ b/http2/adapter/nghttp2_adapter.h
@@ -49,7 +49,9 @@
void Send() override;
- int GetPeerConnectionWindow() const override;
+ int GetSendWindowSize() const override;
+ int GetStreamReceiveWindowSize(Http2StreamId stream_id) const override;
+ int GetReceiveWindowSize() const override;
Http2StreamId GetHighestReceivedStreamId() const override;
diff --git a/http2/adapter/nghttp2_adapter_test.cc b/http2/adapter/nghttp2_adapter_test.cc
index 38f2301..df5d186 100644
--- a/http2/adapter/nghttp2_adapter_test.cc
+++ b/http2/adapter/nghttp2_adapter_test.cc
@@ -84,8 +84,7 @@
const ssize_t initial_result = adapter->ProcessBytes(initial_frames);
EXPECT_EQ(initial_frames.size(), initial_result);
- EXPECT_EQ(adapter->GetPeerConnectionWindow(),
- kInitialFlowControlWindowSize + 1000);
+ EXPECT_EQ(adapter->GetSendWindowSize(), kInitialFlowControlWindowSize + 1000);
// Some bytes should have been serialized.
adapter->Send();
EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::SETTINGS,
@@ -143,6 +142,18 @@
spdy::SpdyFrameType::HEADERS}));
visitor.Clear();
+ // All streams are active and have not yet received any data, so the receive
+ // window should be at the initial value.
+ EXPECT_EQ(kInitialFlowControlWindowSize,
+ adapter->GetStreamReceiveWindowSize(stream_id1));
+ EXPECT_EQ(kInitialFlowControlWindowSize,
+ adapter->GetStreamReceiveWindowSize(stream_id2));
+ EXPECT_EQ(kInitialFlowControlWindowSize,
+ adapter->GetStreamReceiveWindowSize(stream_id3));
+
+ // Connection has not yet received any data.
+ EXPECT_EQ(kInitialFlowControlWindowSize, adapter->GetReceiveWindowSize());
+
EXPECT_EQ(0, adapter->GetHighestReceivedStreamId());
EXPECT_EQ(kSentinel1, adapter->GetStreamUserData(stream_id1));
@@ -180,6 +191,19 @@
const ssize_t stream_result = adapter->ProcessBytes(stream_frames);
EXPECT_EQ(stream_frames.size(), stream_result);
+ // First stream has received some data.
+ EXPECT_GT(kInitialFlowControlWindowSize,
+ adapter->GetStreamReceiveWindowSize(stream_id1));
+ // Second stream was closed.
+ EXPECT_EQ(-1, adapter->GetStreamReceiveWindowSize(stream_id2));
+ // Third stream has not received any data.
+ EXPECT_EQ(kInitialFlowControlWindowSize,
+ adapter->GetStreamReceiveWindowSize(stream_id3));
+
+ // Connection window should be the same as the first stream.
+ EXPECT_EQ(adapter->GetReceiveWindowSize(),
+ adapter->GetStreamReceiveWindowSize(stream_id1));
+
// Should be 3, but this method only works for server adapters.
EXPECT_EQ(0, adapter->GetHighestReceivedStreamId());
@@ -250,6 +274,11 @@
EXPECT_GT(stream_id, 0);
EXPECT_TRUE(adapter->session().want_write());
adapter->Send();
+
+ EXPECT_EQ(kInitialFlowControlWindowSize,
+ adapter->GetStreamReceiveWindowSize(stream_id));
+ EXPECT_EQ(kInitialFlowControlWindowSize, adapter->GetReceiveWindowSize());
+
EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::HEADERS,
spdy::SpdyFrameType::DATA}));
EXPECT_THAT(visitor.data(), testing::HasSubstr(kBody));
@@ -489,6 +518,11 @@
EXPECT_EQ(kSentinel1, adapter->GetStreamUserData(1));
+ EXPECT_GT(kInitialFlowControlWindowSize,
+ adapter->GetStreamReceiveWindowSize(1));
+ EXPECT_EQ(adapter->GetStreamReceiveWindowSize(1),
+ adapter->GetReceiveWindowSize());
+
// Because stream 3 has already been closed, it's not possible to set user
// data.
const char* kSentinel3 = "another arbitrary pointer";
@@ -497,8 +531,7 @@
EXPECT_EQ(3, adapter->GetHighestReceivedStreamId());
- EXPECT_EQ(adapter->GetPeerConnectionWindow(),
- kInitialFlowControlWindowSize + 1000);
+ EXPECT_EQ(adapter->GetSendWindowSize(), kInitialFlowControlWindowSize + 1000);
EXPECT_TRUE(adapter->session().want_write());
// Some bytes should have been serialized.
diff --git a/http2/adapter/oghttp2_adapter.cc b/http2/adapter/oghttp2_adapter.cc
index 9f57aec..11aefb2 100644
--- a/http2/adapter/oghttp2_adapter.cc
+++ b/http2/adapter/oghttp2_adapter.cc
@@ -83,10 +83,18 @@
session_->Send();
}
-int OgHttp2Adapter::GetPeerConnectionWindow() const {
+int OgHttp2Adapter::GetSendWindowSize() const {
return session_->GetRemoteWindowSize();
}
+int OgHttp2Adapter::GetStreamReceiveWindowSize(Http2StreamId stream_id) const {
+ return session_->GetStreamReceiveWindowSize(stream_id);
+}
+
+int OgHttp2Adapter::GetReceiveWindowSize() const {
+ return session_->GetReceiveWindowSize();
+}
+
Http2StreamId OgHttp2Adapter::GetHighestReceivedStreamId() const {
return session_->GetHighestReceivedStreamId();
}
diff --git a/http2/adapter/oghttp2_adapter.h b/http2/adapter/oghttp2_adapter.h
index bbdd2a4..477a6e0 100644
--- a/http2/adapter/oghttp2_adapter.h
+++ b/http2/adapter/oghttp2_adapter.h
@@ -35,7 +35,9 @@
int window_increment) override;
void SubmitMetadata(Http2StreamId stream_id, bool fin) override;
void Send() override;
- int GetPeerConnectionWindow() const override;
+ int GetSendWindowSize() const override;
+ int GetStreamReceiveWindowSize(Http2StreamId stream_id) const override;
+ int GetReceiveWindowSize() const override;
Http2StreamId GetHighestReceivedStreamId() const override;
void MarkDataConsumedForStream(Http2StreamId stream_id,
size_t num_bytes) override;
diff --git a/http2/adapter/oghttp2_adapter_test.cc b/http2/adapter/oghttp2_adapter_test.cc
index bee8640..1675bda 100644
--- a/http2/adapter/oghttp2_adapter_test.cc
+++ b/http2/adapter/oghttp2_adapter_test.cc
@@ -43,9 +43,9 @@
EXPECT_QUICHE_BUG(adapter_->SubmitMetadata(3, true), "Not implemented");
}
-TEST_F(OgHttp2AdapterTest, GetPeerConnectionWindow) {
- const int peer_window = adapter_->GetPeerConnectionWindow();
- EXPECT_GT(peer_window, 0);
+TEST_F(OgHttp2AdapterTest, GetSendWindowSize) {
+ const int peer_window = adapter_->GetSendWindowSize();
+ EXPECT_EQ(peer_window, kInitialFlowControlWindowSize);
}
TEST_F(OgHttp2AdapterTest, MarkDataConsumedForStream) {
diff --git a/http2/adapter/oghttp2_session.cc b/http2/adapter/oghttp2_session.cc
index 5927914..853c374 100644
--- a/http2/adapter/oghttp2_session.cc
+++ b/http2/adapter/oghttp2_session.cc
@@ -23,7 +23,14 @@
}
OgHttp2Session::OgHttp2Session(Http2VisitorInterface& visitor, Options options)
- : visitor_(visitor), headers_handler_(visitor), options_(options) {
+ : visitor_(visitor),
+ headers_handler_(visitor),
+ connection_window_manager_(kInitialFlowControlWindowSize,
+ [this](size_t window_update_delta) {
+ SendWindowUpdate(kConnectionStreamId,
+ window_update_delta);
+ }),
+ options_(options) {
decoder_.set_visitor(this);
if (options_.perspective == Perspective::kServer) {
remaining_preface_ = {spdy::kHttp2ConnectionHeaderPrefix,
@@ -59,6 +66,18 @@
return true;
}
+int OgHttp2Session::GetStreamReceiveWindowSize(Http2StreamId stream_id) const {
+ auto it = stream_map_.find(stream_id);
+ if (it != stream_map_.end()) {
+ return it->second.window_manager.CurrentWindowSize();
+ }
+ return -1;
+}
+
+int OgHttp2Session::GetReceiveWindowSize() const {
+ return connection_window_manager_.CurrentWindowSize();
+}
+
ssize_t OgHttp2Session::ProcessBytes(absl::string_view bytes) {
ssize_t preface_consumed = 0;
if (!remaining_preface_.empty()) {
@@ -95,6 +114,7 @@
} else {
it->second.window_manager.MarkDataFlushed(num_bytes);
}
+ connection_window_manager_.MarkDataFlushed(num_bytes);
return 0; // Remove?
}
@@ -369,6 +389,7 @@
void OgHttp2Session::OnStreamFrameData(spdy::SpdyStreamId stream_id,
const char* data,
size_t len) {
+ MarkDataBuffered(stream_id, len);
visitor_.OnDataForStream(stream_id, absl::string_view(data, len));
}
@@ -380,13 +401,15 @@
visitor_.OnEndStream(stream_id);
}
-void OgHttp2Session::OnStreamPadLength(spdy::SpdyStreamId /*stream_id*/,
- size_t /*value*/) {
- // TODO(181586191): handle padding
+void OgHttp2Session::OnStreamPadLength(spdy::SpdyStreamId stream_id,
+ size_t value) {
+ MarkDataBuffered(stream_id, 1 + value);
+ // TODO(181586191): Pass padding to the visitor?
}
void OgHttp2Session::OnStreamPadding(spdy::SpdyStreamId stream_id, size_t len) {
- // TODO(181586191): handle padding
+ // Flow control was accounted for in OnStreamPadLength().
+ // TODO(181586191): Pass padding to the visitor?
}
spdy::SpdyHeadersHandlerInterface* OgHttp2Session::OnHeaderFrameStart(
@@ -559,5 +582,12 @@
}
}
+void OgHttp2Session::MarkDataBuffered(Http2StreamId stream_id, size_t bytes) {
+ connection_window_manager_.MarkDataBuffered(bytes);
+ if (auto it = stream_map_.find(stream_id); it != stream_map_.end()) {
+ it->second.window_manager.MarkDataBuffered(bytes);
+ }
+}
+
} // namespace adapter
} // namespace http2
diff --git a/http2/adapter/oghttp2_session.h b/http2/adapter/oghttp2_session.h
index e2ab3fd..9634400 100644
--- a/http2/adapter/oghttp2_session.h
+++ b/http2/adapter/oghttp2_session.h
@@ -57,6 +57,13 @@
// Resumes a stream that was previously blocked. Returns true on success.
bool ResumeStream(Http2StreamId stream_id);
+ // Returns the outstanding stream receive window, or -1 if the stream does not
+ // exist.
+ int GetStreamReceiveWindowSize(Http2StreamId stream_id) const;
+
+ // Returns the outstanding connection receive window.
+ int GetReceiveWindowSize() const;
+
// From Http2Session.
ssize_t ProcessBytes(absl::string_view bytes) override;
int Consume(Http2StreamId stream_id, size_t num_bytes) override;
@@ -171,6 +178,9 @@
// Section 8.1.
void MaybeCloseWithRstStream(Http2StreamId stream_id, StreamState& state);
+ // Performs flow control accounting for data sent by the peer.
+ void MarkDataBuffered(Http2StreamId stream_id, size_t bytes);
+
// Receives events when inbound frames are parsed.
Http2VisitorInterface& visitor_;
@@ -199,6 +209,8 @@
// session.
absl::string_view remaining_preface_;
+ WindowManager connection_window_manager_;
+
Http2StreamId next_stream_id_ = 1;
Http2StreamId highest_received_stream_id_ = 0;
int connection_send_window_ = kInitialFlowControlWindowSize;
diff --git a/http2/adapter/oghttp2_session_test.cc b/http2/adapter/oghttp2_session_test.cc
index 3869db1..cdaacdc 100644
--- a/http2/adapter/oghttp2_session_test.cc
+++ b/http2/adapter/oghttp2_session_test.cc
@@ -67,30 +67,46 @@
kInitialFlowControlWindowSize + 1000);
EXPECT_EQ(0, session.GetHighestReceivedStreamId());
+ // Connection has not yet received any data.
+ EXPECT_EQ(kInitialFlowControlWindowSize, session.GetReceiveWindowSize());
+
// Should OgHttp2Session require that streams 1 and 3 have been created?
+ // Submit a request to ensure the first stream is created.
+ const char* kSentinel1 = "arbitrary pointer 1";
+ TestDataFrameSource body1(visitor, "This is an example request body.");
+ int stream_id =
+ session.SubmitRequest(ToHeaders({{":method", "POST"},
+ {":scheme", "http"},
+ {":authority", "example.com"},
+ {":path", "/this/is/request/one"}}),
+ &body1, const_cast<char*>(kSentinel1));
+ EXPECT_EQ(stream_id, 1);
+
const std::string stream_frames =
TestFrameSequence()
- .Headers(1,
+ .Headers(stream_id,
{{":status", "200"},
{"server", "my-fake-server"},
{"date", "Tue, 6 Apr 2021 12:54:01 GMT"}},
/*fin=*/false)
- .Data(1, "This is the response body.")
+ .Data(stream_id, "This is the response body.")
.RstStream(3, Http2ErrorCode::INTERNAL_ERROR)
.GoAway(5, Http2ErrorCode::ENHANCE_YOUR_CALM, "calm down!!")
.Serialize();
- EXPECT_CALL(visitor, OnFrameHeader(1, _, HEADERS, 4));
- EXPECT_CALL(visitor, OnBeginHeadersForStream(1));
- EXPECT_CALL(visitor, OnHeaderForStream(1, ":status", "200"));
- EXPECT_CALL(visitor, OnHeaderForStream(1, "server", "my-fake-server"));
+ EXPECT_CALL(visitor, OnFrameHeader(stream_id, _, HEADERS, 4));
+ EXPECT_CALL(visitor, OnBeginHeadersForStream(stream_id));
+ EXPECT_CALL(visitor, OnHeaderForStream(stream_id, ":status", "200"));
EXPECT_CALL(visitor,
- OnHeaderForStream(1, "date", "Tue, 6 Apr 2021 12:54:01 GMT"));
- EXPECT_CALL(visitor, OnEndHeadersForStream(1));
- EXPECT_CALL(visitor, OnFrameHeader(1, 26, DATA, 0));
- EXPECT_CALL(visitor, OnBeginDataForStream(1, 26));
- EXPECT_CALL(visitor, OnDataForStream(1, "This is the response body."));
+ OnHeaderForStream(stream_id, "server", "my-fake-server"));
+ EXPECT_CALL(visitor, OnHeaderForStream(stream_id, "date",
+ "Tue, 6 Apr 2021 12:54:01 GMT"));
+ EXPECT_CALL(visitor, OnEndHeadersForStream(stream_id));
+ EXPECT_CALL(visitor, OnFrameHeader(stream_id, 26, DATA, 0));
+ EXPECT_CALL(visitor, OnBeginDataForStream(stream_id, 26));
+ EXPECT_CALL(visitor,
+ OnDataForStream(stream_id, "This is the response body."));
EXPECT_CALL(visitor, OnFrameHeader(3, 4, RST_STREAM, 0));
EXPECT_CALL(visitor, OnRstStream(3, Http2ErrorCode::INTERNAL_ERROR));
EXPECT_CALL(visitor, OnCloseStream(3, Http2ErrorCode::INTERNAL_ERROR));
@@ -99,6 +115,13 @@
const ssize_t stream_result = session.ProcessBytes(stream_frames);
EXPECT_EQ(stream_frames.size(), stream_result);
EXPECT_EQ(3, session.GetHighestReceivedStreamId());
+
+ // The first stream is active and has received some data.
+ EXPECT_GT(kInitialFlowControlWindowSize,
+ session.GetStreamReceiveWindowSize(stream_id));
+ // Connection receive window is equivalent to the first stream's.
+ EXPECT_EQ(session.GetReceiveWindowSize(),
+ session.GetStreamReceiveWindowSize(stream_id));
}
// Verifies that a client session enqueues initial SETTINGS if Send() is called
@@ -404,6 +427,13 @@
EXPECT_EQ(kSentinel1, session.GetStreamUserData(1));
+ // The first stream is active and has received some data.
+ EXPECT_GT(kInitialFlowControlWindowSize,
+ session.GetStreamReceiveWindowSize(1));
+ // Connection receive window is equivalent to the first stream's.
+ EXPECT_EQ(session.GetReceiveWindowSize(),
+ session.GetStreamReceiveWindowSize(1));
+
// TODO(birenroy): drop stream state when streams are closed. It should no
// longer be possible to set user data.
const char* kSentinel3 = "another arbitrary pointer";