Remove zombie stream map in QuicSession.
Zombie streams (closed streams still waiting for acks) will live in
stream_map_ instead and are counted by num_zombie_streams_.
This CL also removes some duplicated tests.
Protected by gfe2_reloadable_flag_quic_remove_zombie_streams
PiperOrigin-RevId: 324877471
Change-Id: I6d3df84048c35e7ca52ec030e019bbc0def4d3ce
diff --git a/quic/core/quic_session.cc b/quic/core/quic_session.cc
index 6379861..ecfb2b2 100644
--- a/quic/core/quic_session.cc
+++ b/quic/core/quic_session.cc
@@ -76,6 +76,7 @@
num_draining_streams_(0),
num_outgoing_draining_streams_(0),
num_static_streams_(0),
+ num_zombie_streams_(0),
flow_controller_(
this,
QuicUtils::GetInvalidStreamId(connection->transport_version()),
@@ -102,8 +103,10 @@
liveness_testing_in_progress_(false),
remove_streams_waiting_for_acks_(
GetQuicReloadableFlag(quic_remove_streams_waiting_for_acks)),
- do_not_use_stream_map_(
- GetQuicReloadableFlag(quic_do_not_use_stream_map)) {
+ do_not_use_stream_map_(GetQuicReloadableFlag(quic_do_not_use_stream_map)),
+ remove_zombie_streams_(
+ GetQuicReloadableFlag(quic_remove_zombie_streams) &&
+ do_not_use_stream_map_ && remove_streams_waiting_for_acks_) {
closed_streams_clean_up_alarm_ =
QuicWrapUnique<QuicAlarm>(connection_->alarm_factory()->CreateAlarm(
new ClosedStreamsCleanUpDelegate(this)));
@@ -146,7 +149,13 @@
}
QuicSession::~QuicSession() {
- QUIC_LOG_IF(WARNING, !zombie_streams_.empty()) << "Still have zombie streams";
+ if (!remove_zombie_streams_) {
+ QUIC_LOG_IF(WARNING, !zombie_streams_.empty())
+ << "Still have zombie streams";
+ } else {
+ QUIC_LOG_IF(WARNING, num_zombie_streams_ > 0)
+ << "Still have zombie streams";
+ }
}
void QuicSession::PendingStreamOnStreamFrame(const QuicStreamFrame& frame) {
@@ -412,9 +421,16 @@
stream->OnConnectionClosed(frame.quic_error_code, source);
QUIC_RELOADABLE_FLAG_COUNT(
quic_do_not_close_stream_again_on_connection_close);
- if (stream_map_.find(id) != stream_map_.end()) {
- QUIC_BUG << ENDPOINT << "Stream " << id
- << " failed to close under OnConnectionClosed";
+ auto it = stream_map_.find(id);
+ if (it != stream_map_.end()) {
+ if (!remove_zombie_streams_) {
+ QUIC_BUG << ENDPOINT << "Stream " << id
+ << " failed to close under OnConnectionClosed";
+ } else {
+ QUIC_BUG_IF(!it->second->IsZombie())
+ << ENDPOINT << "Non-zombie stream " << id
+ << " failed to close under OnConnectionClosed";
+ }
if (!GetQuicReloadableFlag(
quic_do_not_close_stream_again_on_connection_close)) {
CloseStream(id);
@@ -424,11 +440,16 @@
});
}
- // Cleanup zombie stream map on connection close.
- while (!zombie_streams_.empty()) {
- ZombieStreamMap::iterator it = zombie_streams_.begin();
- closed_streams_.push_back(std::move(it->second));
- zombie_streams_.erase(it);
+ if (!remove_zombie_streams_) {
+ // Cleanup zombie stream map on connection close.
+ while (!zombie_streams_.empty()) {
+ ZombieStreamMap::iterator it = zombie_streams_.begin();
+ closed_streams_.push_back(std::move(it->second));
+ zombie_streams_.erase(it);
+ }
+ } else {
+ QUIC_RELOADABLE_FLAG_COUNT_N(quic_remove_zombie_streams, 1, 4);
+ DCHECK(zombie_streams_.empty());
}
closed_streams_clean_up_alarm_->Cancel();
@@ -900,8 +921,15 @@
QuicStream* stream = it->second.get();
StreamType type = stream->type();
- if (stream->IsWaitingForAcks()) {
- zombie_streams_[stream_id] = std::move(it->second);
+ const bool stream_waiting_for_acks = stream->IsWaitingForAcks();
+ if (stream_waiting_for_acks) {
+ if (remove_zombie_streams_) {
+ // The stream needs to be kept alive because it's waiting for acks.
+ QUIC_RELOADABLE_FLAG_COUNT_N(quic_remove_zombie_streams, 2, 4);
+ ++num_zombie_streams_;
+ } else {
+ zombie_streams_[stream_id] = std::move(it->second);
+ }
} else {
// Clean up the stream since it is no longer waiting for acks.
if (remove_streams_waiting_for_acks_) {
@@ -910,6 +938,11 @@
streams_waiting_for_acks_.erase(stream_id);
}
closed_streams_.push_back(std::move(it->second));
+ if (remove_zombie_streams_) {
+ // When zombie_streams_ is removed, stream is only erased from stream map
+ // if it's not zombie.
+ stream_map_.erase(it);
+ }
// Do not retransmit data of a closed stream.
streams_with_pending_retransmission_.erase(stream_id);
if (!closed_streams_clean_up_alarm_->IsSet()) {
@@ -927,14 +960,18 @@
DCHECK(!stream->was_draining());
InsertLocallyClosedStreamsHighestOffset(
stream_id, stream->flow_controller()->highest_received_byte_offset());
- stream_map_.erase(it);
+ if (!remove_zombie_streams_) {
+ stream_map_.erase(it);
+ }
return;
}
const bool stream_was_draining = stream->was_draining();
QUIC_DVLOG_IF(1, stream_was_draining)
<< ENDPOINT << "Stream " << stream_id << " was draining";
- stream_map_.erase(it);
+ if (!remove_zombie_streams_) {
+ stream_map_.erase(it);
+ }
if (stream_was_draining) {
QUIC_BUG_IF(num_draining_streams_ == 0);
--num_draining_streams_;
@@ -1811,6 +1848,10 @@
StreamMap::iterator it = stream_map_.find(stream_id);
if (it != stream_map_.end()) {
+ if (remove_zombie_streams_ && it->second->IsZombie()) {
+ QUIC_RELOADABLE_FLAG_COUNT_N(quic_remove_zombie_streams, 3, 4);
+ return nullptr;
+ }
return it->second.get();
}
@@ -1955,8 +1996,11 @@
bool QuicSession::IsOpenStream(QuicStreamId id) {
DCHECK_NE(QuicUtils::GetInvalidStreamId(transport_version()), id);
- if (QuicContainsKey(stream_map_, id) ||
- QuicContainsKey(pending_stream_map_, id) ||
+ const StreamMap::iterator it = stream_map_.find(id);
+ if (it != stream_map_.end()) {
+ return remove_zombie_streams_ ? !it->second->IsZombie() : true;
+ }
+ if (QuicContainsKey(pending_stream_map_, id) ||
QuicUtils::IsCryptoStreamId(transport_version(), id)) {
// Stream is active
return true;
@@ -1981,8 +2025,9 @@
locally_closed_streams_highest_offset_.size();
}
DCHECK_GE(static_cast<QuicStreamCount>(stream_map_.size()),
- num_static_streams_ + num_draining_streams_);
- return stream_map_.size() - num_draining_streams_ - num_static_streams_;
+ num_static_streams_ + num_draining_streams_ + num_zombie_streams_);
+ return stream_map_.size() - num_draining_streams_ - num_static_streams_ -
+ num_zombie_streams_;
}
void QuicSession::MarkConnectionLevelWriteBlocked(QuicStreamId id) {
@@ -2058,16 +2103,28 @@
streams_waiting_for_acks_.erase(id);
}
- auto it = zombie_streams_.find(id);
- if (it == zombie_streams_.end()) {
- return;
+ if (!remove_zombie_streams_) {
+ auto it = zombie_streams_.find(id);
+ if (it == zombie_streams_.end()) {
+ return;
+ }
+
+ closed_streams_.push_back(std::move(it->second));
+ zombie_streams_.erase(it);
+ } else {
+ QUIC_RELOADABLE_FLAG_COUNT_N(quic_remove_zombie_streams, 4, 4);
+ auto it = stream_map_.find(id);
+ if (it == stream_map_.end()) {
+ return;
+ }
+ --num_zombie_streams_;
+ closed_streams_.push_back(std::move(it->second));
+ stream_map_.erase(it);
}
- closed_streams_.push_back(std::move(it->second));
if (!closed_streams_clean_up_alarm_->IsSet()) {
closed_streams_clean_up_alarm_->Set(connection_->clock()->ApproximateNow());
}
- zombie_streams_.erase(it);
// Do not retransmit data of a closed stream.
streams_with_pending_retransmission_.erase(id);
}
@@ -2086,6 +2143,7 @@
// The number of the streams waiting for acks should not be larger than the
// number of streams.
+ DCHECK(!remove_zombie_streams_ || zombie_streams_.empty());
if (static_cast<size_t>(stream_map_.size() + zombie_streams_.size()) <
streams_waiting_for_acks_.size()) {
QUIC_BUG << "More streams are waiting for acks than the number of streams. "
@@ -2101,6 +2159,8 @@
if (active_stream != stream_map_.end()) {
return active_stream->second.get();
}
+
+ DCHECK(!remove_zombie_streams_ || zombie_streams_.empty());
auto zombie_stream = zombie_streams_.find(id);
if (zombie_stream != zombie_streams_.end()) {
return zombie_stream->second.get();
@@ -2250,6 +2310,7 @@
return !streams_waiting_for_acks_.empty();
}
QUIC_RELOADABLE_FLAG_COUNT_N(quic_remove_streams_waiting_for_acks, 4, 4);
+ DCHECK(!remove_zombie_streams_ || zombie_streams_.empty());
if (!zombie_streams().empty()) {
return true;
}
@@ -2497,14 +2558,15 @@
void QuicSession::PerformActionOnActiveStreams(
std::function<bool(QuicStream*)> action) {
- QuicSmallMap<QuicStreamId, QuicStream*, 10> non_static_streams;
+ QuicSmallMap<QuicStreamId, QuicStream*, 10> active_streams;
for (const auto& it : stream_map_) {
- if (!it.second->is_static()) {
- non_static_streams[it.first] = it.second.get();
+ if (!it.second->is_static() &&
+ (!remove_zombie_streams_ || !it.second->IsZombie())) {
+ active_streams[it.first] = it.second.get();
}
}
- for (const auto& it : non_static_streams) {
+ for (const auto& it : active_streams) {
if (!action(it.second)) {
return;
}
@@ -2514,7 +2576,9 @@
void QuicSession::PerformActionOnActiveStreams(
std::function<bool(QuicStream*)> action) const {
for (const auto& it : stream_map_) {
- if (!it.second->is_static() && !action(it.second.get())) {
+ if (!it.second->is_static() &&
+ (!remove_zombie_streams_ || !it.second->IsZombie()) &&
+ !action(it.second.get())) {
return;
}
}