Introduce QuicDatagramQueue::Observer
Introduce an observer which is notified when a datagram in the
associated queue is sent to the network or expires, in order to give
the backpressure information to the user of the queue.
Bug: https://crbug.com/114921
Protected by FLAGS_gfe2_reloadable_flag_report_frontline_vip.
PiperOrigin-RevId: 343224979
Change-Id: I9ed2b22118b2aedeb0ec2ef194b92af777af3d15
diff --git a/quic/core/quic_datagram_queue.cc b/quic/core/quic_datagram_queue.cc
index e10a919..08c349c 100644
--- a/quic/core/quic_datagram_queue.cc
+++ b/quic/core/quic_datagram_queue.cc
@@ -17,7 +17,13 @@
constexpr float kMinPacingWindows = 4;
QuicDatagramQueue::QuicDatagramQueue(QuicSession* session)
- : session_(session), clock_(session->connection()->clock()) {}
+ : QuicDatagramQueue(session, nullptr) {}
+
+QuicDatagramQueue::QuicDatagramQueue(QuicSession* session,
+ std::unique_ptr<Observer> observer)
+ : session_(session),
+ clock_(session->connection()->clock()),
+ observer_(std::move(observer)) {}
MessageStatus QuicDatagramQueue::SendOrQueueDatagram(QuicMemSlice datagram) {
// If the queue is non-empty, always queue the daragram. This ensures that
@@ -27,6 +33,9 @@
QuicMemSliceSpan span(&datagram);
MessageResult result = session_->SendMessage(span);
if (result.status != MESSAGE_STATUS_BLOCKED) {
+ if (observer_) {
+ observer_->OnDatagramProcessed(result.status);
+ }
return result.status;
}
}
@@ -46,6 +55,9 @@
MessageResult result = session_->SendMessage(span);
if (result.status != MESSAGE_STATUS_BLOCKED) {
queue_.pop_front();
+ if (observer_) {
+ observer_->OnDatagramProcessed(result.status);
+ }
}
return result.status;
}
@@ -80,6 +92,9 @@
QuicTime now = clock_->ApproximateNow();
while (!queue_.empty() && queue_.front().expiry <= now) {
queue_.pop_front();
+ if (observer_) {
+ observer_->OnDatagramProcessed(absl::nullopt);
+ }
}
}
diff --git a/quic/core/quic_datagram_queue.h b/quic/core/quic_datagram_queue.h
index 0952402..e8f2815 100644
--- a/quic/core/quic_datagram_queue.h
+++ b/quic/core/quic_datagram_queue.h
@@ -5,6 +5,8 @@
#ifndef QUICHE_QUIC_CORE_QUIC_DATAGRAM_QUEUE_H_
#define QUICHE_QUIC_CORE_QUIC_DATAGRAM_QUEUE_H_
+#include <memory>
+
#include "absl/types/optional.h"
#include "net/third_party/quiche/src/quic/core/quic_circular_deque.h"
#include "net/third_party/quiche/src/quic/core/quic_time.h"
@@ -20,9 +22,26 @@
// amount of time, and deleted after that time passes.
class QUIC_EXPORT_PRIVATE QuicDatagramQueue {
public:
+ // An interface used to monitor events on the associated `QuicDatagramQueue`.
+ class QUIC_EXPORT_PRIVATE Observer {
+ public:
+ virtual ~Observer() = default;
+
+ // Called when a datagram in the associated queue is sent or discarded.
+ // Identity information for the datagram is not given, because the sending
+ // and discarding order is always first-in-first-out.
+ // This function is called synchronously in `QuicDatagramQueue` methods.
+ // `status` is nullopt when the datagram is dropped due to being in the
+ // queue for too long.
+ virtual void OnDatagramProcessed(absl::optional<MessageStatus> status) = 0;
+ };
+
// |session| is not owned and must outlive this object.
explicit QuicDatagramQueue(QuicSession* session);
+ // |session| is not owned and must outlive this object.
+ QuicDatagramQueue(QuicSession* session, std::unique_ptr<Observer> observer);
+
// Adds the datagram to the end of the queue. May send it immediately; if
// not, MESSAGE_STATUS_BLOCKED is returned.
MessageStatus SendOrQueueDatagram(QuicMemSlice datagram);
@@ -62,6 +81,7 @@
QuicTime::Delta max_time_in_queue_ = QuicTime::Delta::Zero();
QuicCircularDeque<Datagram> queue_;
+ std::unique_ptr<Observer> observer_;
};
} // namespace quic
diff --git a/quic/core/quic_datagram_queue_test.cc b/quic/core/quic_datagram_queue_test.cc
index 0e6e3de..e437840 100644
--- a/quic/core/quic_datagram_queue_test.cc
+++ b/quic/core/quic_datagram_queue_test.cc
@@ -4,6 +4,8 @@
#include "net/third_party/quiche/src/quic/core/quic_datagram_queue.h"
+#include <vector>
+
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "net/third_party/quiche/src/quic/core/crypto/null_encrypter.h"
@@ -11,6 +13,7 @@
#include "net/third_party/quiche/src/quic/core/quic_time.h"
#include "net/third_party/quiche/src/quic/core/quic_types.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_reference_counted.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_test.h"
#include "net/third_party/quiche/src/quic/test_tools/quic_test_utils.h"
@@ -30,20 +33,43 @@
bool encryption_established() const override { return true; }
};
-class QuicDatagramQueueTest : public QuicTest {
+class QuicDatagramQueueObserver final : public QuicDatagramQueue::Observer {
public:
- QuicDatagramQueueTest()
+ class Context : public QuicReferenceCounted {
+ public:
+ std::vector<absl::optional<MessageStatus>> statuses;
+ };
+
+ QuicDatagramQueueObserver() : context_(new Context()) {}
+ QuicDatagramQueueObserver(const QuicDatagramQueueObserver&) = delete;
+ QuicDatagramQueueObserver& operator=(const QuicDatagramQueueObserver&) =
+ delete;
+
+ void OnDatagramProcessed(absl::optional<MessageStatus> status) override {
+ context_->statuses.push_back(std::move(status));
+ }
+
+ const QuicReferenceCountedPointer<Context>& context() { return context_; }
+
+ private:
+ QuicReferenceCountedPointer<Context> context_;
+};
+
+class QuicDatagramQueueTestBase : public QuicTest {
+ protected:
+ QuicDatagramQueueTestBase()
: connection_(new MockQuicConnection(&helper_,
&alarm_factory_,
Perspective::IS_CLIENT)),
- session_(connection_),
- queue_(&session_) {
+ session_(connection_) {
session_.SetCryptoStream(new EstablishedCryptoStream(&session_));
connection_->SetEncrypter(
ENCRYPTION_FORWARD_SECURE,
std::make_unique<NullEncrypter>(connection_->perspective()));
}
+ ~QuicDatagramQueueTestBase() = default;
+
QuicMemSlice CreateMemSlice(absl::string_view data) {
QuicUniqueBufferPtr buffer =
MakeUniqueBuffer(helper_.GetStreamSendBufferAllocator(), data.size());
@@ -51,11 +77,17 @@
return QuicMemSlice(std::move(buffer), data.size());
}
- protected:
MockQuicConnectionHelper helper_;
MockAlarmFactory alarm_factory_;
MockQuicConnection* connection_; // Owned by |session_|.
MockQuicSession session_;
+};
+
+class QuicDatagramQueueTest : public QuicDatagramQueueTestBase {
+ public:
+ QuicDatagramQueueTest() : queue_(&session_) {}
+
+ protected:
QuicDatagramQueue queue_;
};
@@ -167,6 +199,99 @@
EXPECT_EQ(0u, queue_.SendDatagrams());
}
+class QuicDatagramQueueWithObserverTest : public QuicDatagramQueueTestBase {
+ public:
+ QuicDatagramQueueWithObserverTest()
+ : observer_(std::make_unique<QuicDatagramQueueObserver>()),
+ context_(observer_->context()),
+ queue_(&session_, std::move(observer_)) {}
+
+ protected:
+ // This is moved out immediately.
+ std::unique_ptr<QuicDatagramQueueObserver> observer_;
+
+ QuicReferenceCountedPointer<QuicDatagramQueueObserver::Context> context_;
+ QuicDatagramQueue queue_;
+};
+
+TEST_F(QuicDatagramQueueWithObserverTest, ObserveSuccessImmediately) {
+ EXPECT_TRUE(context_->statuses.empty());
+
+ EXPECT_CALL(*connection_, SendMessage(_, _, _))
+ .WillOnce(Return(MESSAGE_STATUS_SUCCESS));
+
+ EXPECT_EQ(MESSAGE_STATUS_SUCCESS,
+ queue_.SendOrQueueDatagram(CreateMemSlice("a")));
+
+ EXPECT_THAT(context_->statuses, ElementsAre(MESSAGE_STATUS_SUCCESS));
+}
+
+TEST_F(QuicDatagramQueueWithObserverTest, ObserveFailureImmediately) {
+ EXPECT_TRUE(context_->statuses.empty());
+
+ EXPECT_CALL(*connection_, SendMessage(_, _, _))
+ .WillOnce(Return(MESSAGE_STATUS_TOO_LARGE));
+
+ EXPECT_EQ(MESSAGE_STATUS_TOO_LARGE,
+ queue_.SendOrQueueDatagram(CreateMemSlice("a")));
+
+ EXPECT_THAT(context_->statuses, ElementsAre(MESSAGE_STATUS_TOO_LARGE));
+}
+
+TEST_F(QuicDatagramQueueWithObserverTest, BlockingShouldNotBeObserved) {
+ EXPECT_TRUE(context_->statuses.empty());
+
+ EXPECT_CALL(*connection_, SendMessage(_, _, _))
+ .WillRepeatedly(Return(MESSAGE_STATUS_BLOCKED));
+
+ EXPECT_EQ(MESSAGE_STATUS_BLOCKED,
+ queue_.SendOrQueueDatagram(CreateMemSlice("a")));
+ EXPECT_EQ(0u, queue_.SendDatagrams());
+
+ EXPECT_TRUE(context_->statuses.empty());
+}
+
+TEST_F(QuicDatagramQueueWithObserverTest, ObserveSuccessAfterBuffering) {
+ EXPECT_TRUE(context_->statuses.empty());
+
+ EXPECT_CALL(*connection_, SendMessage(_, _, _))
+ .WillOnce(Return(MESSAGE_STATUS_BLOCKED));
+
+ EXPECT_EQ(MESSAGE_STATUS_BLOCKED,
+ queue_.SendOrQueueDatagram(CreateMemSlice("a")));
+
+ EXPECT_TRUE(context_->statuses.empty());
+
+ EXPECT_CALL(*connection_, SendMessage(_, _, _))
+ .WillOnce(Return(MESSAGE_STATUS_SUCCESS));
+
+ EXPECT_EQ(1u, queue_.SendDatagrams());
+ EXPECT_THAT(context_->statuses, ElementsAre(MESSAGE_STATUS_SUCCESS));
+}
+
+TEST_F(QuicDatagramQueueWithObserverTest, ObserveExpiry) {
+ constexpr QuicTime::Delta expiry = QuicTime::Delta::FromMilliseconds(100);
+ queue_.SetMaxTimeInQueue(expiry);
+
+ EXPECT_TRUE(context_->statuses.empty());
+
+ EXPECT_CALL(*connection_, SendMessage(_, _, _))
+ .WillOnce(Return(MESSAGE_STATUS_BLOCKED));
+
+ EXPECT_EQ(MESSAGE_STATUS_BLOCKED,
+ queue_.SendOrQueueDatagram(CreateMemSlice("a")));
+
+ EXPECT_TRUE(context_->statuses.empty());
+
+ EXPECT_CALL(*connection_, SendMessage(_, _, _)).Times(0);
+ helper_.AdvanceTime(100 * expiry);
+
+ EXPECT_TRUE(context_->statuses.empty());
+
+ EXPECT_EQ(0u, queue_.SendDatagrams());
+ EXPECT_THAT(context_->statuses, ElementsAre(absl::nullopt));
+}
+
} // namespace
} // namespace test
} // namespace quic
diff --git a/quic/core/quic_session.cc b/quic/core/quic_session.cc
index 3a06af3..91aca8f 100644
--- a/quic/core/quic_session.cc
+++ b/quic/core/quic_session.cc
@@ -57,6 +57,20 @@
const QuicConfig& config,
const ParsedQuicVersionVector& supported_versions,
QuicStreamCount num_expected_unidirectional_static_streams)
+ : QuicSession(connection,
+ owner,
+ config,
+ supported_versions,
+ num_expected_unidirectional_static_streams,
+ nullptr) {}
+
+QuicSession::QuicSession(
+ QuicConnection* connection,
+ Visitor* owner,
+ const QuicConfig& config,
+ const ParsedQuicVersionVector& supported_versions,
+ QuicStreamCount num_expected_unidirectional_static_streams,
+ std::unique_ptr<QuicDatagramQueue::Observer> datagram_observer)
: connection_(connection),
perspective_(connection->perspective()),
visitor_(owner),
@@ -94,7 +108,7 @@
transport_goaway_received_(false),
control_frame_manager_(this),
last_message_id_(0),
- datagram_queue_(this),
+ datagram_queue_(this, std::move(datagram_observer)),
closed_streams_clean_up_alarm_(nullptr),
supported_versions_(supported_versions),
use_http2_priority_write_scheduler_(false),
diff --git a/quic/core/quic_session.h b/quic/core/quic_session.h
index e454892..a36040e 100644
--- a/quic/core/quic_session.h
+++ b/quic/core/quic_session.h
@@ -88,6 +88,12 @@
const QuicConfig& config,
const ParsedQuicVersionVector& supported_versions,
QuicStreamCount num_expected_unidirectional_static_streams);
+ QuicSession(QuicConnection* connection,
+ Visitor* owner,
+ const QuicConfig& config,
+ const ParsedQuicVersionVector& supported_versions,
+ QuicStreamCount num_expected_unidirectional_static_streams,
+ std::unique_ptr<QuicDatagramQueue::Observer> datagram_observer);
QuicSession(const QuicSession&) = delete;
QuicSession& operator=(const QuicSession&) = delete;
diff --git a/quic/quic_transport/quic_transport_client_session.cc b/quic/quic_transport/quic_transport_client_session.cc
index f38d6b3..66f993a 100644
--- a/quic/quic_transport/quic_transport_client_session.cc
+++ b/quic/quic_transport/quic_transport_client_session.cc
@@ -34,12 +34,14 @@
const GURL& url,
QuicCryptoClientConfig* crypto_config,
url::Origin origin,
- ClientVisitor* visitor)
+ ClientVisitor* visitor,
+ std::unique_ptr<QuicDatagramQueue::Observer> datagram_observer)
: QuicSession(connection,
owner,
config,
supported_versions,
- /*num_expected_unidirectional_static_streams*/ 0),
+ /*num_expected_unidirectional_static_streams*/ 0,
+ std::move(datagram_observer)),
url_(url),
origin_(origin),
visitor_(visitor) {
diff --git a/quic/quic_transport/quic_transport_client_session.h b/quic/quic_transport/quic_transport_client_session.h
index c4f417f..b7687c0 100644
--- a/quic/quic_transport/quic_transport_client_session.h
+++ b/quic/quic_transport/quic_transport_client_session.h
@@ -16,6 +16,7 @@
#include "net/third_party/quiche/src/quic/core/quic_connection.h"
#include "net/third_party/quiche/src/quic/core/quic_crypto_client_stream.h"
#include "net/third_party/quiche/src/quic/core/quic_crypto_stream.h"
+#include "net/third_party/quiche/src/quic/core/quic_datagram_queue.h"
#include "net/third_party/quiche/src/quic/core/quic_server_id.h"
#include "net/third_party/quiche/src/quic/core/quic_session.h"
#include "net/third_party/quiche/src/quic/core/quic_stream.h"
@@ -56,14 +57,16 @@
virtual void OnCanCreateNewOutgoingUnidirectionalStream() = 0;
};
- QuicTransportClientSession(QuicConnection* connection,
- Visitor* owner,
- const QuicConfig& config,
- const ParsedQuicVersionVector& supported_versions,
- const GURL& url,
- QuicCryptoClientConfig* crypto_config,
- url::Origin origin,
- ClientVisitor* visitor);
+ QuicTransportClientSession(
+ QuicConnection* connection,
+ Visitor* owner,
+ const QuicConfig& config,
+ const ParsedQuicVersionVector& supported_versions,
+ const GURL& url,
+ QuicCryptoClientConfig* crypto_config,
+ url::Origin origin,
+ ClientVisitor* visitor,
+ std::unique_ptr<QuicDatagramQueue::Observer> datagram_observer);
std::vector<std::string> GetAlpnsToOffer() const override {
return std::vector<std::string>({QuicTransportAlpn()});
diff --git a/quic/quic_transport/quic_transport_client_session_test.cc b/quic/quic_transport/quic_transport_client_session_test.cc
index 129c87e..b311b52 100644
--- a/quic/quic_transport/quic_transport_client_session_test.cc
+++ b/quic/quic_transport/quic_transport_client_session_test.cc
@@ -65,7 +65,7 @@
session_ = std::make_unique<QuicTransportClientSession>(
&connection_, nullptr, DefaultQuicConfig(), GetVersions(),
GURL("quic-transport://test.example.com:50000" + url_suffix),
- &crypto_config_, origin, &visitor_);
+ &crypto_config_, origin, &visitor_, /*datagram_observer=*/nullptr);
session_->Initialize();
crypto_stream_ = static_cast<QuicCryptoClientStream*>(
session_->GetMutableCryptoStream());
diff --git a/quic/quic_transport/quic_transport_integration_test.cc b/quic/quic_transport/quic_transport_integration_test.cc
index 1965c6d..99c20b2 100644
--- a/quic/quic_transport/quic_transport_integration_test.cc
+++ b/quic/quic_transport/quic_transport_integration_test.cc
@@ -90,7 +90,8 @@
GURL("quic-transport://test.example.com:50000" + path),
&crypto_config_,
origin,
- &visitor_) {
+ &visitor_,
+ /*datagram_observer=*/nullptr) {
session_.Initialize();
}