Introduce QuicDatagramQueue::Observer Introduce an observer which is notified when a datagram in the associated queue is sent to the network or expires, in order to give the backpressure information to the user of the queue. Bug: https://crbug.com/114921 Protected by FLAGS_gfe2_reloadable_flag_report_frontline_vip. PiperOrigin-RevId: 343224979 Change-Id: I9ed2b22118b2aedeb0ec2ef194b92af777af3d15
diff --git a/quic/core/quic_datagram_queue.cc b/quic/core/quic_datagram_queue.cc index e10a919..08c349c 100644 --- a/quic/core/quic_datagram_queue.cc +++ b/quic/core/quic_datagram_queue.cc
@@ -17,7 +17,13 @@ constexpr float kMinPacingWindows = 4; QuicDatagramQueue::QuicDatagramQueue(QuicSession* session) - : session_(session), clock_(session->connection()->clock()) {} + : QuicDatagramQueue(session, nullptr) {} + +QuicDatagramQueue::QuicDatagramQueue(QuicSession* session, + std::unique_ptr<Observer> observer) + : session_(session), + clock_(session->connection()->clock()), + observer_(std::move(observer)) {} MessageStatus QuicDatagramQueue::SendOrQueueDatagram(QuicMemSlice datagram) { // If the queue is non-empty, always queue the daragram. This ensures that @@ -27,6 +33,9 @@ QuicMemSliceSpan span(&datagram); MessageResult result = session_->SendMessage(span); if (result.status != MESSAGE_STATUS_BLOCKED) { + if (observer_) { + observer_->OnDatagramProcessed(result.status); + } return result.status; } } @@ -46,6 +55,9 @@ MessageResult result = session_->SendMessage(span); if (result.status != MESSAGE_STATUS_BLOCKED) { queue_.pop_front(); + if (observer_) { + observer_->OnDatagramProcessed(result.status); + } } return result.status; } @@ -80,6 +92,9 @@ QuicTime now = clock_->ApproximateNow(); while (!queue_.empty() && queue_.front().expiry <= now) { queue_.pop_front(); + if (observer_) { + observer_->OnDatagramProcessed(absl::nullopt); + } } }
diff --git a/quic/core/quic_datagram_queue.h b/quic/core/quic_datagram_queue.h index 0952402..e8f2815 100644 --- a/quic/core/quic_datagram_queue.h +++ b/quic/core/quic_datagram_queue.h
@@ -5,6 +5,8 @@ #ifndef QUICHE_QUIC_CORE_QUIC_DATAGRAM_QUEUE_H_ #define QUICHE_QUIC_CORE_QUIC_DATAGRAM_QUEUE_H_ +#include <memory> + #include "absl/types/optional.h" #include "net/third_party/quiche/src/quic/core/quic_circular_deque.h" #include "net/third_party/quiche/src/quic/core/quic_time.h" @@ -20,9 +22,26 @@ // amount of time, and deleted after that time passes. class QUIC_EXPORT_PRIVATE QuicDatagramQueue { public: + // An interface used to monitor events on the associated `QuicDatagramQueue`. + class QUIC_EXPORT_PRIVATE Observer { + public: + virtual ~Observer() = default; + + // Called when a datagram in the associated queue is sent or discarded. + // Identity information for the datagram is not given, because the sending + // and discarding order is always first-in-first-out. + // This function is called synchronously in `QuicDatagramQueue` methods. + // `status` is nullopt when the datagram is dropped due to being in the + // queue for too long. + virtual void OnDatagramProcessed(absl::optional<MessageStatus> status) = 0; + }; + // |session| is not owned and must outlive this object. explicit QuicDatagramQueue(QuicSession* session); + // |session| is not owned and must outlive this object. + QuicDatagramQueue(QuicSession* session, std::unique_ptr<Observer> observer); + // Adds the datagram to the end of the queue. May send it immediately; if // not, MESSAGE_STATUS_BLOCKED is returned. MessageStatus SendOrQueueDatagram(QuicMemSlice datagram); @@ -62,6 +81,7 @@ QuicTime::Delta max_time_in_queue_ = QuicTime::Delta::Zero(); QuicCircularDeque<Datagram> queue_; + std::unique_ptr<Observer> observer_; }; } // namespace quic
diff --git a/quic/core/quic_datagram_queue_test.cc b/quic/core/quic_datagram_queue_test.cc index 0e6e3de..e437840 100644 --- a/quic/core/quic_datagram_queue_test.cc +++ b/quic/core/quic_datagram_queue_test.cc
@@ -4,6 +4,8 @@ #include "net/third_party/quiche/src/quic/core/quic_datagram_queue.h" +#include <vector> + #include "absl/strings/string_view.h" #include "absl/types/optional.h" #include "net/third_party/quiche/src/quic/core/crypto/null_encrypter.h" @@ -11,6 +13,7 @@ #include "net/third_party/quiche/src/quic/core/quic_time.h" #include "net/third_party/quiche/src/quic/core/quic_types.h" #include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_reference_counted.h" #include "net/third_party/quiche/src/quic/platform/api/quic_test.h" #include "net/third_party/quiche/src/quic/test_tools/quic_test_utils.h" @@ -30,20 +33,43 @@ bool encryption_established() const override { return true; } }; -class QuicDatagramQueueTest : public QuicTest { +class QuicDatagramQueueObserver final : public QuicDatagramQueue::Observer { public: - QuicDatagramQueueTest() + class Context : public QuicReferenceCounted { + public: + std::vector<absl::optional<MessageStatus>> statuses; + }; + + QuicDatagramQueueObserver() : context_(new Context()) {} + QuicDatagramQueueObserver(const QuicDatagramQueueObserver&) = delete; + QuicDatagramQueueObserver& operator=(const QuicDatagramQueueObserver&) = + delete; + + void OnDatagramProcessed(absl::optional<MessageStatus> status) override { + context_->statuses.push_back(std::move(status)); + } + + const QuicReferenceCountedPointer<Context>& context() { return context_; } + + private: + QuicReferenceCountedPointer<Context> context_; +}; + +class QuicDatagramQueueTestBase : public QuicTest { + protected: + QuicDatagramQueueTestBase() : connection_(new MockQuicConnection(&helper_, &alarm_factory_, Perspective::IS_CLIENT)), - session_(connection_), - queue_(&session_) { + session_(connection_) { session_.SetCryptoStream(new EstablishedCryptoStream(&session_)); connection_->SetEncrypter( ENCRYPTION_FORWARD_SECURE, std::make_unique<NullEncrypter>(connection_->perspective())); } + ~QuicDatagramQueueTestBase() = default; + QuicMemSlice CreateMemSlice(absl::string_view data) { QuicUniqueBufferPtr buffer = MakeUniqueBuffer(helper_.GetStreamSendBufferAllocator(), data.size()); @@ -51,11 +77,17 @@ return QuicMemSlice(std::move(buffer), data.size()); } - protected: MockQuicConnectionHelper helper_; MockAlarmFactory alarm_factory_; MockQuicConnection* connection_; // Owned by |session_|. MockQuicSession session_; +}; + +class QuicDatagramQueueTest : public QuicDatagramQueueTestBase { + public: + QuicDatagramQueueTest() : queue_(&session_) {} + + protected: QuicDatagramQueue queue_; }; @@ -167,6 +199,99 @@ EXPECT_EQ(0u, queue_.SendDatagrams()); } +class QuicDatagramQueueWithObserverTest : public QuicDatagramQueueTestBase { + public: + QuicDatagramQueueWithObserverTest() + : observer_(std::make_unique<QuicDatagramQueueObserver>()), + context_(observer_->context()), + queue_(&session_, std::move(observer_)) {} + + protected: + // This is moved out immediately. + std::unique_ptr<QuicDatagramQueueObserver> observer_; + + QuicReferenceCountedPointer<QuicDatagramQueueObserver::Context> context_; + QuicDatagramQueue queue_; +}; + +TEST_F(QuicDatagramQueueWithObserverTest, ObserveSuccessImmediately) { + EXPECT_TRUE(context_->statuses.empty()); + + EXPECT_CALL(*connection_, SendMessage(_, _, _)) + .WillOnce(Return(MESSAGE_STATUS_SUCCESS)); + + EXPECT_EQ(MESSAGE_STATUS_SUCCESS, + queue_.SendOrQueueDatagram(CreateMemSlice("a"))); + + EXPECT_THAT(context_->statuses, ElementsAre(MESSAGE_STATUS_SUCCESS)); +} + +TEST_F(QuicDatagramQueueWithObserverTest, ObserveFailureImmediately) { + EXPECT_TRUE(context_->statuses.empty()); + + EXPECT_CALL(*connection_, SendMessage(_, _, _)) + .WillOnce(Return(MESSAGE_STATUS_TOO_LARGE)); + + EXPECT_EQ(MESSAGE_STATUS_TOO_LARGE, + queue_.SendOrQueueDatagram(CreateMemSlice("a"))); + + EXPECT_THAT(context_->statuses, ElementsAre(MESSAGE_STATUS_TOO_LARGE)); +} + +TEST_F(QuicDatagramQueueWithObserverTest, BlockingShouldNotBeObserved) { + EXPECT_TRUE(context_->statuses.empty()); + + EXPECT_CALL(*connection_, SendMessage(_, _, _)) + .WillRepeatedly(Return(MESSAGE_STATUS_BLOCKED)); + + EXPECT_EQ(MESSAGE_STATUS_BLOCKED, + queue_.SendOrQueueDatagram(CreateMemSlice("a"))); + EXPECT_EQ(0u, queue_.SendDatagrams()); + + EXPECT_TRUE(context_->statuses.empty()); +} + +TEST_F(QuicDatagramQueueWithObserverTest, ObserveSuccessAfterBuffering) { + EXPECT_TRUE(context_->statuses.empty()); + + EXPECT_CALL(*connection_, SendMessage(_, _, _)) + .WillOnce(Return(MESSAGE_STATUS_BLOCKED)); + + EXPECT_EQ(MESSAGE_STATUS_BLOCKED, + queue_.SendOrQueueDatagram(CreateMemSlice("a"))); + + EXPECT_TRUE(context_->statuses.empty()); + + EXPECT_CALL(*connection_, SendMessage(_, _, _)) + .WillOnce(Return(MESSAGE_STATUS_SUCCESS)); + + EXPECT_EQ(1u, queue_.SendDatagrams()); + EXPECT_THAT(context_->statuses, ElementsAre(MESSAGE_STATUS_SUCCESS)); +} + +TEST_F(QuicDatagramQueueWithObserverTest, ObserveExpiry) { + constexpr QuicTime::Delta expiry = QuicTime::Delta::FromMilliseconds(100); + queue_.SetMaxTimeInQueue(expiry); + + EXPECT_TRUE(context_->statuses.empty()); + + EXPECT_CALL(*connection_, SendMessage(_, _, _)) + .WillOnce(Return(MESSAGE_STATUS_BLOCKED)); + + EXPECT_EQ(MESSAGE_STATUS_BLOCKED, + queue_.SendOrQueueDatagram(CreateMemSlice("a"))); + + EXPECT_TRUE(context_->statuses.empty()); + + EXPECT_CALL(*connection_, SendMessage(_, _, _)).Times(0); + helper_.AdvanceTime(100 * expiry); + + EXPECT_TRUE(context_->statuses.empty()); + + EXPECT_EQ(0u, queue_.SendDatagrams()); + EXPECT_THAT(context_->statuses, ElementsAre(absl::nullopt)); +} + } // namespace } // namespace test } // namespace quic
diff --git a/quic/core/quic_session.cc b/quic/core/quic_session.cc index 3a06af3..91aca8f 100644 --- a/quic/core/quic_session.cc +++ b/quic/core/quic_session.cc
@@ -57,6 +57,20 @@ const QuicConfig& config, const ParsedQuicVersionVector& supported_versions, QuicStreamCount num_expected_unidirectional_static_streams) + : QuicSession(connection, + owner, + config, + supported_versions, + num_expected_unidirectional_static_streams, + nullptr) {} + +QuicSession::QuicSession( + QuicConnection* connection, + Visitor* owner, + const QuicConfig& config, + const ParsedQuicVersionVector& supported_versions, + QuicStreamCount num_expected_unidirectional_static_streams, + std::unique_ptr<QuicDatagramQueue::Observer> datagram_observer) : connection_(connection), perspective_(connection->perspective()), visitor_(owner), @@ -94,7 +108,7 @@ transport_goaway_received_(false), control_frame_manager_(this), last_message_id_(0), - datagram_queue_(this), + datagram_queue_(this, std::move(datagram_observer)), closed_streams_clean_up_alarm_(nullptr), supported_versions_(supported_versions), use_http2_priority_write_scheduler_(false),
diff --git a/quic/core/quic_session.h b/quic/core/quic_session.h index e454892..a36040e 100644 --- a/quic/core/quic_session.h +++ b/quic/core/quic_session.h
@@ -88,6 +88,12 @@ const QuicConfig& config, const ParsedQuicVersionVector& supported_versions, QuicStreamCount num_expected_unidirectional_static_streams); + QuicSession(QuicConnection* connection, + Visitor* owner, + const QuicConfig& config, + const ParsedQuicVersionVector& supported_versions, + QuicStreamCount num_expected_unidirectional_static_streams, + std::unique_ptr<QuicDatagramQueue::Observer> datagram_observer); QuicSession(const QuicSession&) = delete; QuicSession& operator=(const QuicSession&) = delete;
diff --git a/quic/quic_transport/quic_transport_client_session.cc b/quic/quic_transport/quic_transport_client_session.cc index f38d6b3..66f993a 100644 --- a/quic/quic_transport/quic_transport_client_session.cc +++ b/quic/quic_transport/quic_transport_client_session.cc
@@ -34,12 +34,14 @@ const GURL& url, QuicCryptoClientConfig* crypto_config, url::Origin origin, - ClientVisitor* visitor) + ClientVisitor* visitor, + std::unique_ptr<QuicDatagramQueue::Observer> datagram_observer) : QuicSession(connection, owner, config, supported_versions, - /*num_expected_unidirectional_static_streams*/ 0), + /*num_expected_unidirectional_static_streams*/ 0, + std::move(datagram_observer)), url_(url), origin_(origin), visitor_(visitor) {
diff --git a/quic/quic_transport/quic_transport_client_session.h b/quic/quic_transport/quic_transport_client_session.h index c4f417f..b7687c0 100644 --- a/quic/quic_transport/quic_transport_client_session.h +++ b/quic/quic_transport/quic_transport_client_session.h
@@ -16,6 +16,7 @@ #include "net/third_party/quiche/src/quic/core/quic_connection.h" #include "net/third_party/quiche/src/quic/core/quic_crypto_client_stream.h" #include "net/third_party/quiche/src/quic/core/quic_crypto_stream.h" +#include "net/third_party/quiche/src/quic/core/quic_datagram_queue.h" #include "net/third_party/quiche/src/quic/core/quic_server_id.h" #include "net/third_party/quiche/src/quic/core/quic_session.h" #include "net/third_party/quiche/src/quic/core/quic_stream.h" @@ -56,14 +57,16 @@ virtual void OnCanCreateNewOutgoingUnidirectionalStream() = 0; }; - QuicTransportClientSession(QuicConnection* connection, - Visitor* owner, - const QuicConfig& config, - const ParsedQuicVersionVector& supported_versions, - const GURL& url, - QuicCryptoClientConfig* crypto_config, - url::Origin origin, - ClientVisitor* visitor); + QuicTransportClientSession( + QuicConnection* connection, + Visitor* owner, + const QuicConfig& config, + const ParsedQuicVersionVector& supported_versions, + const GURL& url, + QuicCryptoClientConfig* crypto_config, + url::Origin origin, + ClientVisitor* visitor, + std::unique_ptr<QuicDatagramQueue::Observer> datagram_observer); std::vector<std::string> GetAlpnsToOffer() const override { return std::vector<std::string>({QuicTransportAlpn()});
diff --git a/quic/quic_transport/quic_transport_client_session_test.cc b/quic/quic_transport/quic_transport_client_session_test.cc index 129c87e..b311b52 100644 --- a/quic/quic_transport/quic_transport_client_session_test.cc +++ b/quic/quic_transport/quic_transport_client_session_test.cc
@@ -65,7 +65,7 @@ session_ = std::make_unique<QuicTransportClientSession>( &connection_, nullptr, DefaultQuicConfig(), GetVersions(), GURL("quic-transport://test.example.com:50000" + url_suffix), - &crypto_config_, origin, &visitor_); + &crypto_config_, origin, &visitor_, /*datagram_observer=*/nullptr); session_->Initialize(); crypto_stream_ = static_cast<QuicCryptoClientStream*>( session_->GetMutableCryptoStream());
diff --git a/quic/quic_transport/quic_transport_integration_test.cc b/quic/quic_transport/quic_transport_integration_test.cc index 1965c6d..99c20b2 100644 --- a/quic/quic_transport/quic_transport_integration_test.cc +++ b/quic/quic_transport/quic_transport_integration_test.cc
@@ -90,7 +90,8 @@ GURL("quic-transport://test.example.com:50000" + path), &crypto_config_, origin, - &visitor_) { + &visitor_, + /*datagram_observer=*/nullptr) { session_.Initialize(); }