gfe-relnote: Remove draining_streams_ from QuicSession. Protected by gfe2_reloadable_flag_quic_deprecate_draining_streams. PiperOrigin-RevId: 306919260 Change-Id: I6d40df89133f450ff3e913280465aff328714408
diff --git a/quic/core/http/quic_spdy_session_test.cc b/quic/core/http/quic_spdy_session_test.cc index 63fb18d..e09c9bc 100644 --- a/quic/core/http/quic_spdy_session_test.cc +++ b/quic/core/http/quic_spdy_session_test.cc
@@ -1774,7 +1774,7 @@ QuicStreamFrame data1(i, true, 0, quiche::QuicheStringPiece("HT")); session_.OnStreamFrame(data1); EXPECT_EQ(1u, session_.GetNumOpenIncomingStreams()); - session_.StreamDraining(i); + session_.StreamDraining(i, /*unidirectional=*/false); EXPECT_EQ(0u, session_.GetNumOpenIncomingStreams()); } }
diff --git a/quic/core/quic_session.cc b/quic/core/quic_session.cc index b57ddd7..4124575 100644 --- a/quic/core/quic_session.cc +++ b/quic/core/quic_session.cc
@@ -93,6 +93,7 @@ num_expected_unidirectional_static_streams), num_dynamic_incoming_streams_(0), num_draining_incoming_streams_(0), + num_draining_outgoing_streams_(0), num_outgoing_static_streams_(0), num_incoming_static_streams_(0), num_locally_closed_incoming_streams_highest_offset_(0), @@ -121,7 +122,9 @@ num_expected_unidirectional_static_streams), enable_round_robin_scheduling_(false), write_with_transmission_( - GetQuicReloadableFlag(quic_write_with_transmission)) { + GetQuicReloadableFlag(quic_write_with_transmission)), + deprecate_draining_streams_( + GetQuicReloadableFlag(quic_deprecate_draining_streams)) { closed_streams_clean_up_alarm_ = QuicWrapUnique<QuicAlarm>(connection_->alarm_factory()->CreateAlarm( new ClosedStreamsCleanUpDelegate(this))); @@ -685,7 +688,7 @@ } uint64_t QuicSession::GetNumOpenDynamicStreams() const { - return stream_map_.size() - draining_streams_.size() + + return stream_map_.size() - GetNumDrainingStreams() + locally_closed_streams_highest_offset_.size() - num_incoming_static_streams_ - num_outgoing_static_streams_; } @@ -904,16 +907,27 @@ InsertLocallyClosedStreamsHighestOffset( stream_id, stream->flow_controller()->highest_received_byte_offset()); } + bool stream_was_draining = false; + if (deprecate_draining_streams_) { + stream_was_draining = stream->was_draining(); + QUIC_DVLOG_IF(1, stream_was_draining) + << ENDPOINT << "Stream " << stream_id << " was draining"; + } stream_map_.erase(it); if (IsIncomingStream(stream_id)) { --num_dynamic_incoming_streams_; } - - const bool stream_was_draining = - draining_streams_.find(stream_id) != draining_streams_.end(); + if (!deprecate_draining_streams_) { + stream_was_draining = + draining_streams_.find(stream_id) != draining_streams_.end(); + } if (stream_was_draining) { if (IsIncomingStream(stream_id)) { + QUIC_BUG_IF(num_draining_incoming_streams_ == 0); --num_draining_incoming_streams_; + } else if (deprecate_draining_streams_) { + QUIC_BUG_IF(num_draining_outgoing_streams_ == 0); + --num_draining_outgoing_streams_; } draining_streams_.erase(stream_id); } else if (VersionHasIetfQuicFrames(transport_version())) { @@ -1593,8 +1607,22 @@ return CreateIncomingStream(stream_id); } -void QuicSession::StreamDraining(QuicStreamId stream_id) { +void QuicSession::StreamDraining(QuicStreamId stream_id, bool unidirectional) { DCHECK(QuicContainsKey(stream_map_, stream_id)); + if (deprecate_draining_streams_) { + QUIC_RELOADABLE_FLAG_COUNT(quic_deprecate_draining_streams); + QUIC_DVLOG(1) << ENDPOINT << "Stream " << stream_id << " is draining"; + if (VersionHasIetfQuicFrames(transport_version())) { + v99_streamid_manager_.OnStreamClosed(stream_id); + } + if (IsIncomingStream(stream_id)) { + ++num_draining_incoming_streams_; + return; + } + ++num_draining_outgoing_streams_; + OnCanCreateNewOutgoingStream(unidirectional); + return; + } if (!QuicContainsKey(draining_streams_, stream_id)) { draining_streams_.insert(stream_id); if (IsIncomingStream(stream_id)) { @@ -1749,11 +1777,14 @@ } size_t QuicSession::GetNumActiveStreams() const { - return stream_map_.size() - draining_streams_.size() - + return stream_map_.size() - GetNumDrainingStreams() - num_incoming_static_streams_ - num_outgoing_static_streams_; } size_t QuicSession::GetNumDrainingStreams() const { + if (deprecate_draining_streams_) { + return num_draining_incoming_streams_ + num_draining_outgoing_streams_; + } return draining_streams_.size(); } @@ -1796,6 +1827,9 @@ } size_t QuicSession::GetNumDrainingOutgoingStreams() const { + if (deprecate_draining_streams_) { + return num_draining_outgoing_streams_; + } DCHECK_GE(draining_streams_.size(), num_draining_incoming_streams_); return draining_streams_.size() - num_draining_incoming_streams_; }
diff --git a/quic/core/quic_session.h b/quic/core/quic_session.h index 4c4626b..d1720a0 100644 --- a/quic/core/quic_session.h +++ b/quic/core/quic_session.h
@@ -400,7 +400,7 @@ QuicStream* GetOrCreateStream(const QuicStreamId stream_id); // Mark a stream as draining. - virtual void StreamDraining(QuicStreamId id); + void StreamDraining(QuicStreamId id, bool unidirectional); // Returns true if this stream should yield writes to another blocked stream. virtual bool ShouldYield(QuicStreamId stream_id); @@ -483,6 +483,10 @@ bool write_with_transmission() const { return write_with_transmission_; } + bool deprecate_draining_streams() const { + return deprecate_draining_streams_; + } + protected: using StreamMap = QuicSmallMap<QuicStreamId, std::unique_ptr<QuicStream>, 10>; @@ -735,6 +739,8 @@ // Set of stream ids that are "draining" -- a FIN has been sent and received, // but the stream object still exists because not all the received data has // been consumed. + // TODO(fayang): Remove draining_streams_ when deprecate + // quic_deprecate_draining_streams. QuicUnorderedSet<QuicStreamId> draining_streams_; // Set of stream ids that are waiting for acks excluding crypto stream id. @@ -751,9 +757,15 @@ // A counter for peer initiated dynamic streams which are in the stream_map_. size_t num_dynamic_incoming_streams_; - // A counter for peer initiated streams which are in the draining_streams_. + // A counter for peer initiated streams which have sent and received FIN but + // waiting for application to consume data. size_t num_draining_incoming_streams_; + // A counter for self initiated streams which have sent and received FIN but + // waiting for application to consume data. Only used when + // deprecate_draining_streams_ is true. + size_t num_draining_outgoing_streams_; + // A counter for self initiated static streams which are in // stream_map_. size_t num_outgoing_static_streams_; @@ -818,6 +830,9 @@ // Latched value of gfe2_reloadable_flag_quic_write_with_transmission. const bool write_with_transmission_; + + // Latched value of quic_deprecate_draining_streams. + const bool deprecate_draining_streams_; }; } // namespace quic
diff --git a/quic/core/quic_session_test.cc b/quic/core/quic_session_test.cc index d50931c..bf4bffc 100644 --- a/quic/core/quic_session_test.cc +++ b/quic/core/quic_session_test.cc
@@ -1888,7 +1888,7 @@ QuicStreamFrame data1(stream_id, true, 0, quiche::QuicheStringPiece("HT")); session_.OnStreamFrame(data1); EXPECT_CALL(session_, OnCanCreateNewOutgoingStream(false)).Times(1); - session_.StreamDraining(stream_id); + session_.StreamDraining(stream_id, /*unidirectional=*/false); } TEST_P(QuicSessionTestServer, NoPendingStreams) { @@ -2019,7 +2019,7 @@ QuicStreamFrame data1(i, true, 0, quiche::QuicheStringPiece("HT")); session_.OnStreamFrame(data1); EXPECT_EQ(1u, session_.GetNumOpenIncomingStreams()); - session_.StreamDraining(i); + session_.StreamDraining(i, /*unidirectional=*/false); EXPECT_EQ(0u, session_.GetNumOpenIncomingStreams()); } }
diff --git a/quic/core/quic_stream.cc b/quic/core/quic_stream.cc index 3b2484d..813ea2d 100644 --- a/quic/core/quic_stream.cc +++ b/quic/core/quic_stream.cc
@@ -353,6 +353,7 @@ buffered_data_threshold_(GetQuicFlag(FLAGS_quic_buffered_data_threshold)), is_static_(is_static), deadline_(QuicTime::Zero()), + was_draining_(false), type_(VersionHasIetfQuicFrames(session->transport_version()) && type != CRYPTO ? QuicUtils::GetStreamType(id_, @@ -431,9 +432,14 @@ } if (frame.fin) { - fin_received_ = true; - if (fin_sent_) { - session_->StreamDraining(id_); + if (!session_->deprecate_draining_streams() || !fin_received_) { + fin_received_ = true; + if (fin_sent_) { + DCHECK(!was_draining_ || !session_->deprecate_draining_streams()); + session_->StreamDraining(id_, + /*unidirectional=*/type_ != BIDIRECTIONAL); + was_draining_ = true; + } } } @@ -1087,10 +1093,14 @@ MaybeSendBlocked(); } if (fin && consumed_data.fin_consumed) { + DCHECK(!fin_sent_); fin_sent_ = true; fin_outstanding_ = true; if (fin_received_) { - session_->StreamDraining(id_); + DCHECK(!was_draining_); + session_->StreamDraining(id_, + /*unidirectional=*/type_ != BIDIRECTIONAL); + was_draining_ = true; } CloseWriteSide(); } else if (fin && !consumed_data.fin_consumed) {
diff --git a/quic/core/quic_stream.h b/quic/core/quic_stream.h index bf12149..b8e940f 100644 --- a/quic/core/quic_stream.h +++ b/quic/core/quic_stream.h
@@ -358,6 +358,8 @@ // Returns true if the stream is static. bool is_static() const { return is_static_; } + bool was_draining() const { return was_draining_; } + static spdy::SpdyStreamPrecedence CalculateDefaultPriority( const QuicSession* session); @@ -534,6 +536,10 @@ // If initialized, reset this stream at this deadline. QuicTime deadline_; + // True if this stream has entered draining state. Only used when + // quic_deprecate_draining_streams is true. + bool was_draining_; + // Indicates whether this stream is bidirectional, read unidirectional or // write unidirectional. const StreamType type_;
diff --git a/quic/core/quic_stream_test.cc b/quic/core/quic_stream_test.cc index eea7b15..100c05e 100644 --- a/quic/core/quic_stream_test.cc +++ b/quic/core/quic_stream_test.cc
@@ -746,8 +746,7 @@ nullptr); EXPECT_TRUE(stream_->write_side_closed()); - EXPECT_EQ(1u, QuicSessionPeer::GetDrainingStreams(session_.get()) - ->count(kTestStreamId)); + EXPECT_EQ(1u, session_->GetNumDrainingStreams()); EXPECT_EQ(0u, session_->GetNumOpenIncomingStreams()); } @@ -775,8 +774,7 @@ EXPECT_FALSE(QuicStreamPeer::read_side_closed(stream_)); EXPECT_FALSE(stream_->reading_stopped()); - EXPECT_EQ(1u, QuicSessionPeer::GetDrainingStreams(session_.get()) - ->count(kTestStreamId)); + EXPECT_EQ(1u, session_->GetNumDrainingStreams()); EXPECT_EQ(0u, session_->GetNumOpenIncomingStreams()); }
diff --git a/quic/test_tools/quic_session_peer.cc b/quic/test_tools/quic_session_peer.cc index cbaf5d6..c23bc88 100644 --- a/quic/test_tools/quic_session_peer.cc +++ b/quic/test_tools/quic_session_peer.cc
@@ -143,12 +143,6 @@ } // static -QuicUnorderedSet<QuicStreamId>* QuicSessionPeer::GetDrainingStreams( - QuicSession* session) { - return &session->draining_streams_; -} - -// static void QuicSessionPeer::ActivateStream(QuicSession* session, std::unique_ptr<QuicStream> stream) { return session->ActivateStream(std::move(stream));
diff --git a/quic/test_tools/quic_session_peer.h b/quic/test_tools/quic_session_peer.h index 72caa91..5a58360 100644 --- a/quic/test_tools/quic_session_peer.h +++ b/quic/test_tools/quic_session_peer.h
@@ -59,8 +59,6 @@ static QuicSession::StreamMap& stream_map(QuicSession* session); static const QuicSession::ClosedStreams& closed_streams(QuicSession* session); static QuicSession::ZombieStreamMap& zombie_streams(QuicSession* session); - static QuicUnorderedSet<QuicStreamId>* GetDrainingStreams( - QuicSession* session); static void ActivateStream(QuicSession* session, std::unique_ptr<QuicStream> stream);
diff --git a/quic/test_tools/quic_test_utils.h b/quic/test_tools/quic_test_utils.h index 8147dcd..fafe8d0 100644 --- a/quic/test_tools/quic_test_utils.h +++ b/quic/test_tools/quic_test_utils.h
@@ -666,6 +666,7 @@ MOCK_METHOD1(OnAlpnSelected, void(quiche::QuicheStringPiece)); using QuicSession::ActivateStream; + using QuicSession::GetNumDrainingStreams; // Returns a QuicConsumedData that indicates all of |write_length| (and |fin| // if set) has been consumed.
diff --git a/quic/tools/quic_simple_server_session_test.cc b/quic/tools/quic_simple_server_session_test.cc index 1d5f3df..bdcedbc 100644 --- a/quic/tools/quic_simple_server_session_test.cc +++ b/quic/tools/quic_simple_server_session_test.cc
@@ -825,9 +825,11 @@ } if (VersionUsesHttp3(transport_version())) { - session_->StreamDraining(GetNthServerInitiatedUnidirectionalId(3)); + session_->StreamDraining(GetNthServerInitiatedUnidirectionalId(3), + /*unidirectional=*/true); } else { - session_->StreamDraining(GetNthServerInitiatedUnidirectionalId(0)); + session_->StreamDraining(GetNthServerInitiatedUnidirectionalId(0), + /*unidirectional=*/true); } // Number of open outgoing streams should still be the same, because a new // stream is opened. And the queue should be empty. @@ -924,8 +926,10 @@ session_->OnMaxStreamsFrame( QuicMaxStreamsFrame(0, num_resources + 3, /*unidirectional=*/true)); } - session_->StreamDraining(GetNthServerInitiatedUnidirectionalId(3)); - session_->StreamDraining(GetNthServerInitiatedUnidirectionalId(4)); + session_->StreamDraining(GetNthServerInitiatedUnidirectionalId(3), + /*unidirectional=*/true); + session_->StreamDraining(GetNthServerInitiatedUnidirectionalId(4), + /*unidirectional=*/true); } // Tests that closing a open outgoing stream can trigger a promised resource in