gfe-relnote: Remove draining_streams_ from QuicSession. Protected by gfe2_reloadable_flag_quic_deprecate_draining_streams.
PiperOrigin-RevId: 306919260
Change-Id: I6d40df89133f450ff3e913280465aff328714408
diff --git a/quic/core/http/quic_spdy_session_test.cc b/quic/core/http/quic_spdy_session_test.cc
index 63fb18d..e09c9bc 100644
--- a/quic/core/http/quic_spdy_session_test.cc
+++ b/quic/core/http/quic_spdy_session_test.cc
@@ -1774,7 +1774,7 @@
QuicStreamFrame data1(i, true, 0, quiche::QuicheStringPiece("HT"));
session_.OnStreamFrame(data1);
EXPECT_EQ(1u, session_.GetNumOpenIncomingStreams());
- session_.StreamDraining(i);
+ session_.StreamDraining(i, /*unidirectional=*/false);
EXPECT_EQ(0u, session_.GetNumOpenIncomingStreams());
}
}
diff --git a/quic/core/quic_session.cc b/quic/core/quic_session.cc
index b57ddd7..4124575 100644
--- a/quic/core/quic_session.cc
+++ b/quic/core/quic_session.cc
@@ -93,6 +93,7 @@
num_expected_unidirectional_static_streams),
num_dynamic_incoming_streams_(0),
num_draining_incoming_streams_(0),
+ num_draining_outgoing_streams_(0),
num_outgoing_static_streams_(0),
num_incoming_static_streams_(0),
num_locally_closed_incoming_streams_highest_offset_(0),
@@ -121,7 +122,9 @@
num_expected_unidirectional_static_streams),
enable_round_robin_scheduling_(false),
write_with_transmission_(
- GetQuicReloadableFlag(quic_write_with_transmission)) {
+ GetQuicReloadableFlag(quic_write_with_transmission)),
+ deprecate_draining_streams_(
+ GetQuicReloadableFlag(quic_deprecate_draining_streams)) {
closed_streams_clean_up_alarm_ =
QuicWrapUnique<QuicAlarm>(connection_->alarm_factory()->CreateAlarm(
new ClosedStreamsCleanUpDelegate(this)));
@@ -685,7 +688,7 @@
}
uint64_t QuicSession::GetNumOpenDynamicStreams() const {
- return stream_map_.size() - draining_streams_.size() +
+ return stream_map_.size() - GetNumDrainingStreams() +
locally_closed_streams_highest_offset_.size() -
num_incoming_static_streams_ - num_outgoing_static_streams_;
}
@@ -904,16 +907,27 @@
InsertLocallyClosedStreamsHighestOffset(
stream_id, stream->flow_controller()->highest_received_byte_offset());
}
+ bool stream_was_draining = false;
+ if (deprecate_draining_streams_) {
+ stream_was_draining = stream->was_draining();
+ QUIC_DVLOG_IF(1, stream_was_draining)
+ << ENDPOINT << "Stream " << stream_id << " was draining";
+ }
stream_map_.erase(it);
if (IsIncomingStream(stream_id)) {
--num_dynamic_incoming_streams_;
}
-
- const bool stream_was_draining =
- draining_streams_.find(stream_id) != draining_streams_.end();
+ if (!deprecate_draining_streams_) {
+ stream_was_draining =
+ draining_streams_.find(stream_id) != draining_streams_.end();
+ }
if (stream_was_draining) {
if (IsIncomingStream(stream_id)) {
+ QUIC_BUG_IF(num_draining_incoming_streams_ == 0);
--num_draining_incoming_streams_;
+ } else if (deprecate_draining_streams_) {
+ QUIC_BUG_IF(num_draining_outgoing_streams_ == 0);
+ --num_draining_outgoing_streams_;
}
draining_streams_.erase(stream_id);
} else if (VersionHasIetfQuicFrames(transport_version())) {
@@ -1593,8 +1607,22 @@
return CreateIncomingStream(stream_id);
}
-void QuicSession::StreamDraining(QuicStreamId stream_id) {
+void QuicSession::StreamDraining(QuicStreamId stream_id, bool unidirectional) {
DCHECK(QuicContainsKey(stream_map_, stream_id));
+ if (deprecate_draining_streams_) {
+ QUIC_RELOADABLE_FLAG_COUNT(quic_deprecate_draining_streams);
+ QUIC_DVLOG(1) << ENDPOINT << "Stream " << stream_id << " is draining";
+ if (VersionHasIetfQuicFrames(transport_version())) {
+ v99_streamid_manager_.OnStreamClosed(stream_id);
+ }
+ if (IsIncomingStream(stream_id)) {
+ ++num_draining_incoming_streams_;
+ return;
+ }
+ ++num_draining_outgoing_streams_;
+ OnCanCreateNewOutgoingStream(unidirectional);
+ return;
+ }
if (!QuicContainsKey(draining_streams_, stream_id)) {
draining_streams_.insert(stream_id);
if (IsIncomingStream(stream_id)) {
@@ -1749,11 +1777,14 @@
}
size_t QuicSession::GetNumActiveStreams() const {
- return stream_map_.size() - draining_streams_.size() -
+ return stream_map_.size() - GetNumDrainingStreams() -
num_incoming_static_streams_ - num_outgoing_static_streams_;
}
size_t QuicSession::GetNumDrainingStreams() const {
+ if (deprecate_draining_streams_) {
+ return num_draining_incoming_streams_ + num_draining_outgoing_streams_;
+ }
return draining_streams_.size();
}
@@ -1796,6 +1827,9 @@
}
size_t QuicSession::GetNumDrainingOutgoingStreams() const {
+ if (deprecate_draining_streams_) {
+ return num_draining_outgoing_streams_;
+ }
DCHECK_GE(draining_streams_.size(), num_draining_incoming_streams_);
return draining_streams_.size() - num_draining_incoming_streams_;
}
diff --git a/quic/core/quic_session.h b/quic/core/quic_session.h
index 4c4626b..d1720a0 100644
--- a/quic/core/quic_session.h
+++ b/quic/core/quic_session.h
@@ -400,7 +400,7 @@
QuicStream* GetOrCreateStream(const QuicStreamId stream_id);
// Mark a stream as draining.
- virtual void StreamDraining(QuicStreamId id);
+ void StreamDraining(QuicStreamId id, bool unidirectional);
// Returns true if this stream should yield writes to another blocked stream.
virtual bool ShouldYield(QuicStreamId stream_id);
@@ -483,6 +483,10 @@
bool write_with_transmission() const { return write_with_transmission_; }
+ bool deprecate_draining_streams() const {
+ return deprecate_draining_streams_;
+ }
+
protected:
using StreamMap = QuicSmallMap<QuicStreamId, std::unique_ptr<QuicStream>, 10>;
@@ -735,6 +739,8 @@
// Set of stream ids that are "draining" -- a FIN has been sent and received,
// but the stream object still exists because not all the received data has
// been consumed.
+ // TODO(fayang): Remove draining_streams_ when deprecate
+ // quic_deprecate_draining_streams.
QuicUnorderedSet<QuicStreamId> draining_streams_;
// Set of stream ids that are waiting for acks excluding crypto stream id.
@@ -751,9 +757,15 @@
// A counter for peer initiated dynamic streams which are in the stream_map_.
size_t num_dynamic_incoming_streams_;
- // A counter for peer initiated streams which are in the draining_streams_.
+ // A counter for peer initiated streams which have sent and received FIN but
+ // waiting for application to consume data.
size_t num_draining_incoming_streams_;
+ // A counter for self initiated streams which have sent and received FIN but
+ // waiting for application to consume data. Only used when
+ // deprecate_draining_streams_ is true.
+ size_t num_draining_outgoing_streams_;
+
// A counter for self initiated static streams which are in
// stream_map_.
size_t num_outgoing_static_streams_;
@@ -818,6 +830,9 @@
// Latched value of gfe2_reloadable_flag_quic_write_with_transmission.
const bool write_with_transmission_;
+
+ // Latched value of quic_deprecate_draining_streams.
+ const bool deprecate_draining_streams_;
};
} // namespace quic
diff --git a/quic/core/quic_session_test.cc b/quic/core/quic_session_test.cc
index d50931c..bf4bffc 100644
--- a/quic/core/quic_session_test.cc
+++ b/quic/core/quic_session_test.cc
@@ -1888,7 +1888,7 @@
QuicStreamFrame data1(stream_id, true, 0, quiche::QuicheStringPiece("HT"));
session_.OnStreamFrame(data1);
EXPECT_CALL(session_, OnCanCreateNewOutgoingStream(false)).Times(1);
- session_.StreamDraining(stream_id);
+ session_.StreamDraining(stream_id, /*unidirectional=*/false);
}
TEST_P(QuicSessionTestServer, NoPendingStreams) {
@@ -2019,7 +2019,7 @@
QuicStreamFrame data1(i, true, 0, quiche::QuicheStringPiece("HT"));
session_.OnStreamFrame(data1);
EXPECT_EQ(1u, session_.GetNumOpenIncomingStreams());
- session_.StreamDraining(i);
+ session_.StreamDraining(i, /*unidirectional=*/false);
EXPECT_EQ(0u, session_.GetNumOpenIncomingStreams());
}
}
diff --git a/quic/core/quic_stream.cc b/quic/core/quic_stream.cc
index 3b2484d..813ea2d 100644
--- a/quic/core/quic_stream.cc
+++ b/quic/core/quic_stream.cc
@@ -353,6 +353,7 @@
buffered_data_threshold_(GetQuicFlag(FLAGS_quic_buffered_data_threshold)),
is_static_(is_static),
deadline_(QuicTime::Zero()),
+ was_draining_(false),
type_(VersionHasIetfQuicFrames(session->transport_version()) &&
type != CRYPTO
? QuicUtils::GetStreamType(id_,
@@ -431,9 +432,14 @@
}
if (frame.fin) {
- fin_received_ = true;
- if (fin_sent_) {
- session_->StreamDraining(id_);
+ if (!session_->deprecate_draining_streams() || !fin_received_) {
+ fin_received_ = true;
+ if (fin_sent_) {
+ DCHECK(!was_draining_ || !session_->deprecate_draining_streams());
+ session_->StreamDraining(id_,
+ /*unidirectional=*/type_ != BIDIRECTIONAL);
+ was_draining_ = true;
+ }
}
}
@@ -1087,10 +1093,14 @@
MaybeSendBlocked();
}
if (fin && consumed_data.fin_consumed) {
+ DCHECK(!fin_sent_);
fin_sent_ = true;
fin_outstanding_ = true;
if (fin_received_) {
- session_->StreamDraining(id_);
+ DCHECK(!was_draining_);
+ session_->StreamDraining(id_,
+ /*unidirectional=*/type_ != BIDIRECTIONAL);
+ was_draining_ = true;
}
CloseWriteSide();
} else if (fin && !consumed_data.fin_consumed) {
diff --git a/quic/core/quic_stream.h b/quic/core/quic_stream.h
index bf12149..b8e940f 100644
--- a/quic/core/quic_stream.h
+++ b/quic/core/quic_stream.h
@@ -358,6 +358,8 @@
// Returns true if the stream is static.
bool is_static() const { return is_static_; }
+ bool was_draining() const { return was_draining_; }
+
static spdy::SpdyStreamPrecedence CalculateDefaultPriority(
const QuicSession* session);
@@ -534,6 +536,10 @@
// If initialized, reset this stream at this deadline.
QuicTime deadline_;
+ // True if this stream has entered draining state. Only used when
+ // quic_deprecate_draining_streams is true.
+ bool was_draining_;
+
// Indicates whether this stream is bidirectional, read unidirectional or
// write unidirectional.
const StreamType type_;
diff --git a/quic/core/quic_stream_test.cc b/quic/core/quic_stream_test.cc
index eea7b15..100c05e 100644
--- a/quic/core/quic_stream_test.cc
+++ b/quic/core/quic_stream_test.cc
@@ -746,8 +746,7 @@
nullptr);
EXPECT_TRUE(stream_->write_side_closed());
- EXPECT_EQ(1u, QuicSessionPeer::GetDrainingStreams(session_.get())
- ->count(kTestStreamId));
+ EXPECT_EQ(1u, session_->GetNumDrainingStreams());
EXPECT_EQ(0u, session_->GetNumOpenIncomingStreams());
}
@@ -775,8 +774,7 @@
EXPECT_FALSE(QuicStreamPeer::read_side_closed(stream_));
EXPECT_FALSE(stream_->reading_stopped());
- EXPECT_EQ(1u, QuicSessionPeer::GetDrainingStreams(session_.get())
- ->count(kTestStreamId));
+ EXPECT_EQ(1u, session_->GetNumDrainingStreams());
EXPECT_EQ(0u, session_->GetNumOpenIncomingStreams());
}
diff --git a/quic/test_tools/quic_session_peer.cc b/quic/test_tools/quic_session_peer.cc
index cbaf5d6..c23bc88 100644
--- a/quic/test_tools/quic_session_peer.cc
+++ b/quic/test_tools/quic_session_peer.cc
@@ -143,12 +143,6 @@
}
// static
-QuicUnorderedSet<QuicStreamId>* QuicSessionPeer::GetDrainingStreams(
- QuicSession* session) {
- return &session->draining_streams_;
-}
-
-// static
void QuicSessionPeer::ActivateStream(QuicSession* session,
std::unique_ptr<QuicStream> stream) {
return session->ActivateStream(std::move(stream));
diff --git a/quic/test_tools/quic_session_peer.h b/quic/test_tools/quic_session_peer.h
index 72caa91..5a58360 100644
--- a/quic/test_tools/quic_session_peer.h
+++ b/quic/test_tools/quic_session_peer.h
@@ -59,8 +59,6 @@
static QuicSession::StreamMap& stream_map(QuicSession* session);
static const QuicSession::ClosedStreams& closed_streams(QuicSession* session);
static QuicSession::ZombieStreamMap& zombie_streams(QuicSession* session);
- static QuicUnorderedSet<QuicStreamId>* GetDrainingStreams(
- QuicSession* session);
static void ActivateStream(QuicSession* session,
std::unique_ptr<QuicStream> stream);
diff --git a/quic/test_tools/quic_test_utils.h b/quic/test_tools/quic_test_utils.h
index 8147dcd..fafe8d0 100644
--- a/quic/test_tools/quic_test_utils.h
+++ b/quic/test_tools/quic_test_utils.h
@@ -666,6 +666,7 @@
MOCK_METHOD1(OnAlpnSelected, void(quiche::QuicheStringPiece));
using QuicSession::ActivateStream;
+ using QuicSession::GetNumDrainingStreams;
// Returns a QuicConsumedData that indicates all of |write_length| (and |fin|
// if set) has been consumed.
diff --git a/quic/tools/quic_simple_server_session_test.cc b/quic/tools/quic_simple_server_session_test.cc
index 1d5f3df..bdcedbc 100644
--- a/quic/tools/quic_simple_server_session_test.cc
+++ b/quic/tools/quic_simple_server_session_test.cc
@@ -825,9 +825,11 @@
}
if (VersionUsesHttp3(transport_version())) {
- session_->StreamDraining(GetNthServerInitiatedUnidirectionalId(3));
+ session_->StreamDraining(GetNthServerInitiatedUnidirectionalId(3),
+ /*unidirectional=*/true);
} else {
- session_->StreamDraining(GetNthServerInitiatedUnidirectionalId(0));
+ session_->StreamDraining(GetNthServerInitiatedUnidirectionalId(0),
+ /*unidirectional=*/true);
}
// Number of open outgoing streams should still be the same, because a new
// stream is opened. And the queue should be empty.
@@ -924,8 +926,10 @@
session_->OnMaxStreamsFrame(
QuicMaxStreamsFrame(0, num_resources + 3, /*unidirectional=*/true));
}
- session_->StreamDraining(GetNthServerInitiatedUnidirectionalId(3));
- session_->StreamDraining(GetNthServerInitiatedUnidirectionalId(4));
+ session_->StreamDraining(GetNthServerInitiatedUnidirectionalId(3),
+ /*unidirectional=*/true);
+ session_->StreamDraining(GetNthServerInitiatedUnidirectionalId(4),
+ /*unidirectional=*/true);
}
// Tests that closing a open outgoing stream can trigger a promised resource in