Introduce QuicDatagramQueue::Observer

Introduce an observer which is notified when a datagram in the
associated queue is sent to the network or expires, in order to give
the backpressure information to the user of the queue.

Bug: https://crbug.com/114921

Protected by FLAGS_gfe2_reloadable_flag_report_frontline_vip.

PiperOrigin-RevId: 343224979
Change-Id: I9ed2b22118b2aedeb0ec2ef194b92af777af3d15
diff --git a/quic/core/quic_datagram_queue.cc b/quic/core/quic_datagram_queue.cc
index e10a919..08c349c 100644
--- a/quic/core/quic_datagram_queue.cc
+++ b/quic/core/quic_datagram_queue.cc
@@ -17,7 +17,13 @@
 constexpr float kMinPacingWindows = 4;
 
 QuicDatagramQueue::QuicDatagramQueue(QuicSession* session)
-    : session_(session), clock_(session->connection()->clock()) {}
+    : QuicDatagramQueue(session, nullptr) {}
+
+QuicDatagramQueue::QuicDatagramQueue(QuicSession* session,
+                                     std::unique_ptr<Observer> observer)
+    : session_(session),
+      clock_(session->connection()->clock()),
+      observer_(std::move(observer)) {}
 
 MessageStatus QuicDatagramQueue::SendOrQueueDatagram(QuicMemSlice datagram) {
   // If the queue is non-empty, always queue the daragram.  This ensures that
@@ -27,6 +33,9 @@
     QuicMemSliceSpan span(&datagram);
     MessageResult result = session_->SendMessage(span);
     if (result.status != MESSAGE_STATUS_BLOCKED) {
+      if (observer_) {
+        observer_->OnDatagramProcessed(result.status);
+      }
       return result.status;
     }
   }
@@ -46,6 +55,9 @@
   MessageResult result = session_->SendMessage(span);
   if (result.status != MESSAGE_STATUS_BLOCKED) {
     queue_.pop_front();
+    if (observer_) {
+      observer_->OnDatagramProcessed(result.status);
+    }
   }
   return result.status;
 }
@@ -80,6 +92,9 @@
   QuicTime now = clock_->ApproximateNow();
   while (!queue_.empty() && queue_.front().expiry <= now) {
     queue_.pop_front();
+    if (observer_) {
+      observer_->OnDatagramProcessed(absl::nullopt);
+    }
   }
 }
 
diff --git a/quic/core/quic_datagram_queue.h b/quic/core/quic_datagram_queue.h
index 0952402..e8f2815 100644
--- a/quic/core/quic_datagram_queue.h
+++ b/quic/core/quic_datagram_queue.h
@@ -5,6 +5,8 @@
 #ifndef QUICHE_QUIC_CORE_QUIC_DATAGRAM_QUEUE_H_
 #define QUICHE_QUIC_CORE_QUIC_DATAGRAM_QUEUE_H_
 
+#include <memory>
+
 #include "absl/types/optional.h"
 #include "net/third_party/quiche/src/quic/core/quic_circular_deque.h"
 #include "net/third_party/quiche/src/quic/core/quic_time.h"
@@ -20,9 +22,26 @@
 // amount of time, and deleted after that time passes.
 class QUIC_EXPORT_PRIVATE QuicDatagramQueue {
  public:
+  // An interface used to monitor events on the associated `QuicDatagramQueue`.
+  class QUIC_EXPORT_PRIVATE Observer {
+   public:
+    virtual ~Observer() = default;
+
+    // Called when a datagram in the associated queue is sent or discarded.
+    // Identity information for the datagram is not given, because the sending
+    // and discarding order is always first-in-first-out.
+    // This function is called synchronously in `QuicDatagramQueue` methods.
+    // `status` is nullopt when the datagram is dropped due to being in the
+    // queue for too long.
+    virtual void OnDatagramProcessed(absl::optional<MessageStatus> status) = 0;
+  };
+
   // |session| is not owned and must outlive this object.
   explicit QuicDatagramQueue(QuicSession* session);
 
+  // |session| is not owned and must outlive this object.
+  QuicDatagramQueue(QuicSession* session, std::unique_ptr<Observer> observer);
+
   // Adds the datagram to the end of the queue.  May send it immediately; if
   // not, MESSAGE_STATUS_BLOCKED is returned.
   MessageStatus SendOrQueueDatagram(QuicMemSlice datagram);
@@ -62,6 +81,7 @@
 
   QuicTime::Delta max_time_in_queue_ = QuicTime::Delta::Zero();
   QuicCircularDeque<Datagram> queue_;
+  std::unique_ptr<Observer> observer_;
 };
 
 }  // namespace quic
