gfe-relnote: Move headers streams out of static stream map. Protected by gfe_reloadable_quic_eliminate_static_stream_map. Following up this CL, crypto stream will be moved out of static_stream_map too. PiperOrigin-RevId: 244429772 Change-Id: I5186bc7ab4e6ee9c9f546a3c9456be489a4ddc26
diff --git a/quic/core/http/quic_spdy_client_session_base.cc b/quic/core/http/quic_spdy_client_session_base.cc index d388d2e..4aa3348 100644 --- a/quic/core/http/quic_spdy_client_session_base.cc +++ b/quic/core/http/quic_spdy_client_session_base.cc
@@ -95,6 +95,13 @@ // It's quite possible to receive headers after a stream has been reset. return; } + if (GetQuicReloadableFlag(quic_eliminate_static_stream_map) && + stream->is_static()) { + connection()->CloseConnection( + QUIC_INVALID_HEADERS_STREAM_DATA, "stream is static", + ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET); + return; + } stream->OnPromiseHeaderList(promised_stream_id, frame_len, header_list); }
diff --git a/quic/core/http/quic_spdy_session.cc b/quic/core/http/quic_spdy_session.cc index bda2e17..d7a5473 100644 --- a/quic/core/http/quic_spdy_session.cc +++ b/quic/core/http/quic_spdy_session.cc
@@ -355,9 +355,15 @@ headers_stream_ = QuicMakeUnique<QuicHeadersStream>((this)); DCHECK_EQ(QuicUtils::GetHeadersStreamId(connection()->transport_version()), headers_stream_->id()); - RegisterStaticStream( - QuicUtils::GetHeadersStreamId(connection()->transport_version()), - headers_stream_.get()); + if (!GetQuicReloadableFlag(quic_eliminate_static_stream_map)) { + RegisterStaticStream( + QuicUtils::GetHeadersStreamId(connection()->transport_version()), + headers_stream_.get()); + } else { + QUIC_RELOADABLE_FLAG_COUNT_N(quic_eliminate_static_stream_map, 7, 9); + unowned_headers_stream_ = headers_stream_.get(); + RegisterStaticStreamNew(std::move(headers_stream_)); + } set_max_uncompressed_header_bytes(max_inbound_header_list_size_); @@ -438,6 +444,14 @@ // It's quite possible to receive headers after a stream has been reset. return; } + if (GetQuicReloadableFlag(quic_eliminate_static_stream_map) && + stream->is_static()) { + QUIC_RELOADABLE_FLAG_COUNT_N(quic_eliminate_static_stream_map, 8, 9); + connection()->CloseConnection( + QUIC_INVALID_HEADERS_STREAM_DATA, "stream is static", + ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET); + return; + } stream->OnStreamHeaderList(fin, frame_len, header_list); } @@ -478,7 +492,7 @@ } SpdyPriorityIR priority_frame(id, parent_stream_id, weight, exclusive); SpdySerializedFrame frame(spdy_framer_.SerializeFrame(priority_frame)); - headers_stream_->WriteOrBufferData( + headers_stream()->WriteOrBufferData( QuicStringPiece(frame.data(), frame.size()), false, nullptr); return frame.size(); } @@ -498,7 +512,7 @@ push_promise.set_fin(false); SpdySerializedFrame frame(spdy_framer_.SerializeFrame(push_promise)); - headers_stream_->WriteOrBufferData( + headers_stream()->WriteOrBufferData( QuicStringPiece(frame.data(), frame.size()), false, nullptr); return frame.size(); } @@ -508,7 +522,7 @@ settings_frame.AddSetting(SETTINGS_MAX_HEADER_LIST_SIZE, value); SpdySerializedFrame frame(spdy_framer_.SerializeFrame(settings_frame)); - headers_stream_->WriteOrBufferData( + headers_stream()->WriteOrBufferData( QuicStringPiece(frame.data(), frame.size()), false, nullptr); return frame.size(); } @@ -567,7 +581,7 @@ headers_frame.set_exclusive(exclusive); } SpdySerializedFrame frame(spdy_framer_.SerializeFrame(headers_frame)); - headers_stream_->WriteOrBufferData( + headers_stream()->WriteOrBufferData( QuicStringPiece(frame.data(), frame.size()), false, std::move(ack_listener)); return frame.size(); @@ -695,8 +709,20 @@ } bool QuicSpdySession::HasActiveRequestStreams() const { - // TODO(renjietang): Exclude static streams. - return !dynamic_streams().empty(); + if (!GetQuicReloadableFlag(quic_eliminate_static_stream_map)) { + return !dynamic_streams().empty(); + } + // In the case where session is destructed by calling + // dynamic_streams().clear(), we will have incorrect accounting here. + // TODO(renjietang): Modify destructors and make this a DCHECK. + QUIC_RELOADABLE_FLAG_COUNT_N(quic_eliminate_static_stream_map, 9, 9); + if (static_cast<size_t>(dynamic_streams().size()) > + num_incoming_static_streams() + num_outgoing_static_streams()) { + return dynamic_streams().size() - num_incoming_static_streams() - + num_outgoing_static_streams() > + 0; + } + return false; } } // namespace quic
diff --git a/quic/core/http/quic_spdy_session.h b/quic/core/http/quic_spdy_session.h index 0ff1150..9248fa5 100644 --- a/quic/core/http/quic_spdy_session.h +++ b/quic/core/http/quic_spdy_session.h
@@ -139,7 +139,17 @@ QpackEncoder* qpack_encoder(); QpackDecoder* qpack_decoder(); - QuicHeadersStream* headers_stream() { return headers_stream_.get(); } + QuicHeadersStream* headers_stream() { + return GetQuicReloadableFlag(quic_eliminate_static_stream_map) + ? unowned_headers_stream_ + : headers_stream_.get(); + } + + const QuicHeadersStream* headers_stream() const { + return GetQuicReloadableFlag(quic_eliminate_static_stream_map) + ? unowned_headers_stream_ + : headers_stream_.get(); + } bool server_push_enabled() const { return server_push_enabled_; } @@ -262,6 +272,13 @@ // TODO(123528590): Remove this member. std::unique_ptr<QuicHeadersStream> headers_stream_; + // Unowned headers stream pointer that points to the stream + // in dynamic_stream_map. + // TODO(renjietang): Merge this with headers_stream_ and clean up other + // static_stream_map logic when flag eliminate_static_stream_map + // is deprecated. + QuicHeadersStream* unowned_headers_stream_; + // The maximum size of a header block that will be accepted from the peer, // defined per spec as key + value + overhead per field (uncompressed). size_t max_inbound_header_list_size_;
diff --git a/quic/core/http/quic_spdy_session_test.cc b/quic/core/http/quic_spdy_session_test.cc index 04605e1..5726573 100644 --- a/quic/core/http/quic_spdy_session_test.cc +++ b/quic/core/http/quic_spdy_session_test.cc
@@ -896,9 +896,16 @@ // connection flow control blocked. TestCryptoStream* crypto_stream = session_.GetMutableCryptoStream(); EXPECT_CALL(*crypto_stream, OnCanWrite()); - QuicSpdySessionPeer::SetHeadersStream(&session_, nullptr); - TestHeadersStream* headers_stream = new TestHeadersStream(&session_); - QuicSpdySessionPeer::SetHeadersStream(&session_, headers_stream); + TestHeadersStream* headers_stream; + if (!GetQuicReloadableFlag(quic_eliminate_static_stream_map)) { + QuicSpdySessionPeer::SetHeadersStream(&session_, nullptr); + headers_stream = new TestHeadersStream(&session_); + QuicSpdySessionPeer::SetHeadersStream(&session_, headers_stream); + } else { + QuicSpdySessionPeer::SetUnownedHeadersStream(&session_, nullptr); + headers_stream = new TestHeadersStream(&session_); + QuicSpdySessionPeer::SetUnownedHeadersStream(&session_, headers_stream); + } session_.MarkConnectionLevelWriteBlocked( QuicUtils::GetHeadersStreamId(connection_->transport_version())); EXPECT_CALL(*headers_stream, OnCanWrite()); @@ -1597,9 +1604,16 @@ } TEST_P(QuicSpdySessionTestClient, WritePriority) { - QuicSpdySessionPeer::SetHeadersStream(&session_, nullptr); - TestHeadersStream* headers_stream = new TestHeadersStream(&session_); - QuicSpdySessionPeer::SetHeadersStream(&session_, headers_stream); + TestHeadersStream* headers_stream; + if (!GetQuicReloadableFlag(quic_eliminate_static_stream_map)) { + QuicSpdySessionPeer::SetHeadersStream(&session_, nullptr); + headers_stream = new TestHeadersStream(&session_); + QuicSpdySessionPeer::SetHeadersStream(&session_, headers_stream); + } else { + QuicSpdySessionPeer::SetUnownedHeadersStream(&session_, nullptr); + headers_stream = new TestHeadersStream(&session_); + QuicSpdySessionPeer::SetUnownedHeadersStream(&session_, headers_stream); + } // Make packet writer blocked so |headers_stream| will buffer its write data. MockPacketWriter* writer = static_cast<MockPacketWriter*>(
diff --git a/quic/core/quic_session.cc b/quic/core/quic_session.cc index 36070a4..91d323b 100644 --- a/quic/core/quic_session.cc +++ b/quic/core/quic_session.cc
@@ -61,6 +61,8 @@ config_.GetMaxIncomingDynamicStreamsToSend()), num_dynamic_incoming_streams_(0), num_draining_incoming_streams_(0), + num_outgoing_static_streams_(0), + num_incoming_static_streams_(0), num_locally_closed_incoming_streams_highest_offset_(0), error_(QUIC_NO_ERROR), flow_controller_( @@ -118,6 +120,20 @@ } } +void QuicSession::RegisterStaticStreamNew(std::unique_ptr<QuicStream> stream) { + DCHECK(GetQuicReloadableFlag(quic_eliminate_static_stream_map)); + QuicStreamId stream_id = stream->id(); + dynamic_stream_map_[stream_id] = std::move(stream); + if (connection_->transport_version() == QUIC_VERSION_99) { + v99_streamid_manager_.RegisterStaticStream(stream_id); + } + if (IsIncomingStream(stream_id)) { + ++num_incoming_static_streams_; + } else { + ++num_outgoing_static_streams_; + } +} + void QuicSession::OnStreamFrame(const QuicStreamFrame& frame) { // TODO(rch) deal with the error case of stream id 0. QuicStreamId stream_id = frame.stream_id; @@ -152,6 +168,14 @@ } return; } + if (GetQuicReloadableFlag(quic_eliminate_static_stream_map) && frame.fin && + handler.stream->is_static()) { + QUIC_RELOADABLE_FLAG_COUNT_N(quic_eliminate_static_stream_map, 1, 9); + connection()->CloseConnection( + QUIC_INVALID_STREAM_ID, "Attempt to close a static stream", + ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET); + return; + } handler.stream->OnStreamFrame(frame); } @@ -230,6 +254,19 @@ << stream_id << ". Ignoring."; return true; } + + if (GetQuicReloadableFlag(quic_eliminate_static_stream_map) && + stream->is_static()) { + QUIC_RELOADABLE_FLAG_COUNT_N(quic_eliminate_static_stream_map, 2, 9); + QUIC_DVLOG(1) << ENDPOINT + << "Received STOP_SENDING for a static stream, id: " + << stream_id << " Closing connection"; + connection()->CloseConnection( + QUIC_INVALID_STREAM_ID, "Received STOP_SENDING for a static stream", + ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET); + return false; + } + stream->OnStopSending(frame.application_error_code); stream->set_stream_error( @@ -277,6 +314,14 @@ HandleRstOnValidNonexistentStream(frame); return; // Errors are handled by GetOrCreateStream. } + if (GetQuicReloadableFlag(quic_eliminate_static_stream_map) && + handler.stream->is_static()) { + QUIC_RELOADABLE_FLAG_COUNT_N(quic_eliminate_static_stream_map, 3, 9); + connection()->CloseConnection( + QUIC_INVALID_STREAM_ID, "Attempt to reset a static stream", + ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET); + return; + } handler.stream->OnStreamReset(frame); } @@ -317,14 +362,35 @@ error_ = error; } - while (!dynamic_stream_map_.empty()) { - DynamicStreamMap::iterator it = dynamic_stream_map_.begin(); - QuicStreamId id = it->first; - it->second->OnConnectionClosed(error, source); - // The stream should call CloseStream as part of OnConnectionClosed. - if (dynamic_stream_map_.find(id) != dynamic_stream_map_.end()) { - QUIC_BUG << ENDPOINT << "Stream failed to close under OnConnectionClosed"; - CloseStream(id); + if (!GetQuicReloadableFlag(quic_eliminate_static_stream_map)) { + while (!dynamic_stream_map_.empty()) { + DynamicStreamMap::iterator it = dynamic_stream_map_.begin(); + QuicStreamId id = it->first; + it->second->OnConnectionClosed(error, source); + // The stream should call CloseStream as part of OnConnectionClosed. + if (dynamic_stream_map_.find(id) != dynamic_stream_map_.end()) { + QUIC_BUG << ENDPOINT << "Stream " << id + << " failed to close under OnConnectionClosed"; + CloseStream(id); + } + } + } else { + QUIC_RELOADABLE_FLAG_COUNT_N(quic_eliminate_static_stream_map, 4, 9); + // Copy all non static streams in a new map for the ease of deleting. + QuicSmallMap<QuicStreamId, QuicStream*, 10> non_static_streams; + for (const auto& it : dynamic_stream_map_) { + if (!it.second->is_static()) { + non_static_streams[it.first] = it.second.get(); + } + } + for (const auto& it : non_static_streams) { + QuicStreamId id = it.first; + it.second->OnConnectionClosed(error, source); + if (dynamic_stream_map_.find(id) != dynamic_stream_map_.end()) { + QUIC_BUG << ENDPOINT << "Stream " << id + << " failed to close under OnConnectionClosed"; + CloseStream(id); + } } } @@ -533,7 +599,8 @@ uint64_t QuicSession::GetNumOpenDynamicStreams() const { return dynamic_stream_map_.size() - draining_streams_.size() + - locally_closed_streams_highest_offset_.size(); + locally_closed_streams_highest_offset_.size() - + num_incoming_static_streams_ - num_outgoing_static_streams_; } void QuicSession::ProcessUdpPacket(const QuicSocketAddress& self_address, @@ -625,6 +692,17 @@ DynamicStreamMap::iterator it = dynamic_stream_map_.find(id); if (it != dynamic_stream_map_.end()) { + if (GetQuicReloadableFlag(quic_eliminate_static_stream_map) && + it->second->is_static()) { + QUIC_RELOADABLE_FLAG_COUNT_N(quic_eliminate_static_stream_map, 5, 9); + QUIC_DVLOG(1) << ENDPOINT + << "Try to send rst for a static stream, id: " << id + << " Closing connection"; + connection()->CloseConnection( + QUIC_INVALID_STREAM_ID, "Sending rst for a static stream", + ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET); + return; + } QuicStream* stream = it->second.get(); if (stream) { stream->set_rst_sent(true); @@ -687,6 +765,17 @@ return; } QuicStream* stream = it->second.get(); + if (GetQuicReloadableFlag(quic_eliminate_static_stream_map) && + stream->is_static()) { + QUIC_RELOADABLE_FLAG_COUNT_N(quic_eliminate_static_stream_map, 6, 9); + QUIC_DVLOG(1) << ENDPOINT + << "Try to close a static stream, id: " << stream_id + << " Closing connection"; + connection()->CloseConnection( + QUIC_INVALID_STREAM_ID, "Try to close a static stream", + ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET); + return; + } // Tell the stream that a RST has been sent. if (locally_reset) { @@ -1024,6 +1113,7 @@ } void QuicSession::ActivateStream(std::unique_ptr<QuicStream> stream) { + DCHECK(!stream->is_static()); QuicStreamId stream_id = stream->id(); QUIC_DVLOG(1) << ENDPOINT << "num_streams: " << dynamic_stream_map_.size() << ". activating " << stream_id; @@ -1245,7 +1335,8 @@ } size_t QuicSession::GetNumActiveStreams() const { - return dynamic_stream_map_.size() - draining_streams_.size(); + return dynamic_stream_map_.size() - draining_streams_.size() - + num_incoming_static_streams_ - num_outgoing_static_streams_; } size_t QuicSession::GetNumDrainingStreams() const { @@ -1280,9 +1371,11 @@ size_t QuicSession::GetNumDynamicOutgoingStreams() const { DCHECK_GE(static_cast<size_t>(dynamic_stream_map_.size() + pending_stream_map_.size()), - num_dynamic_incoming_streams_); + num_dynamic_incoming_streams_ + num_outgoing_static_streams_ + + num_incoming_static_streams_); return dynamic_stream_map_.size() + pending_stream_map_.size() - - num_dynamic_incoming_streams_; + num_dynamic_incoming_streams_ - num_outgoing_static_streams_ - + num_incoming_static_streams_; } size_t QuicSession::GetNumDrainingOutgoingStreams() const {
diff --git a/quic/core/quic_session.h b/quic/core/quic_session.h index a9fb992..d620502 100644 --- a/quic/core/quic_session.h +++ b/quic/core/quic_session.h
@@ -300,6 +300,16 @@ // reserved headers and crypto streams. size_t GetNumOpenOutgoingStreams() const; + // Returns the number of open peer initiated static streams. + size_t num_incoming_static_streams() const { + return num_incoming_static_streams_; + } + + // Returns the number of open self initiated static streams. + size_t num_outgoing_static_streams() const { + return num_outgoing_static_streams_; + } + // Add the stream to the session's write-blocked list because it is blocked by // connection-level flow control but not by its own stream-level flow control. // The stream will be given a chance to write when a connection-level @@ -481,6 +491,9 @@ // Register (|id|, |stream|) with the static stream map. Override previous // registrations with the same id. void RegisterStaticStream(QuicStreamId id, QuicStream* stream); + // TODO(renjietang): Replace the original Register method with the new one + // once flag is deprecated. + void RegisterStaticStreamNew(std::unique_ptr<QuicStream> stream); const StaticStreamMap& static_streams() const { return static_stream_map_; } DynamicStreamMap& dynamic_streams() { return dynamic_stream_map_; } @@ -653,6 +666,14 @@ // A counter for peer initiated streams which are in the draining_streams_. size_t num_draining_incoming_streams_; + // A counter for self initiated static streams which are in + // dynamic_stream_map_. + size_t num_outgoing_static_streams_; + + // A counter for peer initiated static streams which are in + // dynamic_stream_map_. + size_t num_incoming_static_streams_; + // A counter for peer initiated streams which are in the // locally_closed_streams_highest_offset_. size_t num_locally_closed_incoming_streams_highest_offset_;
diff --git a/quic/core/quic_stream.h b/quic/core/quic_stream.h index 40bed1c..0cacbe3 100644 --- a/quic/core/quic_stream.h +++ b/quic/core/quic_stream.h
@@ -343,6 +343,9 @@ // Does not send a FIN. May cause the stream to be closed. virtual void CloseWriteSide(); + // Returns true if the stream is static. + bool is_static() const { return is_static_; } + protected: // Sends as many bytes in the first |count| buffers of |iov| to the connection // as the connection will consume. If FIN is consumed, the write side is
diff --git a/quic/test_tools/quic_spdy_session_peer.cc b/quic/test_tools/quic_spdy_session_peer.cc index dcba12c..956cd88 100644 --- a/quic/test_tools/quic_spdy_session_peer.cc +++ b/quic/test_tools/quic_spdy_session_peer.cc
@@ -13,7 +13,7 @@ // static QuicHeadersStream* QuicSpdySessionPeer::GetHeadersStream( QuicSpdySession* session) { - return session->headers_stream_.get(); + return session->headers_stream(); } // static @@ -25,6 +25,20 @@ } } +void QuicSpdySessionPeer::SetUnownedHeadersStream( + QuicSpdySession* session, + QuicHeadersStream* headers_stream) { + for (auto& it : session->dynamic_streams()) { + if (it.first == QuicUtils::GetHeadersStreamId( + session->connection()->transport_version())) { + it.second.reset(headers_stream); + session->unowned_headers_stream_ = + static_cast<QuicHeadersStream*>(it.second.get()); + break; + } + } +} + // static const spdy::SpdyFramer& QuicSpdySessionPeer::GetSpdyFramer( QuicSpdySession* session) {
diff --git a/quic/test_tools/quic_spdy_session_peer.h b/quic/test_tools/quic_spdy_session_peer.h index 83e8b0c..47b55e1 100644 --- a/quic/test_tools/quic_spdy_session_peer.h +++ b/quic/test_tools/quic_spdy_session_peer.h
@@ -24,6 +24,8 @@ static QuicHeadersStream* GetHeadersStream(QuicSpdySession* session); static void SetHeadersStream(QuicSpdySession* session, QuicHeadersStream* headers_stream); + static void SetUnownedHeadersStream(QuicSpdySession* session, + QuicHeadersStream* headers_stream); static const spdy::SpdyFramer& GetSpdyFramer(QuicSpdySession* session); static void SetHpackEncoderDebugVisitor( QuicSpdySession* session,