Remove zombie stream map in QuicSession.
Those streams will live in stream_map_ instead.
This CL also removes some duplicated tests.
Protected by gfe2_reloadable_flag_quic_remove_zombie_streams
PiperOrigin-RevId: 324877471
Change-Id: I6d3df84048c35e7ca52ec030e019bbc0def4d3ce
diff --git a/quic/core/http/quic_spdy_session.cc b/quic/core/http/quic_spdy_session.cc
index e025706..90304c0 100644
--- a/quic/core/http/quic_spdy_session.cc
+++ b/quic/core/http/quic_spdy_session.cc
@@ -420,6 +420,7 @@
for (auto& stream : *closed_streams()) {
static_cast<QuicSpdyStream*>(stream.get())->ClearSession();
}
+ DCHECK(!remove_zombie_streams() || zombie_streams().empty());
for (auto const& kv : zombie_streams()) {
static_cast<QuicSpdyStream*>(kv.second.get())->ClearSession();
}
@@ -1178,8 +1179,9 @@
}
bool QuicSpdySession::HasActiveRequestStreams() const {
- DCHECK_GE(static_cast<size_t>(stream_map_size()), num_static_streams());
- return stream_map_size() - num_static_streams() > 0;
+ DCHECK_GE(static_cast<size_t>(stream_map_size()),
+ num_static_streams() + num_zombie_streams());
+ return stream_map_size() - num_static_streams() - num_zombie_streams() > 0;
}
bool QuicSpdySession::ProcessPendingStream(PendingStream* pending) {
diff --git a/quic/core/http/quic_spdy_session_test.cc b/quic/core/http/quic_spdy_session_test.cc
index 18b3951..ede4d2c 100644
--- a/quic/core/http/quic_spdy_session_test.cc
+++ b/quic/core/http/quic_spdy_session_test.cc
@@ -331,7 +331,6 @@
using QuicSession::closed_streams;
using QuicSession::ShouldKeepConnectionAlive;
- using QuicSession::zombie_streams;
using QuicSpdySession::ProcessPendingStream;
using QuicSpdySession::UsesPendingStreams;
@@ -2096,21 +2095,6 @@
EXPECT_EQ(3u, stream->flow_controller()->highest_received_byte_offset());
}
-TEST_P(QuicSpdySessionTestServer, ZombieStreams) {
- TestStream* stream2 = session_.CreateOutgoingBidirectionalStream();
- QuicStreamPeer::SetStreamBytesWritten(3, stream2);
- EXPECT_TRUE(stream2->IsWaitingForAcks());
-
- CloseStream(stream2->id());
- EXPECT_FALSE(QuicContainsKey(session_.zombie_streams(), stream2->id()));
- ASSERT_EQ(1u, session_.closed_streams()->size());
- EXPECT_EQ(stream2->id(), session_.closed_streams()->front()->id());
- session_.OnStreamDoneWaitingForAcks(2);
- EXPECT_FALSE(QuicContainsKey(session_.zombie_streams(), stream2->id()));
- EXPECT_EQ(1u, session_.closed_streams()->size());
- EXPECT_EQ(stream2->id(), session_.closed_streams()->front()->id());
-}
-
TEST_P(QuicSpdySessionTestServer, OnStreamFrameLost) {
InSequence s;
diff --git a/quic/core/http/quic_spdy_stream_test.cc b/quic/core/http/quic_spdy_stream_test.cc
index bc09fc0..b556367 100644
--- a/quic/core/http/quic_spdy_stream_test.cc
+++ b/quic/core/http/quic_spdy_stream_test.cc
@@ -1730,18 +1730,6 @@
QuicTime::Zero()));
}
-TEST_P(QuicSpdyStreamTest, StreamBecomesZombieWithWriteThatCloses) {
- Initialize(kShouldProcessData);
- EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _)).Times(AtLeast(1));
- QuicStreamPeer::CloseReadSide(stream_);
- // This write causes stream to be closed.
- stream_->WriteOrBufferBody("Test1", true);
- // stream_ has unacked data and should become zombie.
- EXPECT_TRUE(QuicContainsKey(QuicSessionPeer::zombie_streams(session_.get()),
- stream_->id()));
- EXPECT_TRUE(QuicSessionPeer::closed_streams(session_.get()).empty());
-}
-
TEST_P(QuicSpdyStreamTest, OnPriorityFrame) {
Initialize(kShouldProcessData);
stream_->OnPriorityFrame(spdy::SpdyStreamPrecedence(kV3HighestPriority));
diff --git a/quic/core/quic_session.cc b/quic/core/quic_session.cc
index 6379861..ecfb2b2 100644
--- a/quic/core/quic_session.cc
+++ b/quic/core/quic_session.cc
@@ -76,6 +76,7 @@
num_draining_streams_(0),
num_outgoing_draining_streams_(0),
num_static_streams_(0),
+ num_zombie_streams_(0),
flow_controller_(
this,
QuicUtils::GetInvalidStreamId(connection->transport_version()),
@@ -102,8 +103,10 @@
liveness_testing_in_progress_(false),
remove_streams_waiting_for_acks_(
GetQuicReloadableFlag(quic_remove_streams_waiting_for_acks)),
- do_not_use_stream_map_(
- GetQuicReloadableFlag(quic_do_not_use_stream_map)) {
+ do_not_use_stream_map_(GetQuicReloadableFlag(quic_do_not_use_stream_map)),
+ remove_zombie_streams_(
+ GetQuicReloadableFlag(quic_remove_zombie_streams) &&
+ do_not_use_stream_map_ && remove_streams_waiting_for_acks_) {
closed_streams_clean_up_alarm_ =
QuicWrapUnique<QuicAlarm>(connection_->alarm_factory()->CreateAlarm(
new ClosedStreamsCleanUpDelegate(this)));
@@ -146,7 +149,13 @@
}
QuicSession::~QuicSession() {
- QUIC_LOG_IF(WARNING, !zombie_streams_.empty()) << "Still have zombie streams";
+ if (!remove_zombie_streams_) {
+ QUIC_LOG_IF(WARNING, !zombie_streams_.empty())
+ << "Still have zombie streams";
+ } else {
+ QUIC_LOG_IF(WARNING, num_zombie_streams_ > 0)
+ << "Still have zombie streams";
+ }
}
void QuicSession::PendingStreamOnStreamFrame(const QuicStreamFrame& frame) {
@@ -412,9 +421,16 @@
stream->OnConnectionClosed(frame.quic_error_code, source);
QUIC_RELOADABLE_FLAG_COUNT(
quic_do_not_close_stream_again_on_connection_close);
- if (stream_map_.find(id) != stream_map_.end()) {
- QUIC_BUG << ENDPOINT << "Stream " << id
- << " failed to close under OnConnectionClosed";
+ auto it = stream_map_.find(id);
+ if (it != stream_map_.end()) {
+ if (!remove_zombie_streams_) {
+ QUIC_BUG << ENDPOINT << "Stream " << id
+ << " failed to close under OnConnectionClosed";
+ } else {
+ QUIC_BUG_IF(!it->second->IsZombie())
+ << ENDPOINT << "Non-zombie stream " << id
+ << " failed to close under OnConnectionClosed";
+ }
if (!GetQuicReloadableFlag(
quic_do_not_close_stream_again_on_connection_close)) {
CloseStream(id);
@@ -424,11 +440,16 @@
});
}
- // Cleanup zombie stream map on connection close.
- while (!zombie_streams_.empty()) {
- ZombieStreamMap::iterator it = zombie_streams_.begin();
- closed_streams_.push_back(std::move(it->second));
- zombie_streams_.erase(it);
+ if (!remove_zombie_streams_) {
+ // Cleanup zombie stream map on connection close.
+ while (!zombie_streams_.empty()) {
+ ZombieStreamMap::iterator it = zombie_streams_.begin();
+ closed_streams_.push_back(std::move(it->second));
+ zombie_streams_.erase(it);
+ }
+ } else {
+ QUIC_RELOADABLE_FLAG_COUNT_N(quic_remove_zombie_streams, 1, 4);
+ DCHECK(zombie_streams_.empty());
}
closed_streams_clean_up_alarm_->Cancel();
@@ -900,8 +921,15 @@
QuicStream* stream = it->second.get();
StreamType type = stream->type();
- if (stream->IsWaitingForAcks()) {
- zombie_streams_[stream_id] = std::move(it->second);
+ const bool stream_waiting_for_acks = stream->IsWaitingForAcks();
+ if (stream_waiting_for_acks) {
+ if (remove_zombie_streams_) {
+ // The stream needs to be kept alive because it's waiting for acks.
+ QUIC_RELOADABLE_FLAG_COUNT_N(quic_remove_zombie_streams, 2, 4);
+ ++num_zombie_streams_;
+ } else {
+ zombie_streams_[stream_id] = std::move(it->second);
+ }
} else {
// Clean up the stream since it is no longer waiting for acks.
if (remove_streams_waiting_for_acks_) {
@@ -910,6 +938,11 @@
streams_waiting_for_acks_.erase(stream_id);
}
closed_streams_.push_back(std::move(it->second));
+ if (remove_zombie_streams_) {
+ // When zombie_streams_ is removed, stream is only erased from stream map
+ // if it's not zombie.
+ stream_map_.erase(it);
+ }
// Do not retransmit data of a closed stream.
streams_with_pending_retransmission_.erase(stream_id);
if (!closed_streams_clean_up_alarm_->IsSet()) {
@@ -927,14 +960,18 @@
DCHECK(!stream->was_draining());
InsertLocallyClosedStreamsHighestOffset(
stream_id, stream->flow_controller()->highest_received_byte_offset());
- stream_map_.erase(it);
+ if (!remove_zombie_streams_) {
+ stream_map_.erase(it);
+ }
return;
}
const bool stream_was_draining = stream->was_draining();
QUIC_DVLOG_IF(1, stream_was_draining)
<< ENDPOINT << "Stream " << stream_id << " was draining";
- stream_map_.erase(it);
+ if (!remove_zombie_streams_) {
+ stream_map_.erase(it);
+ }
if (stream_was_draining) {
QUIC_BUG_IF(num_draining_streams_ == 0);
--num_draining_streams_;
@@ -1811,6 +1848,10 @@
StreamMap::iterator it = stream_map_.find(stream_id);
if (it != stream_map_.end()) {
+ if (remove_zombie_streams_ && it->second->IsZombie()) {
+ QUIC_RELOADABLE_FLAG_COUNT_N(quic_remove_zombie_streams, 3, 4);
+ return nullptr;
+ }
return it->second.get();
}
@@ -1955,8 +1996,11 @@
bool QuicSession::IsOpenStream(QuicStreamId id) {
DCHECK_NE(QuicUtils::GetInvalidStreamId(transport_version()), id);
- if (QuicContainsKey(stream_map_, id) ||
- QuicContainsKey(pending_stream_map_, id) ||
+ const StreamMap::iterator it = stream_map_.find(id);
+ if (it != stream_map_.end()) {
+ return remove_zombie_streams_ ? !it->second->IsZombie() : true;
+ }
+ if (QuicContainsKey(pending_stream_map_, id) ||
QuicUtils::IsCryptoStreamId(transport_version(), id)) {
// Stream is active
return true;
@@ -1981,8 +2025,9 @@
locally_closed_streams_highest_offset_.size();
}
DCHECK_GE(static_cast<QuicStreamCount>(stream_map_.size()),
- num_static_streams_ + num_draining_streams_);
- return stream_map_.size() - num_draining_streams_ - num_static_streams_;
+ num_static_streams_ + num_draining_streams_ + num_zombie_streams_);
+ return stream_map_.size() - num_draining_streams_ - num_static_streams_ -
+ num_zombie_streams_;
}
void QuicSession::MarkConnectionLevelWriteBlocked(QuicStreamId id) {
@@ -2058,16 +2103,28 @@
streams_waiting_for_acks_.erase(id);
}
- auto it = zombie_streams_.find(id);
- if (it == zombie_streams_.end()) {
- return;
+ if (!remove_zombie_streams_) {
+ auto it = zombie_streams_.find(id);
+ if (it == zombie_streams_.end()) {
+ return;
+ }
+
+ closed_streams_.push_back(std::move(it->second));
+ zombie_streams_.erase(it);
+ } else {
+ QUIC_RELOADABLE_FLAG_COUNT_N(quic_remove_zombie_streams, 4, 4);
+ auto it = stream_map_.find(id);
+ if (it == stream_map_.end()) {
+ return;
+ }
+ --num_zombie_streams_;
+ closed_streams_.push_back(std::move(it->second));
+ stream_map_.erase(it);
}
- closed_streams_.push_back(std::move(it->second));
if (!closed_streams_clean_up_alarm_->IsSet()) {
closed_streams_clean_up_alarm_->Set(connection_->clock()->ApproximateNow());
}
- zombie_streams_.erase(it);
// Do not retransmit data of a closed stream.
streams_with_pending_retransmission_.erase(id);
}
@@ -2086,6 +2143,7 @@
// The number of the streams waiting for acks should not be larger than the
// number of streams.
+ DCHECK(!remove_zombie_streams_ || zombie_streams_.empty());
if (static_cast<size_t>(stream_map_.size() + zombie_streams_.size()) <
streams_waiting_for_acks_.size()) {
QUIC_BUG << "More streams are waiting for acks than the number of streams. "
@@ -2101,6 +2159,8 @@
if (active_stream != stream_map_.end()) {
return active_stream->second.get();
}
+
+ DCHECK(!remove_zombie_streams_ || zombie_streams_.empty());
auto zombie_stream = zombie_streams_.find(id);
if (zombie_stream != zombie_streams_.end()) {
return zombie_stream->second.get();
@@ -2250,6 +2310,7 @@
return !streams_waiting_for_acks_.empty();
}
QUIC_RELOADABLE_FLAG_COUNT_N(quic_remove_streams_waiting_for_acks, 4, 4);
+ DCHECK(!remove_zombie_streams_ || zombie_streams_.empty());
if (!zombie_streams().empty()) {
return true;
}
@@ -2497,14 +2558,15 @@
void QuicSession::PerformActionOnActiveStreams(
std::function<bool(QuicStream*)> action) {
- QuicSmallMap<QuicStreamId, QuicStream*, 10> non_static_streams;
+ QuicSmallMap<QuicStreamId, QuicStream*, 10> active_streams;
for (const auto& it : stream_map_) {
- if (!it.second->is_static()) {
- non_static_streams[it.first] = it.second.get();
+ if (!it.second->is_static() &&
+ (!remove_zombie_streams_ || !it.second->IsZombie())) {
+ active_streams[it.first] = it.second.get();
}
}
- for (const auto& it : non_static_streams) {
+ for (const auto& it : active_streams) {
if (!action(it.second)) {
return;
}
@@ -2514,7 +2576,9 @@
void QuicSession::PerformActionOnActiveStreams(
std::function<bool(QuicStream*)> action) const {
for (const auto& it : stream_map_) {
- if (!it.second->is_static() && !action(it.second.get())) {
+ if (!it.second->is_static() &&
+ (!remove_zombie_streams_ || !it.second->IsZombie()) &&
+ !action(it.second.get())) {
return;
}
}
diff --git a/quic/core/quic_session.h b/quic/core/quic_session.h
index 57b95eb..0f53f09 100644
--- a/quic/core/quic_session.h
+++ b/quic/core/quic_session.h
@@ -347,6 +347,7 @@
// Called when stream |id| is done waiting for acks either because all data
// gets acked or is not interested in data being acked (which happens when
// a stream is reset because of an error).
+ // TODO(b/136274541): rename to CloseZombieStreams.
void OnStreamDoneWaitingForAcks(QuicStreamId id);
// TODO(b/136274541): Remove this once quic_remove_streams_waiting_for_acks is
@@ -498,6 +499,8 @@
return liveness_testing_in_progress_;
}
+ bool remove_zombie_streams() const { return remove_zombie_streams_; }
+
protected:
using StreamMap = QuicHashMap<QuicStreamId, std::unique_ptr<QuicStream>>;
@@ -621,6 +624,8 @@
size_t num_static_streams() const { return num_static_streams_; }
+ size_t num_zombie_streams() const { return num_zombie_streams_; }
+
bool was_zero_rtt_rejected() const { return was_zero_rtt_rejected_; }
bool do_not_use_stream_map() const { return do_not_use_stream_map_; }
@@ -799,6 +804,10 @@
// A counter for static streams which are in stream_map_.
size_t num_static_streams_;
+ // A counter for streams which have done reading and writing, but are waiting
+ // for acks.
+ size_t num_zombie_streams_;
+
// Received information for a connection close.
QuicConnectionCloseFrame on_closed_frame_;
@@ -860,6 +869,9 @@
// Latched value of flag quic_do_not_use_stream_map.
const bool do_not_use_stream_map_;
+
+ // Latched value of flag quic_remove_zombie_streams.
+ const bool remove_zombie_streams_;
};
} // namespace quic
diff --git a/quic/core/quic_session_test.cc b/quic/core/quic_session_test.cc
index a033573..1a5a290 100644
--- a/quic/core/quic_session_test.cc
+++ b/quic/core/quic_session_test.cc
@@ -364,6 +364,7 @@
using QuicSession::closed_streams;
using QuicSession::GetNextOutgoingBidirectionalStreamId;
using QuicSession::GetNextOutgoingUnidirectionalStreamId;
+ using QuicSession::stream_map;
using QuicSession::zombie_streams;
private:
@@ -2572,7 +2573,13 @@
stream2->WriteOrBufferData(body, true, nullptr);
EXPECT_TRUE(stream2->IsWaitingForAcks());
// Verify stream2 is a zombie streams.
- EXPECT_TRUE(QuicContainsKey(session_.zombie_streams(), stream2->id()));
+ if (!session_.remove_zombie_streams()) {
+ EXPECT_TRUE(QuicContainsKey(session_.zombie_streams(), stream2->id()));
+ } else {
+ ASSERT_TRUE(QuicContainsKey(session_.stream_map(), stream2->id()));
+ auto* stream = session_.stream_map().find(stream2->id())->second.get();
+ EXPECT_TRUE(stream->IsZombie());
+ }
QuicStreamFrame frame(stream2->id(), true, 0, 100);
EXPECT_CALL(*stream2, HasPendingRetransmission())
@@ -2620,7 +2627,13 @@
stream4->WriteOrBufferData(body, false, nullptr);
EXPECT_FALSE(QuicContainsKey(session_.zombie_streams(), stream4->id()));
stream4->WriteOrBufferData(body, true, nullptr);
- EXPECT_TRUE(QuicContainsKey(session_.zombie_streams(), stream4->id()));
+ if (!session_.remove_zombie_streams()) {
+ EXPECT_TRUE(QuicContainsKey(session_.zombie_streams(), stream4->id()));
+ } else {
+ ASSERT_TRUE(QuicContainsKey(session_.stream_map(), stream4->id()));
+ auto* stream = session_.stream_map().find(stream4->id())->second.get();
+ EXPECT_TRUE(stream->IsZombie());
+ }
}
TEST_P(QuicSessionTestServer, ReceivedDataOnWriteUnidirectionalStream) {
diff --git a/quic/core/quic_stream.cc b/quic/core/quic_stream.cc
index 9e695d4..ee3f983 100644
--- a/quic/core/quic_stream.cc
+++ b/quic/core/quic_stream.cc
@@ -979,7 +979,8 @@
fin_outstanding_ = false;
fin_lost_ = false;
}
- if (!IsWaitingForAcks()) {
+ if (!IsWaitingForAcks() && (!session()->remove_zombie_streams() ||
+ (read_side_closed_ && write_side_closed_))) {
session_->OnStreamDoneWaitingForAcks(id_);
}
return new_data_acked;
diff --git a/quic/core/quic_stream.h b/quic/core/quic_stream.h
index 2a7d036..12b1192 100644
--- a/quic/core/quic_stream.h
+++ b/quic/core/quic_stream.h
@@ -203,6 +203,10 @@
}
bool write_side_closed() const { return write_side_closed_; }
+ bool IsZombie() const {
+ return read_side_closed_ && write_side_closed_ && IsWaitingForAcks();
+ }
+
bool rst_received() const { return rst_received_; }
bool rst_sent() const { return rst_sent_; }
bool fin_received() const { return fin_received_; }
diff --git a/quic/qbone/qbone_client_session.cc b/quic/qbone/qbone_client_session.cc
index 2e5d205..f375e0a 100644
--- a/quic/qbone/qbone_client_session.cc
+++ b/quic/qbone/qbone_client_session.cc
@@ -94,7 +94,7 @@
const ProofVerifyDetails& verify_details) {}
bool QboneClientSession::HasActiveRequests() const {
- return (stream_map_size() - num_static_streams()) > 0;
+ return (stream_map_size() - num_static_streams() - num_zombie_streams()) > 0;
}
} // namespace quic
diff --git a/quic/quic_transport/quic_transport_client_session_test.cc b/quic/quic_transport/quic_transport_client_session_test.cc
index ba65ce2..9c2f138 100644
--- a/quic/quic_transport/quic_transport_client_session_test.cc
+++ b/quic/quic_transport/quic_transport_client_session_test.cc
@@ -108,9 +108,16 @@
Connect();
EXPECT_TRUE(session_->IsSessionReady());
- QuicStream* client_indication_stream =
- QuicSessionPeer::zombie_streams(session_.get())[ClientIndicationStream()]
- .get();
+ QuicStream* client_indication_stream;
+ if (session_->remove_zombie_streams()) {
+ client_indication_stream =
+ QuicSessionPeer::stream_map(session_.get())[ClientIndicationStream()]
+ .get();
+ } else {
+ client_indication_stream = QuicSessionPeer::zombie_streams(
+ session_.get())[ClientIndicationStream()]
+ .get();
+ }
ASSERT_TRUE(client_indication_stream != nullptr);
const std::string client_indication = DataInStream(client_indication_stream);
const std::string expected_client_indication{
@@ -133,9 +140,16 @@
Connect();
EXPECT_TRUE(session_->IsSessionReady());
- QuicStream* client_indication_stream =
- QuicSessionPeer::zombie_streams(session_.get())[ClientIndicationStream()]
- .get();
+ QuicStream* client_indication_stream;
+ if (session_->remove_zombie_streams()) {
+ client_indication_stream =
+ QuicSessionPeer::stream_map(session_.get())[ClientIndicationStream()]
+ .get();
+ } else {
+ client_indication_stream = QuicSessionPeer::zombie_streams(
+ session_.get())[ClientIndicationStream()]
+ .get();
+ }
ASSERT_TRUE(client_indication_stream != nullptr);
const std::string client_indication = DataInStream(client_indication_stream);
const std::string expected_client_indication{