diff --git a/quic/core/quic_datagram_queue_test.cc b/quic/core/quic_datagram_queue_test.cc
index 0e6e3de..e437840 100644
--- a/quic/core/quic_datagram_queue_test.cc
+++ b/quic/core/quic_datagram_queue_test.cc
@@ -4,6 +4,8 @@
 
 #include "net/third_party/quiche/src/quic/core/quic_datagram_queue.h"
 
+#include <vector>
+
 #include "absl/strings/string_view.h"
 #include "absl/types/optional.h"
 #include "net/third_party/quiche/src/quic/core/crypto/null_encrypter.h"
@@ -11,6 +13,7 @@
 #include "net/third_party/quiche/src/quic/core/quic_time.h"
 #include "net/third_party/quiche/src/quic/core/quic_types.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_reference_counted.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_test.h"
 #include "net/third_party/quiche/src/quic/test_tools/quic_test_utils.h"
 
@@ -30,20 +33,43 @@
   bool encryption_established() const override { return true; }
 };
 
-class QuicDatagramQueueTest : public QuicTest {
+class QuicDatagramQueueObserver final : public QuicDatagramQueue::Observer {
  public:
-  QuicDatagramQueueTest()
+  class Context : public QuicReferenceCounted {
+   public:
+    std::vector<absl::optional<MessageStatus>> statuses;
+  };
+
+  QuicDatagramQueueObserver() : context_(new Context()) {}
+  QuicDatagramQueueObserver(const QuicDatagramQueueObserver&) = delete;
+  QuicDatagramQueueObserver& operator=(const QuicDatagramQueueObserver&) =
+      delete;
+
+  void OnDatagramProcessed(absl::optional<MessageStatus> status) override {
+    context_->statuses.push_back(std::move(status));
+  }
+
+  const QuicReferenceCountedPointer<Context>& context() { return context_; }
+
+ private:
+  QuicReferenceCountedPointer<Context> context_;
+};
+
+class QuicDatagramQueueTestBase : public QuicTest {
+ protected:
+  QuicDatagramQueueTestBase()
       : connection_(new MockQuicConnection(&helper_,
                                            &alarm_factory_,
                                            Perspective::IS_CLIENT)),
-        session_(connection_),
-        queue_(&session_) {
+        session_(connection_) {
     session_.SetCryptoStream(new EstablishedCryptoStream(&session_));
     connection_->SetEncrypter(
         ENCRYPTION_FORWARD_SECURE,
         std::make_unique<NullEncrypter>(connection_->perspective()));
   }
 
+  ~QuicDatagramQueueTestBase() = default;
+
   QuicMemSlice CreateMemSlice(absl::string_view data) {
     QuicUniqueBufferPtr buffer =
         MakeUniqueBuffer(helper_.GetStreamSendBufferAllocator(), data.size());
@@ -51,11 +77,17 @@
     return QuicMemSlice(std::move(buffer), data.size());
   }
 
- protected:
   MockQuicConnectionHelper helper_;
   MockAlarmFactory alarm_factory_;
   MockQuicConnection* connection_;  // Owned by |session_|.
   MockQuicSession session_;
+};
+
+class QuicDatagramQueueTest : public QuicDatagramQueueTestBase {
+ public:
+  QuicDatagramQueueTest() : queue_(&session_) {}
+
+ protected:
   QuicDatagramQueue queue_;
 };
 
@@ -167,6 +199,99 @@
   EXPECT_EQ(0u, queue_.SendDatagrams());
 }
 
+class QuicDatagramQueueWithObserverTest : public QuicDatagramQueueTestBase {
+ public:
+  QuicDatagramQueueWithObserverTest()
+      : observer_(std::make_unique<QuicDatagramQueueObserver>()),
+        context_(observer_->context()),
+        queue_(&session_, std::move(observer_)) {}
+
+ protected:
+  // This is moved out immediately.
+  std::unique_ptr<QuicDatagramQueueObserver> observer_;
+
+  QuicReferenceCountedPointer<QuicDatagramQueueObserver::Context> context_;
+  QuicDatagramQueue queue_;
+};
+
+TEST_F(QuicDatagramQueueWithObserverTest, ObserveSuccessImmediately) {
+  EXPECT_TRUE(context_->statuses.empty());
+
+  EXPECT_CALL(*connection_, SendMessage(_, _, _))
+      .WillOnce(Return(MESSAGE_STATUS_SUCCESS));
+
+  EXPECT_EQ(MESSAGE_STATUS_SUCCESS,
+            queue_.SendOrQueueDatagram(CreateMemSlice("a")));
+
+  EXPECT_THAT(context_->statuses, ElementsAre(MESSAGE_STATUS_SUCCESS));
+}
+
+TEST_F(QuicDatagramQueueWithObserverTest, ObserveFailureImmediately) {
+  EXPECT_TRUE(context_->statuses.empty());
+
+  EXPECT_CALL(*connection_, SendMessage(_, _, _))
+      .WillOnce(Return(MESSAGE_STATUS_TOO_LARGE));
+
+  EXPECT_EQ(MESSAGE_STATUS_TOO_LARGE,
+            queue_.SendOrQueueDatagram(CreateMemSlice("a")));
+
+  EXPECT_THAT(context_->statuses, ElementsAre(MESSAGE_STATUS_TOO_LARGE));
+}
+
+TEST_F(QuicDatagramQueueWithObserverTest, BlockingShouldNotBeObserved) {
+  EXPECT_TRUE(context_->statuses.empty());
+
+  EXPECT_CALL(*connection_, SendMessage(_, _, _))
+      .WillRepeatedly(Return(MESSAGE_STATUS_BLOCKED));
+
+  EXPECT_EQ(MESSAGE_STATUS_BLOCKED,
+            queue_.SendOrQueueDatagram(CreateMemSlice("a")));
+  EXPECT_EQ(0u, queue_.SendDatagrams());
+
+  EXPECT_TRUE(context_->statuses.empty());
+}
+
+TEST_F(QuicDatagramQueueWithObserverTest, ObserveSuccessAfterBuffering) {
+  EXPECT_TRUE(context_->statuses.empty());
+
+  EXPECT_CALL(*connection_, SendMessage(_, _, _))
+      .WillOnce(Return(MESSAGE_STATUS_BLOCKED));
+
+  EXPECT_EQ(MESSAGE_STATUS_BLOCKED,
+            queue_.SendOrQueueDatagram(CreateMemSlice("a")));
+
+  EXPECT_TRUE(context_->statuses.empty());
+
+  EXPECT_CALL(*connection_, SendMessage(_, _, _))
+      .WillOnce(Return(MESSAGE_STATUS_SUCCESS));
+
+  EXPECT_EQ(1u, queue_.SendDatagrams());
+  EXPECT_THAT(context_->statuses, ElementsAre(MESSAGE_STATUS_SUCCESS));
+}
+
+TEST_F(QuicDatagramQueueWithObserverTest, ObserveExpiry) {
+  constexpr QuicTime::Delta expiry = QuicTime::Delta::FromMilliseconds(100);
+  queue_.SetMaxTimeInQueue(expiry);
+
+  EXPECT_TRUE(context_->statuses.empty());
+
+  EXPECT_CALL(*connection_, SendMessage(_, _, _))
+      .WillOnce(Return(MESSAGE_STATUS_BLOCKED));
+
+  EXPECT_EQ(MESSAGE_STATUS_BLOCKED,
+            queue_.SendOrQueueDatagram(CreateMemSlice("a")));
+
+  EXPECT_TRUE(context_->statuses.empty());
+
+  EXPECT_CALL(*connection_, SendMessage(_, _, _)).Times(0);
+  helper_.AdvanceTime(100 * expiry);
+
+  EXPECT_TRUE(context_->statuses.empty());
+
+  EXPECT_EQ(0u, queue_.SendDatagrams());
+  EXPECT_THAT(context_->statuses, ElementsAre(absl::nullopt));
+}
+
 }  // namespace
 }  // namespace test
 }  // namespace quic
