Turn QuicWriteBlockedList into an abstract interface. Also fix a typo in a method name. PiperOrigin-RevId: 517946673
diff --git a/quiche/quic/core/quic_session.cc b/quiche/quic/core/quic_session.cc index dde9f3d..a5664ac 100644 --- a/quiche/quic/core/quic_session.cc +++ b/quiche/quic/core/quic_session.cc
@@ -21,6 +21,7 @@ #include "quiche/quic/core/quic_types.h" #include "quiche/quic/core/quic_utils.h" #include "quiche/quic/core/quic_versions.h" +#include "quiche/quic/core/quic_write_blocked_list.h" #include "quiche/quic/platform/api/quic_bug_tracker.h" #include "quiche/quic/platform/api/quic_flag_utils.h" #include "quiche/quic/platform/api/quic_flags.h" @@ -74,6 +75,7 @@ : connection_(connection), perspective_(connection->perspective()), visitor_(owner), + write_blocked_streams_(std::make_unique<QuicWriteBlockedList>()), config_(config), stream_id_manager_(perspective(), connection->transport_version(), kDefaultMaxStreamsPerConnection, @@ -577,7 +579,7 @@ bool QuicSession::CheckStreamWriteBlocked(QuicStream* stream) const { if (!stream->write_side_closed() && stream->HasBufferedData() && !stream->IsFlowControlBlocked() && - !write_blocked_streams_.IsStreamBlocked(stream->id())) { + !write_blocked_streams_->IsStreamBlocked(stream->id())) { QUIC_DLOG(ERROR) << ENDPOINT << "stream " << stream->id() << " has buffered " << stream->BufferedDataBytes() << " bytes, and is not flow control blocked, " @@ -612,8 +614,8 @@ // crypto and headers streams to try writing as all other streams will be // blocked. size_t num_writes = flow_controller_.IsBlocked() - ? write_blocked_streams_.NumBlockedSpecialStreams() - : write_blocked_streams_.NumBlockedStreams(); + ? write_blocked_streams_->NumBlockedSpecialStreams() + : write_blocked_streams_->NumBlockedStreams(); if (num_writes == 0 && !control_frame_manager_.WillingToWrite() && datagram_queue_.empty() && (!QuicVersionUsesCryptoFrames(transport_version()) || @@ -656,8 +658,8 @@ } std::vector<QuicStreamId> last_writing_stream_ids; for (size_t i = 0; i < num_writes; ++i) { - if (!(write_blocked_streams_.HasWriteBlockedSpecialStream() || - write_blocked_streams_.HasWriteBlockedDataStreams())) { + if (!(write_blocked_streams_->HasWriteBlockedSpecialStream() || + write_blocked_streams_->HasWriteBlockedDataStreams())) { // Writing one stream removed another!? Something's broken. QUIC_BUG(quic_bug_10866_1) << "WriteBlockedStream is missing, num_writes: " << num_writes @@ -676,7 +678,7 @@ if (!CanWriteStreamData()) { return; } - currently_writing_stream_id_ = write_blocked_streams_.PopFront(); + currently_writing_stream_id_ = write_blocked_streams_->PopFront(); last_writing_stream_ids.push_back(currently_writing_stream_id_); QUIC_DVLOG(1) << ENDPOINT << "Removing stream " << currently_writing_stream_id_ << " from write-blocked list"; @@ -723,10 +725,10 @@ } // Crypto and headers streams are not blocked by connection level flow // control. - return write_blocked_streams_.HasWriteBlockedSpecialStream(); + return write_blocked_streams_->HasWriteBlockedSpecialStream(); } - return write_blocked_streams_.HasWriteBlockedSpecialStream() || - write_blocked_streams_.HasWriteBlockedDataStreams(); + return write_blocked_streams_->HasWriteBlockedSpecialStream() || + write_blocked_streams_->HasWriteBlockedDataStreams(); } std::string QuicSession::GetStreamsInfoForLogging() const { @@ -764,7 +766,7 @@ } return streams_with_pending_retransmission_.contains( QuicUtils::GetCryptoStreamId(transport_version())) || - write_blocked_streams_.IsStreamBlocked( + write_blocked_streams_->IsStreamBlocked( QuicUtils::GetCryptoStreamId(transport_version())); } @@ -829,7 +831,7 @@ connection_->SendStreamData(id, write_length, offset, state); if (type == NOT_RETRANSMISSION) { // This is new stream data. - write_blocked_streams_.UpdateBytesForStream(id, data.bytes_consumed); + write_blocked_streams_->UpdateBytesForStream(id, data.bytes_consumed); } return data; @@ -2128,12 +2130,12 @@ QUIC_DVLOG(1) << ENDPOINT << "Adding stream " << id << " to write-blocked list"; - write_blocked_streams_.AddStream(id); + write_blocked_streams_->AddStream(id); } bool QuicSession::HasDataToWrite() const { - return write_blocked_streams_.HasWriteBlockedSpecialStream() || - write_blocked_streams_.HasWriteBlockedDataStreams() || + return write_blocked_streams_->HasWriteBlockedSpecialStream() || + write_blocked_streams_->HasWriteBlockedDataStreams() || connection_->HasQueuedData() || !streams_with_pending_retransmission_.empty() || control_frame_manager_.WillingToWrite();
diff --git a/quiche/quic/core/quic_session.h b/quiche/quic/core/quic_session.h index e811ed4..e12bb8f 100644 --- a/quiche/quic/core/quic_session.h +++ b/quiche/quic/core/quic_session.h
@@ -723,7 +723,7 @@ virtual bool ShouldProcessPendingStreamImmediately() const { return true; } spdy::SpdyPriority GetSpdyPriorityofStream(QuicStreamId stream_id) const { - return write_blocked_streams_.GetPriorityofStream(stream_id).urgency; + return write_blocked_streams_->GetPriorityOfStream(stream_id).urgency; } size_t pending_streams_size() const { return pending_stream_map_.size(); } @@ -733,8 +733,8 @@ void set_largest_peer_created_stream_id( QuicStreamId largest_peer_created_stream_id); - QuicWriteBlockedList* write_blocked_streams() { - return &write_blocked_streams_; + QuicWriteBlockedListInterface* write_blocked_streams() { + return write_blocked_streams_.get(); } // Returns true if the stream is still active. @@ -929,7 +929,7 @@ // A list of streams which need to write more data. Stream register // themselves in their constructor, and unregisterm themselves in their // destructors, so the write blocked list must outlive all streams. - QuicWriteBlockedList write_blocked_streams_; + std::unique_ptr<QuicWriteBlockedList> write_blocked_streams_; ClosedStreams closed_streams_;
diff --git a/quiche/quic/core/quic_stream_test.cc b/quiche/quic/core/quic_stream_test.cc index a0837be..7afbb19 100644 --- a/quiche/quic/core/quic_stream_test.cc +++ b/quiche/quic/core/quic_stream_test.cc
@@ -160,7 +160,7 @@ MockQuicConnection* connection_; std::unique_ptr<MockQuicSession> session_; StrictMock<TestStream>* stream_; - QuicWriteBlockedList* write_blocked_list_; + QuicWriteBlockedListInterface* write_blocked_list_; QuicTime::Delta zero_; ParsedQuicVersionVector supported_versions_; QuicStreamId kTestStreamId = GetNthClientInitiatedBidirectionalStreamId(
diff --git a/quiche/quic/core/quic_write_blocked_list.h b/quiche/quic/core/quic_write_blocked_list.h index ba1fd18..f6726dc 100644 --- a/quiche/quic/core/quic_write_blocked_list.h +++ b/quiche/quic/core/quic_write_blocked_list.h
@@ -24,67 +24,114 @@ // Static streams come first, in the order they were registered with // QuicWriteBlockedList. They are followed by non-static streams, ordered by // priority. -class QUIC_EXPORT_PRIVATE QuicWriteBlockedList { +class QUICHE_EXPORT QuicWriteBlockedListInterface { public: - explicit QuicWriteBlockedList(); - QuicWriteBlockedList(const QuicWriteBlockedList&) = delete; - QuicWriteBlockedList& operator=(const QuicWriteBlockedList&) = delete; - ~QuicWriteBlockedList() = default; + virtual ~QuicWriteBlockedListInterface() = default; - bool HasWriteBlockedDataStreams() const { - return priority_write_scheduler_.HasReadyStreams(); - } - + virtual bool HasWriteBlockedDataStreams() const = 0; + virtual size_t NumBlockedSpecialStreams() const = 0; + virtual size_t NumBlockedStreams() const = 0; bool HasWriteBlockedSpecialStream() const { - return static_stream_collection_.num_blocked() > 0; + return NumBlockedSpecialStreams() > 0; } - size_t NumBlockedSpecialStreams() const { - return static_stream_collection_.num_blocked(); - } + // Returns true if there is another stream with higher priority in the queue. + virtual bool ShouldYield(QuicStreamId id) const = 0; - size_t NumBlockedStreams() const { - return NumBlockedSpecialStreams() + - priority_write_scheduler_.NumReadyStreams(); - } - - bool ShouldYield(QuicStreamId id) const; - - QuicStreamPriority GetPriorityofStream(QuicStreamId id) const { - return priority_write_scheduler_.GetStreamPriority(id); - } + // Returns the priority of the specified stream. + virtual QuicStreamPriority GetPriorityOfStream(QuicStreamId id) const = 0; // Pops the highest priority stream, special casing static streams. Latches // the most recently popped data stream for batch writing purposes. - QuicStreamId PopFront(); + virtual QuicStreamId PopFront() = 0; // Register a stream with given priority. // `priority` is ignored for static streams. - void RegisterStream(QuicStreamId stream_id, bool is_static_stream, - const QuicStreamPriority& priority); + virtual void RegisterStream(QuicStreamId stream_id, bool is_static_stream, + const QuicStreamPriority& priority) = 0; // Unregister a stream. `stream_id` must be registered, either as a static // stream or as a non-static stream. - void UnregisterStream(QuicStreamId stream_id); + virtual void UnregisterStream(QuicStreamId stream_id) = 0; // Updates the stored priority of a stream. Must not be called for static // streams. - void UpdateStreamPriority(QuicStreamId stream_id, - const QuicStreamPriority& new_priority); + virtual void UpdateStreamPriority(QuicStreamId stream_id, + const QuicStreamPriority& new_priority) = 0; // TODO(b/147306124): Remove when deprecating // reloadable_flag_quic_disable_batch_write. - void UpdateBytesForStream(QuicStreamId stream_id, size_t bytes); + virtual void UpdateBytesForStream(QuicStreamId stream_id, size_t bytes) = 0; // Pushes a stream to the back of the list for its priority level *unless* it // is latched for doing batched writes in which case it goes to the front of // the list for its priority level. // Static streams are special cased to always resume first. // Stream must already be registered. - void AddStream(QuicStreamId stream_id); + virtual void AddStream(QuicStreamId stream_id) = 0; // Returns true if stream with |stream_id| is write blocked. - bool IsStreamBlocked(QuicStreamId stream_id) const; + virtual bool IsStreamBlocked(QuicStreamId stream_id) const = 0; +}; + +// Default implementation of QuicWriteBlockedListInterface. +class QUIC_EXPORT_PRIVATE QuicWriteBlockedList + : public QuicWriteBlockedListInterface { + public: + explicit QuicWriteBlockedList(); + QuicWriteBlockedList(const QuicWriteBlockedList&) = delete; + QuicWriteBlockedList& operator=(const QuicWriteBlockedList&) = delete; + + bool HasWriteBlockedDataStreams() const override { + return priority_write_scheduler_.HasReadyStreams(); + } + + size_t NumBlockedSpecialStreams() const override { + return static_stream_collection_.num_blocked(); + } + + size_t NumBlockedStreams() const override { + return NumBlockedSpecialStreams() + + priority_write_scheduler_.NumReadyStreams(); + } + + bool ShouldYield(QuicStreamId id) const override; + + QuicStreamPriority GetPriorityOfStream(QuicStreamId id) const override { + return priority_write_scheduler_.GetStreamPriority(id); + } + + // Pops the highest priority stream, special casing static streams. Latches + // the most recently popped data stream for batch writing purposes. + QuicStreamId PopFront() override; + + // Register a stream with given priority. + // `priority` is ignored for static streams. + void RegisterStream(QuicStreamId stream_id, bool is_static_stream, + const QuicStreamPriority& priority) override; + + // Unregister a stream. `stream_id` must be registered, either as a static + // stream or as a non-static stream. + void UnregisterStream(QuicStreamId stream_id) override; + + // Updates the stored priority of a stream. Must not be called for static + // streams. + void UpdateStreamPriority(QuicStreamId stream_id, + const QuicStreamPriority& new_priority) override; + + // TODO(b/147306124): Remove when deprecating + // reloadable_flag_quic_disable_batch_write. + void UpdateBytesForStream(QuicStreamId stream_id, size_t bytes) override; + + // Pushes a stream to the back of the list for its priority level *unless* it + // is latched for doing batched writes in which case it goes to the front of + // the list for its priority level. + // Static streams are special cased to always resume first. + // Stream must already be registered. + void AddStream(QuicStreamId stream_id) override; + + // Returns true if stream with |stream_id| is write blocked. + bool IsStreamBlocked(QuicStreamId stream_id) const override; private: http2::PriorityWriteScheduler<QuicStreamId, QuicStreamPriority,
diff --git a/quiche/quic/core/quic_write_blocked_list_test.cc b/quiche/quic/core/quic_write_blocked_list_test.cc index 812e936..4272358 100644 --- a/quiche/quic/core/quic_write_blocked_list_test.cc +++ b/quiche/quic/core/quic_write_blocked_list_test.cc
@@ -50,8 +50,8 @@ return write_blocked_list_->ShouldYield(id); } - QuicStreamPriority GetPriorityofStream(QuicStreamId id) const { - return write_blocked_list_->GetPriorityofStream(id); + QuicStreamPriority GetPriorityOfStream(QuicStreamId id) const { + return write_blocked_list_->GetPriorityOfStream(id); } QuicStreamId PopFront() { return write_blocked_list_->PopFront(); } @@ -95,14 +95,14 @@ RegisterStream(1, kStatic, {kV3HighestPriority, kNotIncremental}); RegisterStream(3, kStatic, {kV3HighestPriority, kNotIncremental}); - EXPECT_EQ(kV3LowestPriority, GetPriorityofStream(40).urgency); - EXPECT_EQ(kNotIncremental, GetPriorityofStream(40).incremental); + EXPECT_EQ(kV3LowestPriority, GetPriorityOfStream(40).urgency); + EXPECT_EQ(kNotIncremental, GetPriorityOfStream(40).incremental); - EXPECT_EQ(kV3HighestPriority, GetPriorityofStream(23).urgency); - EXPECT_EQ(kIncremental, GetPriorityofStream(23).incremental); + EXPECT_EQ(kV3HighestPriority, GetPriorityOfStream(23).urgency); + EXPECT_EQ(kIncremental, GetPriorityOfStream(23).incremental); - EXPECT_EQ(kV3HighestPriority, GetPriorityofStream(17).urgency); - EXPECT_EQ(kNotIncremental, GetPriorityofStream(17).incremental); + EXPECT_EQ(kV3HighestPriority, GetPriorityOfStream(17).urgency); + EXPECT_EQ(kNotIncremental, GetPriorityOfStream(17).incremental); AddStream(40); EXPECT_TRUE(IsStreamBlocked(40)); @@ -577,27 +577,27 @@ RegisterStream(1, kStatic, {2, kNotIncremental}); RegisterStream(3, kStatic, {kV3HighestPriority, kNotIncremental}); - EXPECT_EQ(kV3LowestPriority, GetPriorityofStream(40).urgency); - EXPECT_EQ(kNotIncremental, GetPriorityofStream(40).incremental); + EXPECT_EQ(kV3LowestPriority, GetPriorityOfStream(40).urgency); + EXPECT_EQ(kNotIncremental, GetPriorityOfStream(40).incremental); - EXPECT_EQ(6, GetPriorityofStream(23).urgency); - EXPECT_EQ(kIncremental, GetPriorityofStream(23).incremental); + EXPECT_EQ(6, GetPriorityOfStream(23).urgency); + EXPECT_EQ(kIncremental, GetPriorityOfStream(23).incremental); - EXPECT_EQ(kV3HighestPriority, GetPriorityofStream(17).urgency); - EXPECT_EQ(kNotIncremental, GetPriorityofStream(17).incremental); + EXPECT_EQ(kV3HighestPriority, GetPriorityOfStream(17).urgency); + EXPECT_EQ(kNotIncremental, GetPriorityOfStream(17).incremental); UpdateStreamPriority(40, {3, kIncremental}); UpdateStreamPriority(23, {kV3HighestPriority, kNotIncremental}); UpdateStreamPriority(17, {5, kNotIncremental}); - EXPECT_EQ(3, GetPriorityofStream(40).urgency); - EXPECT_EQ(kIncremental, GetPriorityofStream(40).incremental); + EXPECT_EQ(3, GetPriorityOfStream(40).urgency); + EXPECT_EQ(kIncremental, GetPriorityOfStream(40).incremental); - EXPECT_EQ(kV3HighestPriority, GetPriorityofStream(23).urgency); - EXPECT_EQ(kNotIncremental, GetPriorityofStream(23).incremental); + EXPECT_EQ(kV3HighestPriority, GetPriorityOfStream(23).urgency); + EXPECT_EQ(kNotIncremental, GetPriorityOfStream(23).incremental); - EXPECT_EQ(5, GetPriorityofStream(17).urgency); - EXPECT_EQ(kNotIncremental, GetPriorityofStream(17).incremental); + EXPECT_EQ(5, GetPriorityOfStream(17).urgency); + EXPECT_EQ(kNotIncremental, GetPriorityOfStream(17).incremental); AddStream(40); AddStream(23); @@ -637,13 +637,13 @@ RegisterStream(3, kNotStatic, {6, kNotIncremental}); RegisterStream(4, kNotStatic, {6, kNotIncremental}); - EXPECT_EQ(6, GetPriorityofStream(3).urgency); - EXPECT_EQ(kNotIncremental, GetPriorityofStream(3).incremental); + EXPECT_EQ(6, GetPriorityOfStream(3).urgency); + EXPECT_EQ(kNotIncremental, GetPriorityOfStream(3).incremental); UpdateStreamPriority(3, {6, kIncremental}); - EXPECT_EQ(6, GetPriorityofStream(3).urgency); - EXPECT_EQ(kIncremental, GetPriorityofStream(3).incremental); + EXPECT_EQ(6, GetPriorityOfStream(3).urgency); + EXPECT_EQ(kIncremental, GetPriorityOfStream(3).incremental); AddStream(3); AddStream(4); @@ -656,13 +656,13 @@ RegisterStream(5, kNotStatic, {6, kIncremental}); RegisterStream(6, kNotStatic, {6, kIncremental}); - EXPECT_EQ(6, GetPriorityofStream(6).urgency); - EXPECT_EQ(kIncremental, GetPriorityofStream(6).incremental); + EXPECT_EQ(6, GetPriorityOfStream(6).urgency); + EXPECT_EQ(kIncremental, GetPriorityOfStream(6).incremental); UpdateStreamPriority(6, {6, kNotIncremental}); - EXPECT_EQ(6, GetPriorityofStream(6).urgency); - EXPECT_EQ(kNotIncremental, GetPriorityofStream(6).incremental); + EXPECT_EQ(6, GetPriorityOfStream(6).urgency); + EXPECT_EQ(kNotIncremental, GetPriorityOfStream(6).incremental); AddStream(5); AddStream(6);
diff --git a/quiche/quic/test_tools/quic_session_peer.cc b/quiche/quic/test_tools/quic_session_peer.cc index ed12e8f..3986a0e 100644 --- a/quiche/quic/test_tools/quic_session_peer.cc +++ b/quiche/quic/test_tools/quic_session_peer.cc
@@ -106,9 +106,9 @@ } // static -QuicWriteBlockedList* QuicSessionPeer::GetWriteBlockedStreams( +QuicWriteBlockedListInterface* QuicSessionPeer::GetWriteBlockedStreams( QuicSession* session) { - return &session->write_blocked_streams_; + return session->write_blocked_streams(); } // static @@ -171,7 +171,7 @@ // static bool QuicSessionPeer::IsStreamWriteBlocked(QuicSession* session, QuicStreamId id) { - return session->write_blocked_streams_.IsStreamBlocked(id); + return session->write_blocked_streams()->IsStreamBlocked(id); } // static
diff --git a/quiche/quic/test_tools/quic_session_peer.h b/quiche/quic/test_tools/quic_session_peer.h index 041d921..f0e83c9 100644 --- a/quiche/quic/test_tools/quic_session_peer.h +++ b/quiche/quic/test_tools/quic_session_peer.h
@@ -51,7 +51,8 @@ uint32_t max_streams); static QuicCryptoStream* GetMutableCryptoStream(QuicSession* session); - static QuicWriteBlockedList* GetWriteBlockedStreams(QuicSession* session); + static QuicWriteBlockedListInterface* GetWriteBlockedStreams( + QuicSession* session); static QuicStream* GetOrCreateStream(QuicSession* session, QuicStreamId stream_id); static absl::flat_hash_map<QuicStreamId, QuicStreamOffset>&