gfe-relnote: Move headers streams out of static stream map. Protected by
gfe_reloadable_quic_eliminate_static_stream_map.
Following up this CL, crypto stream will be moved out of static_stream_map too.
PiperOrigin-RevId: 244429772
Change-Id: I5186bc7ab4e6ee9c9f546a3c9456be489a4ddc26
diff --git a/quic/core/http/quic_spdy_client_session_base.cc b/quic/core/http/quic_spdy_client_session_base.cc
index d388d2e..4aa3348 100644
--- a/quic/core/http/quic_spdy_client_session_base.cc
+++ b/quic/core/http/quic_spdy_client_session_base.cc
@@ -95,6 +95,13 @@
// It's quite possible to receive headers after a stream has been reset.
return;
}
+ if (GetQuicReloadableFlag(quic_eliminate_static_stream_map) &&
+ stream->is_static()) {
+ connection()->CloseConnection(
+ QUIC_INVALID_HEADERS_STREAM_DATA, "stream is static",
+ ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+ return;
+ }
stream->OnPromiseHeaderList(promised_stream_id, frame_len, header_list);
}
diff --git a/quic/core/http/quic_spdy_session.cc b/quic/core/http/quic_spdy_session.cc
index bda2e17..d7a5473 100644
--- a/quic/core/http/quic_spdy_session.cc
+++ b/quic/core/http/quic_spdy_session.cc
@@ -355,9 +355,15 @@
headers_stream_ = QuicMakeUnique<QuicHeadersStream>((this));
DCHECK_EQ(QuicUtils::GetHeadersStreamId(connection()->transport_version()),
headers_stream_->id());
- RegisterStaticStream(
- QuicUtils::GetHeadersStreamId(connection()->transport_version()),
- headers_stream_.get());
+ if (!GetQuicReloadableFlag(quic_eliminate_static_stream_map)) {
+ RegisterStaticStream(
+ QuicUtils::GetHeadersStreamId(connection()->transport_version()),
+ headers_stream_.get());
+ } else {
+ QUIC_RELOADABLE_FLAG_COUNT_N(quic_eliminate_static_stream_map, 7, 9);
+ unowned_headers_stream_ = headers_stream_.get();
+ RegisterStaticStreamNew(std::move(headers_stream_));
+ }
set_max_uncompressed_header_bytes(max_inbound_header_list_size_);
@@ -438,6 +444,14 @@
// It's quite possible to receive headers after a stream has been reset.
return;
}
+ if (GetQuicReloadableFlag(quic_eliminate_static_stream_map) &&
+ stream->is_static()) {
+ QUIC_RELOADABLE_FLAG_COUNT_N(quic_eliminate_static_stream_map, 8, 9);
+ connection()->CloseConnection(
+ QUIC_INVALID_HEADERS_STREAM_DATA, "stream is static",
+ ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+ return;
+ }
stream->OnStreamHeaderList(fin, frame_len, header_list);
}
@@ -478,7 +492,7 @@
}
SpdyPriorityIR priority_frame(id, parent_stream_id, weight, exclusive);
SpdySerializedFrame frame(spdy_framer_.SerializeFrame(priority_frame));
- headers_stream_->WriteOrBufferData(
+ headers_stream()->WriteOrBufferData(
QuicStringPiece(frame.data(), frame.size()), false, nullptr);
return frame.size();
}
@@ -498,7 +512,7 @@
push_promise.set_fin(false);
SpdySerializedFrame frame(spdy_framer_.SerializeFrame(push_promise));
- headers_stream_->WriteOrBufferData(
+ headers_stream()->WriteOrBufferData(
QuicStringPiece(frame.data(), frame.size()), false, nullptr);
return frame.size();
}
@@ -508,7 +522,7 @@
settings_frame.AddSetting(SETTINGS_MAX_HEADER_LIST_SIZE, value);
SpdySerializedFrame frame(spdy_framer_.SerializeFrame(settings_frame));
- headers_stream_->WriteOrBufferData(
+ headers_stream()->WriteOrBufferData(
QuicStringPiece(frame.data(), frame.size()), false, nullptr);
return frame.size();
}
@@ -567,7 +581,7 @@
headers_frame.set_exclusive(exclusive);
}
SpdySerializedFrame frame(spdy_framer_.SerializeFrame(headers_frame));
- headers_stream_->WriteOrBufferData(
+ headers_stream()->WriteOrBufferData(
QuicStringPiece(frame.data(), frame.size()), false,
std::move(ack_listener));
return frame.size();
@@ -695,8 +709,20 @@
}
bool QuicSpdySession::HasActiveRequestStreams() const {
- // TODO(renjietang): Exclude static streams.
- return !dynamic_streams().empty();
+ if (!GetQuicReloadableFlag(quic_eliminate_static_stream_map)) {
+ return !dynamic_streams().empty();
+ }
+ // In the case where session is destructed by calling
+ // dynamic_streams().clear(), we will have incorrect accounting here.
+ // TODO(renjietang): Modify destructors and make this a DCHECK.
+ QUIC_RELOADABLE_FLAG_COUNT_N(quic_eliminate_static_stream_map, 9, 9);
+ if (static_cast<size_t>(dynamic_streams().size()) >
+ num_incoming_static_streams() + num_outgoing_static_streams()) {
+ return dynamic_streams().size() - num_incoming_static_streams() -
+ num_outgoing_static_streams() >
+ 0;
+ }
+ return false;
}
} // namespace quic
diff --git a/quic/core/http/quic_spdy_session.h b/quic/core/http/quic_spdy_session.h
index 0ff1150..9248fa5 100644
--- a/quic/core/http/quic_spdy_session.h
+++ b/quic/core/http/quic_spdy_session.h
@@ -139,7 +139,17 @@
QpackEncoder* qpack_encoder();
QpackDecoder* qpack_decoder();
- QuicHeadersStream* headers_stream() { return headers_stream_.get(); }
+ QuicHeadersStream* headers_stream() {
+ return GetQuicReloadableFlag(quic_eliminate_static_stream_map)
+ ? unowned_headers_stream_
+ : headers_stream_.get();
+ }
+
+ const QuicHeadersStream* headers_stream() const {
+ return GetQuicReloadableFlag(quic_eliminate_static_stream_map)
+ ? unowned_headers_stream_
+ : headers_stream_.get();
+ }
bool server_push_enabled() const { return server_push_enabled_; }
@@ -262,6 +272,13 @@
// TODO(123528590): Remove this member.
std::unique_ptr<QuicHeadersStream> headers_stream_;
+ // Unowned headers stream pointer that points to the stream
+ // in dynamic_stream_map.
+ // TODO(renjietang): Merge this with headers_stream_ and clean up other
+ // static_stream_map logic when flag eliminate_static_stream_map
+ // is deprecated.
+ QuicHeadersStream* unowned_headers_stream_;
+
// The maximum size of a header block that will be accepted from the peer,
// defined per spec as key + value + overhead per field (uncompressed).
size_t max_inbound_header_list_size_;
diff --git a/quic/core/http/quic_spdy_session_test.cc b/quic/core/http/quic_spdy_session_test.cc
index 04605e1..5726573 100644
--- a/quic/core/http/quic_spdy_session_test.cc
+++ b/quic/core/http/quic_spdy_session_test.cc
@@ -896,9 +896,16 @@
// connection flow control blocked.
TestCryptoStream* crypto_stream = session_.GetMutableCryptoStream();
EXPECT_CALL(*crypto_stream, OnCanWrite());
- QuicSpdySessionPeer::SetHeadersStream(&session_, nullptr);
- TestHeadersStream* headers_stream = new TestHeadersStream(&session_);
- QuicSpdySessionPeer::SetHeadersStream(&session_, headers_stream);
+ TestHeadersStream* headers_stream;
+ if (!GetQuicReloadableFlag(quic_eliminate_static_stream_map)) {
+ QuicSpdySessionPeer::SetHeadersStream(&session_, nullptr);
+ headers_stream = new TestHeadersStream(&session_);
+ QuicSpdySessionPeer::SetHeadersStream(&session_, headers_stream);
+ } else {
+ QuicSpdySessionPeer::SetUnownedHeadersStream(&session_, nullptr);
+ headers_stream = new TestHeadersStream(&session_);
+ QuicSpdySessionPeer::SetUnownedHeadersStream(&session_, headers_stream);
+ }
session_.MarkConnectionLevelWriteBlocked(
QuicUtils::GetHeadersStreamId(connection_->transport_version()));
EXPECT_CALL(*headers_stream, OnCanWrite());
@@ -1597,9 +1604,16 @@
}
TEST_P(QuicSpdySessionTestClient, WritePriority) {
- QuicSpdySessionPeer::SetHeadersStream(&session_, nullptr);
- TestHeadersStream* headers_stream = new TestHeadersStream(&session_);
- QuicSpdySessionPeer::SetHeadersStream(&session_, headers_stream);
+ TestHeadersStream* headers_stream;
+ if (!GetQuicReloadableFlag(quic_eliminate_static_stream_map)) {
+ QuicSpdySessionPeer::SetHeadersStream(&session_, nullptr);
+ headers_stream = new TestHeadersStream(&session_);
+ QuicSpdySessionPeer::SetHeadersStream(&session_, headers_stream);
+ } else {
+ QuicSpdySessionPeer::SetUnownedHeadersStream(&session_, nullptr);
+ headers_stream = new TestHeadersStream(&session_);
+ QuicSpdySessionPeer::SetUnownedHeadersStream(&session_, headers_stream);
+ }
// Make packet writer blocked so |headers_stream| will buffer its write data.
MockPacketWriter* writer = static_cast<MockPacketWriter*>(
diff --git a/quic/core/quic_session.cc b/quic/core/quic_session.cc
index 36070a4..91d323b 100644
--- a/quic/core/quic_session.cc
+++ b/quic/core/quic_session.cc
@@ -61,6 +61,8 @@
config_.GetMaxIncomingDynamicStreamsToSend()),
num_dynamic_incoming_streams_(0),
num_draining_incoming_streams_(0),
+ num_outgoing_static_streams_(0),
+ num_incoming_static_streams_(0),
num_locally_closed_incoming_streams_highest_offset_(0),
error_(QUIC_NO_ERROR),
flow_controller_(
@@ -118,6 +120,20 @@
}
}
+void QuicSession::RegisterStaticStreamNew(std::unique_ptr<QuicStream> stream) {
+ DCHECK(GetQuicReloadableFlag(quic_eliminate_static_stream_map));
+ QuicStreamId stream_id = stream->id();
+ dynamic_stream_map_[stream_id] = std::move(stream);
+ if (connection_->transport_version() == QUIC_VERSION_99) {
+ v99_streamid_manager_.RegisterStaticStream(stream_id);
+ }
+ if (IsIncomingStream(stream_id)) {
+ ++num_incoming_static_streams_;
+ } else {
+ ++num_outgoing_static_streams_;
+ }
+}
+
void QuicSession::OnStreamFrame(const QuicStreamFrame& frame) {
// TODO(rch) deal with the error case of stream id 0.
QuicStreamId stream_id = frame.stream_id;
@@ -152,6 +168,14 @@
}
return;
}
+ if (GetQuicReloadableFlag(quic_eliminate_static_stream_map) && frame.fin &&
+ handler.stream->is_static()) {
+ QUIC_RELOADABLE_FLAG_COUNT_N(quic_eliminate_static_stream_map, 1, 9);
+ connection()->CloseConnection(
+ QUIC_INVALID_STREAM_ID, "Attempt to close a static stream",
+ ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+ return;
+ }
handler.stream->OnStreamFrame(frame);
}
@@ -230,6 +254,19 @@
<< stream_id << ". Ignoring.";
return true;
}
+
+ if (GetQuicReloadableFlag(quic_eliminate_static_stream_map) &&
+ stream->is_static()) {
+ QUIC_RELOADABLE_FLAG_COUNT_N(quic_eliminate_static_stream_map, 2, 9);
+ QUIC_DVLOG(1) << ENDPOINT
+ << "Received STOP_SENDING for a static stream, id: "
+ << stream_id << " Closing connection";
+ connection()->CloseConnection(
+ QUIC_INVALID_STREAM_ID, "Received STOP_SENDING for a static stream",
+ ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+ return false;
+ }
+
stream->OnStopSending(frame.application_error_code);
stream->set_stream_error(
@@ -277,6 +314,14 @@
HandleRstOnValidNonexistentStream(frame);
return; // Errors are handled by GetOrCreateStream.
}
+ if (GetQuicReloadableFlag(quic_eliminate_static_stream_map) &&
+ handler.stream->is_static()) {
+ QUIC_RELOADABLE_FLAG_COUNT_N(quic_eliminate_static_stream_map, 3, 9);
+ connection()->CloseConnection(
+ QUIC_INVALID_STREAM_ID, "Attempt to reset a static stream",
+ ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+ return;
+ }
handler.stream->OnStreamReset(frame);
}
@@ -317,14 +362,35 @@
error_ = error;
}
- while (!dynamic_stream_map_.empty()) {
- DynamicStreamMap::iterator it = dynamic_stream_map_.begin();
- QuicStreamId id = it->first;
- it->second->OnConnectionClosed(error, source);
- // The stream should call CloseStream as part of OnConnectionClosed.
- if (dynamic_stream_map_.find(id) != dynamic_stream_map_.end()) {
- QUIC_BUG << ENDPOINT << "Stream failed to close under OnConnectionClosed";
- CloseStream(id);
+ if (!GetQuicReloadableFlag(quic_eliminate_static_stream_map)) {
+ while (!dynamic_stream_map_.empty()) {
+ DynamicStreamMap::iterator it = dynamic_stream_map_.begin();
+ QuicStreamId id = it->first;
+ it->second->OnConnectionClosed(error, source);
+ // The stream should call CloseStream as part of OnConnectionClosed.
+ if (dynamic_stream_map_.find(id) != dynamic_stream_map_.end()) {
+ QUIC_BUG << ENDPOINT << "Stream " << id
+ << " failed to close under OnConnectionClosed";
+ CloseStream(id);
+ }
+ }
+ } else {
+ QUIC_RELOADABLE_FLAG_COUNT_N(quic_eliminate_static_stream_map, 4, 9);
+ // Copy all non static streams in a new map for the ease of deleting.
+ QuicSmallMap<QuicStreamId, QuicStream*, 10> non_static_streams;
+ for (const auto& it : dynamic_stream_map_) {
+ if (!it.second->is_static()) {
+ non_static_streams[it.first] = it.second.get();
+ }
+ }
+ for (const auto& it : non_static_streams) {
+ QuicStreamId id = it.first;
+ it.second->OnConnectionClosed(error, source);
+ if (dynamic_stream_map_.find(id) != dynamic_stream_map_.end()) {
+ QUIC_BUG << ENDPOINT << "Stream " << id
+ << " failed to close under OnConnectionClosed";
+ CloseStream(id);
+ }
}
}
@@ -533,7 +599,8 @@
uint64_t QuicSession::GetNumOpenDynamicStreams() const {
return dynamic_stream_map_.size() - draining_streams_.size() +
- locally_closed_streams_highest_offset_.size();
+ locally_closed_streams_highest_offset_.size() -
+ num_incoming_static_streams_ - num_outgoing_static_streams_;
}
void QuicSession::ProcessUdpPacket(const QuicSocketAddress& self_address,
@@ -625,6 +692,17 @@
DynamicStreamMap::iterator it = dynamic_stream_map_.find(id);
if (it != dynamic_stream_map_.end()) {
+ if (GetQuicReloadableFlag(quic_eliminate_static_stream_map) &&
+ it->second->is_static()) {
+ QUIC_RELOADABLE_FLAG_COUNT_N(quic_eliminate_static_stream_map, 5, 9);
+ QUIC_DVLOG(1) << ENDPOINT
+ << "Try to send rst for a static stream, id: " << id
+ << " Closing connection";
+ connection()->CloseConnection(
+ QUIC_INVALID_STREAM_ID, "Sending rst for a static stream",
+ ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+ return;
+ }
QuicStream* stream = it->second.get();
if (stream) {
stream->set_rst_sent(true);
@@ -687,6 +765,17 @@
return;
}
QuicStream* stream = it->second.get();
+ if (GetQuicReloadableFlag(quic_eliminate_static_stream_map) &&
+ stream->is_static()) {
+ QUIC_RELOADABLE_FLAG_COUNT_N(quic_eliminate_static_stream_map, 6, 9);
+ QUIC_DVLOG(1) << ENDPOINT
+ << "Try to close a static stream, id: " << stream_id
+ << " Closing connection";
+ connection()->CloseConnection(
+ QUIC_INVALID_STREAM_ID, "Try to close a static stream",
+ ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+ return;
+ }
// Tell the stream that a RST has been sent.
if (locally_reset) {
@@ -1024,6 +1113,7 @@
}
void QuicSession::ActivateStream(std::unique_ptr<QuicStream> stream) {
+ DCHECK(!stream->is_static());
QuicStreamId stream_id = stream->id();
QUIC_DVLOG(1) << ENDPOINT << "num_streams: " << dynamic_stream_map_.size()
<< ". activating " << stream_id;
@@ -1245,7 +1335,8 @@
}
size_t QuicSession::GetNumActiveStreams() const {
- return dynamic_stream_map_.size() - draining_streams_.size();
+ return dynamic_stream_map_.size() - draining_streams_.size() -
+ num_incoming_static_streams_ - num_outgoing_static_streams_;
}
size_t QuicSession::GetNumDrainingStreams() const {
@@ -1280,9 +1371,11 @@
size_t QuicSession::GetNumDynamicOutgoingStreams() const {
DCHECK_GE(static_cast<size_t>(dynamic_stream_map_.size() +
pending_stream_map_.size()),
- num_dynamic_incoming_streams_);
+ num_dynamic_incoming_streams_ + num_outgoing_static_streams_ +
+ num_incoming_static_streams_);
return dynamic_stream_map_.size() + pending_stream_map_.size() -
- num_dynamic_incoming_streams_;
+ num_dynamic_incoming_streams_ - num_outgoing_static_streams_ -
+ num_incoming_static_streams_;
}
size_t QuicSession::GetNumDrainingOutgoingStreams() const {
diff --git a/quic/core/quic_session.h b/quic/core/quic_session.h
index a9fb992..d620502 100644
--- a/quic/core/quic_session.h
+++ b/quic/core/quic_session.h
@@ -300,6 +300,16 @@
// reserved headers and crypto streams.
size_t GetNumOpenOutgoingStreams() const;
+ // Returns the number of open peer initiated static streams.
+ size_t num_incoming_static_streams() const {
+ return num_incoming_static_streams_;
+ }
+
+ // Returns the number of open self initiated static streams.
+ size_t num_outgoing_static_streams() const {
+ return num_outgoing_static_streams_;
+ }
+
// Add the stream to the session's write-blocked list because it is blocked by
// connection-level flow control but not by its own stream-level flow control.
// The stream will be given a chance to write when a connection-level
@@ -481,6 +491,9 @@
// Register (|id|, |stream|) with the static stream map. Override previous
// registrations with the same id.
void RegisterStaticStream(QuicStreamId id, QuicStream* stream);
+ // TODO(renjietang): Replace the original Register method with the new one
+ // once flag is deprecated.
+ void RegisterStaticStreamNew(std::unique_ptr<QuicStream> stream);
const StaticStreamMap& static_streams() const { return static_stream_map_; }
DynamicStreamMap& dynamic_streams() { return dynamic_stream_map_; }
@@ -653,6 +666,14 @@
// A counter for peer initiated streams which are in the draining_streams_.
size_t num_draining_incoming_streams_;
+ // A counter for self initiated static streams which are in
+ // dynamic_stream_map_.
+ size_t num_outgoing_static_streams_;
+
+ // A counter for peer initiated static streams which are in
+ // dynamic_stream_map_.
+ size_t num_incoming_static_streams_;
+
// A counter for peer initiated streams which are in the
// locally_closed_streams_highest_offset_.
size_t num_locally_closed_incoming_streams_highest_offset_;
diff --git a/quic/core/quic_stream.h b/quic/core/quic_stream.h
index 40bed1c..0cacbe3 100644
--- a/quic/core/quic_stream.h
+++ b/quic/core/quic_stream.h
@@ -343,6 +343,9 @@
// Does not send a FIN. May cause the stream to be closed.
virtual void CloseWriteSide();
+ // Returns true if the stream is static.
+ bool is_static() const { return is_static_; }
+
protected:
// Sends as many bytes in the first |count| buffers of |iov| to the connection
// as the connection will consume. If FIN is consumed, the write side is
diff --git a/quic/test_tools/quic_spdy_session_peer.cc b/quic/test_tools/quic_spdy_session_peer.cc
index dcba12c..956cd88 100644
--- a/quic/test_tools/quic_spdy_session_peer.cc
+++ b/quic/test_tools/quic_spdy_session_peer.cc
@@ -13,7 +13,7 @@
// static
QuicHeadersStream* QuicSpdySessionPeer::GetHeadersStream(
QuicSpdySession* session) {
- return session->headers_stream_.get();
+ return session->headers_stream();
}
// static
@@ -25,6 +25,20 @@
}
}
+void QuicSpdySessionPeer::SetUnownedHeadersStream(
+ QuicSpdySession* session,
+ QuicHeadersStream* headers_stream) {
+ for (auto& it : session->dynamic_streams()) {
+ if (it.first == QuicUtils::GetHeadersStreamId(
+ session->connection()->transport_version())) {
+ it.second.reset(headers_stream);
+ session->unowned_headers_stream_ =
+ static_cast<QuicHeadersStream*>(it.second.get());
+ break;
+ }
+ }
+}
+
// static
const spdy::SpdyFramer& QuicSpdySessionPeer::GetSpdyFramer(
QuicSpdySession* session) {
diff --git a/quic/test_tools/quic_spdy_session_peer.h b/quic/test_tools/quic_spdy_session_peer.h
index 83e8b0c..47b55e1 100644
--- a/quic/test_tools/quic_spdy_session_peer.h
+++ b/quic/test_tools/quic_spdy_session_peer.h
@@ -24,6 +24,8 @@
static QuicHeadersStream* GetHeadersStream(QuicSpdySession* session);
static void SetHeadersStream(QuicSpdySession* session,
QuicHeadersStream* headers_stream);
+ static void SetUnownedHeadersStream(QuicSpdySession* session,
+ QuicHeadersStream* headers_stream);
static const spdy::SpdyFramer& GetSpdyFramer(QuicSpdySession* session);
static void SetHpackEncoderDebugVisitor(
QuicSpdySession* session,