diff --git a/quic/core/quic_session.cc b/quic/core/quic_session.cc
index 3a06af3..91aca8f 100644
--- a/quic/core/quic_session.cc
+++ b/quic/core/quic_session.cc
@@ -57,6 +57,20 @@
     const QuicConfig& config,
     const ParsedQuicVersionVector& supported_versions,
     QuicStreamCount num_expected_unidirectional_static_streams)
+    : QuicSession(connection,
+                  owner,
+                  config,
+                  supported_versions,
+                  num_expected_unidirectional_static_streams,
+                  nullptr) {}
+
+QuicSession::QuicSession(
+    QuicConnection* connection,
+    Visitor* owner,
+    const QuicConfig& config,
+    const ParsedQuicVersionVector& supported_versions,
+    QuicStreamCount num_expected_unidirectional_static_streams,
+    std::unique_ptr<QuicDatagramQueue::Observer> datagram_observer)
     : connection_(connection),
       perspective_(connection->perspective()),
       visitor_(owner),
@@ -94,7 +108,7 @@
       transport_goaway_received_(false),
       control_frame_manager_(this),
       last_message_id_(0),
-      datagram_queue_(this),
+      datagram_queue_(this, std::move(datagram_observer)),
       closed_streams_clean_up_alarm_(nullptr),
       supported_versions_(supported_versions),
       use_http2_priority_write_scheduler_(false),
diff --git a/quic/core/quic_session.h b/quic/core/quic_session.h
index e454892..a36040e 100644
--- a/quic/core/quic_session.h
+++ b/quic/core/quic_session.h
@@ -88,6 +88,12 @@
               const QuicConfig& config,
               const ParsedQuicVersionVector& supported_versions,
               QuicStreamCount num_expected_unidirectional_static_streams);
+  QuicSession(QuicConnection* connection,
+              Visitor* owner,
+              const QuicConfig& config,
+              const ParsedQuicVersionVector& supported_versions,
+              QuicStreamCount num_expected_unidirectional_static_streams,
+              std::unique_ptr<QuicDatagramQueue::Observer> datagram_observer);
   QuicSession(const QuicSession&) = delete;
   QuicSession& operator=(const QuicSession&) = delete;
 
diff --git a/quic/quic_transport/quic_transport_client_session.cc b/quic/quic_transport/quic_transport_client_session.cc
index f38d6b3..66f993a 100644
--- a/quic/quic_transport/quic_transport_client_session.cc
+++ b/quic/quic_transport/quic_transport_client_session.cc
@@ -34,12 +34,14 @@
     const GURL& url,
     QuicCryptoClientConfig* crypto_config,
     url::Origin origin,
-    ClientVisitor* visitor)
+    ClientVisitor* visitor,
+    std::unique_ptr<QuicDatagramQueue::Observer> datagram_observer)
     : QuicSession(connection,
                   owner,
                   config,
                   supported_versions,
-                  /*num_expected_unidirectional_static_streams*/ 0),
+                  /*num_expected_unidirectional_static_streams*/ 0,
+                  std::move(datagram_observer)),
       url_(url),
       origin_(origin),
       visitor_(visitor) {
diff --git a/quic/quic_transport/quic_transport_client_session.h b/quic/quic_transport/quic_transport_client_session.h
index c4f417f..b7687c0 100644
--- a/quic/quic_transport/quic_transport_client_session.h
+++ b/quic/quic_transport/quic_transport_client_session.h
@@ -16,6 +16,7 @@
 #include "net/third_party/quiche/src/quic/core/quic_connection.h"
 #include "net/third_party/quiche/src/quic/core/quic_crypto_client_stream.h"
 #include "net/third_party/quiche/src/quic/core/quic_crypto_stream.h"
+#include "net/third_party/quiche/src/quic/core/quic_datagram_queue.h"
 #include "net/third_party/quiche/src/quic/core/quic_server_id.h"
 #include "net/third_party/quiche/src/quic/core/quic_session.h"
 #include "net/third_party/quiche/src/quic/core/quic_stream.h"
@@ -56,14 +57,16 @@
     virtual void OnCanCreateNewOutgoingUnidirectionalStream() = 0;
   };
 
-  QuicTransportClientSession(QuicConnection* connection,
-                             Visitor* owner,
-                             const QuicConfig& config,
-                             const ParsedQuicVersionVector& supported_versions,
-                             const GURL& url,
-                             QuicCryptoClientConfig* crypto_config,
-                             url::Origin origin,
-                             ClientVisitor* visitor);
+  QuicTransportClientSession(
+      QuicConnection* connection,
+      Visitor* owner,
+      const QuicConfig& config,
+      const ParsedQuicVersionVector& supported_versions,
+      const GURL& url,
+      QuicCryptoClientConfig* crypto_config,
+      url::Origin origin,
+      ClientVisitor* visitor,
+      std::unique_ptr<QuicDatagramQueue::Observer> datagram_observer);
 
   std::vector<std::string> GetAlpnsToOffer() const override {
     return std::vector<std::string>({QuicTransportAlpn()});
diff --git a/quic/quic_transport/quic_transport_client_session_test.cc b/quic/quic_transport/quic_transport_client_session_test.cc
index 129c87e..b311b52 100644
--- a/quic/quic_transport/quic_transport_client_session_test.cc
+++ b/quic/quic_transport/quic_transport_client_session_test.cc
@@ -65,7 +65,7 @@
     session_ = std::make_unique<QuicTransportClientSession>(
         &connection_, nullptr, DefaultQuicConfig(), GetVersions(),
         GURL("quic-transport://test.example.com:50000" + url_suffix),
-        &crypto_config_, origin, &visitor_);
+        &crypto_config_, origin, &visitor_, /*datagram_observer=*/nullptr);
     session_->Initialize();
     crypto_stream_ = static_cast<QuicCryptoClientStream*>(
         session_->GetMutableCryptoStream());
diff --git a/quic/quic_transport/quic_transport_integration_test.cc b/quic/quic_transport/quic_transport_integration_test.cc
index 1965c6d..99c20b2 100644
--- a/quic/quic_transport/quic_transport_integration_test.cc
+++ b/quic/quic_transport/quic_transport_integration_test.cc
@@ -90,7 +90,8 @@
                  GURL("quic-transport://test.example.com:50000" + path),
                  &crypto_config_,
                  origin,
-                 &visitor_) {
+                 &visitor_,
+                 /*datagram_observer=*/nullptr) {
     session_.Initialize();
   }