Add QuicDatagramQueue, a time-bound queue for outgoing QUIC DATAGRAM frames

This solves the problem outlined in https://github.com/WICG/web-transport/issues/93#issuecomment-569120267

The plan is to use this in QuicTransport and QBONE.

gfe-relnote: n/a (code not currently used)
PiperOrigin-RevId: 288024608
Change-Id: I1d414ceb82ab1f34e64cf7af84da836f080e3b8a
diff --git a/quic/core/quic_datagram_queue.cc b/quic/core/quic_datagram_queue.cc
new file mode 100644
index 0000000..d9ed53f
--- /dev/null
+++ b/quic/core/quic_datagram_queue.cc
@@ -0,0 +1,84 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/third_party/quiche/src/quic/core/quic_datagram_queue.h"
+
+#include "net/third_party/quiche/src/quic/core/quic_constants.h"
+#include "net/third_party/quiche/src/quic/core/quic_time.h"
+#include "net/third_party/quiche/src/quic/core/quic_types.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice_span.h"
+
+namespace quic {
+
+constexpr float kExpiryInMinRtts = 1.25;
+constexpr float kMinPacingWindows = 4;
+
+QuicDatagramQueue::QuicDatagramQueue(QuicSession* session)
+    : session_(session), clock_(session->connection()->clock()) {}
+
+MessageStatus QuicDatagramQueue::SendOrQueueDatagram(QuicMemSlice datagram) {
+  // If the queue is non-empty, always queue the daragram.  This ensures that
+  // the datagrams are sent in the same order that they were sent by the
+  // application.
+  if (queue_.empty()) {
+    QuicMemSliceSpan span(&datagram);
+    MessageResult result = session_->SendMessage(span);
+    if (result.status != MESSAGE_STATUS_BLOCKED) {
+      return result.status;
+    }
+  }
+
+  queue_.emplace_back(Datagram{std::move(datagram),
+                               clock_->ApproximateNow() + GetMaxTimeInQueue()});
+  return MESSAGE_STATUS_BLOCKED;
+}
+
+QuicOptional<MessageStatus> QuicDatagramQueue::TrySendingNextDatagram() {
+  RemoveExpiredDatagrams();
+  if (queue_.empty()) {
+    return QuicOptional<MessageStatus>();
+  }
+
+  QuicMemSliceSpan span(&queue_.front().datagram);
+  MessageResult result = session_->SendMessage(span);
+  if (result.status != MESSAGE_STATUS_BLOCKED) {
+    queue_.pop_front();
+  }
+  return result.status;
+}
+
+size_t QuicDatagramQueue::SendDatagrams() {
+  size_t num_datagrams = 0;
+  for (;;) {
+    QuicOptional<MessageStatus> status = TrySendingNextDatagram();
+    if (!status.has_value()) {
+      break;
+    }
+    if (*status == MESSAGE_STATUS_BLOCKED) {
+      break;
+    }
+    num_datagrams++;
+  }
+  return num_datagrams;
+}
+
+QuicTime::Delta QuicDatagramQueue::GetMaxTimeInQueue() const {
+  if (!max_time_in_queue_.IsZero()) {
+    return max_time_in_queue_;
+  }
+
+  const QuicTime::Delta min_rtt =
+      session_->connection()->sent_packet_manager().GetRttStats()->min_rtt();
+  return std::max(kExpiryInMinRtts * min_rtt,
+                  kMinPacingWindows * kAlarmGranularity);
+}
+
+void QuicDatagramQueue::RemoveExpiredDatagrams() {
+  QuicTime now = clock_->ApproximateNow();
+  while (!queue_.empty() && queue_.front().expiry <= now) {
+    queue_.pop_front();
+  }
+}
+
+}  // namespace quic
diff --git a/quic/core/quic_datagram_queue.h b/quic/core/quic_datagram_queue.h
new file mode 100644
index 0000000..2cb8576
--- /dev/null
+++ b/quic/core/quic_datagram_queue.h
@@ -0,0 +1,64 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_CORE_QUIC_DATAGRAM_QUEUE_H_
+#define QUICHE_QUIC_CORE_QUIC_DATAGRAM_QUEUE_H_
+
+#include "net/third_party/quiche/src/quic/core/quic_session.h"
+#include "net/third_party/quiche/src/quic/core/quic_time.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_containers.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice.h"
+
+namespace quic {
+
+// Provides a way to buffer QUIC datagrams (messages) in case they cannot
+// be sent due to congestion control.  Datagrams are buffered for a limited
+// amount of time, and deleted after that time passes.
+class QUIC_EXPORT_PRIVATE QuicDatagramQueue {
+ public:
+  // |session| is not owned and must outlive this object.
+  explicit QuicDatagramQueue(QuicSession* session);
+
+  // Adds the datagram to the end of the queue.  May send it immediately; if
+  // not, MESSAGE_STATUS_BLOCKED is returned.
+  MessageStatus SendOrQueueDatagram(QuicMemSlice datagram);
+
+  // Attempts to send a single datagram from the queue.  Returns the result of
+  // SendMessage(), or nullopt if there were no unexpired datagrams to send.
+  QuicOptional<MessageStatus> TrySendingNextDatagram();
+
+  // Sends all of the unexpired datagrams until either the connection becomes
+  // write-blocked or the queue is empty.  Returns the number of datagrams sent.
+  size_t SendDatagrams();
+
+  // Returns the amount of time a datagram is allowed to be in the queue before
+  // it is dropped.  If not set explicitly using SetMaxTimeInQueue(), an
+  // RTT-based heuristic is used.
+  QuicTime::Delta GetMaxTimeInQueue() const;
+
+  void SetMaxTimeInQueue(QuicTime::Delta max_time_in_queue) {
+    max_time_in_queue_ = max_time_in_queue;
+  }
+
+  size_t queue_size() { return queue_.size(); }
+
+ private:
+  struct QUIC_EXPORT_PRIVATE Datagram {
+    QuicMemSlice datagram;
+    QuicTime expiry;
+  };
+
+  // Removes expired datagrams from the front of the queue.
+  void RemoveExpiredDatagrams();
+
+  QuicSession* session_;  // Not owned.
+  const QuicClock* clock_;
+
+  QuicTime::Delta max_time_in_queue_ = QuicTime::Delta::Zero();
+  QuicDeque<Datagram> queue_;
+};
+
+}  // namespace quic
+
+#endif  // QUICHE_QUIC_CORE_QUIC_DATAGRAM_QUEUE_H_
diff --git a/quic/core/quic_datagram_queue_test.cc b/quic/core/quic_datagram_queue_test.cc
new file mode 100644
index 0000000..06bddd7
--- /dev/null
+++ b/quic/core/quic_datagram_queue_test.cc
@@ -0,0 +1,167 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/third_party/quiche/src/quic/core/quic_datagram_queue.h"
+
+#include "net/third_party/quiche/src/quic/core/quic_buffer_allocator.h"
+#include "net/third_party/quiche/src/quic/core/quic_time.h"
+#include "net/third_party/quiche/src/quic/core/quic_types.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_optional.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_test.h"
+#include "net/third_party/quiche/src/quic/test_tools/quic_test_utils.h"
+#include "net/third_party/quiche/src/common/platform/api/quiche_string_piece.h"
+
+namespace quic {
+namespace test {
+namespace {
+
+using testing::_;
+using testing::ElementsAre;
+using testing::Return;
+
+class EstablishedCryptoStream : public MockQuicCryptoStream {
+ public:
+  using MockQuicCryptoStream::MockQuicCryptoStream;
+
+  bool encryption_established() const override { return true; }
+};
+
+class QuicDatagramQueueTest : public QuicTest {
+ public:
+  QuicDatagramQueueTest()
+      : connection_(new MockQuicConnection(&helper_,
+                                           &alarm_factory_,
+                                           Perspective::IS_CLIENT)),
+        session_(connection_),
+        queue_(&session_) {
+    session_.SetCryptoStream(new EstablishedCryptoStream(&session_));
+  }
+
+  QuicMemSlice CreateMemSlice(quiche::QuicheStringPiece data) {
+    QuicUniqueBufferPtr buffer =
+        MakeUniqueBuffer(helper_.GetStreamSendBufferAllocator(), data.size());
+    memcpy(buffer.get(), data.data(), data.size());
+    return QuicMemSlice(std::move(buffer), data.size());
+  }
+
+ protected:
+  MockQuicConnectionHelper helper_;
+  MockAlarmFactory alarm_factory_;
+  MockQuicConnection* connection_;  // Owned by |session_|.
+  MockQuicSession session_;
+  QuicDatagramQueue queue_;
+};
+
+TEST_F(QuicDatagramQueueTest, SendDatagramImmediately) {
+  EXPECT_CALL(*connection_, SendMessage(_, _, _))
+      .WillOnce(Return(MESSAGE_STATUS_SUCCESS));
+  MessageStatus status = queue_.SendOrQueueDatagram(CreateMemSlice("test"));
+  EXPECT_EQ(MESSAGE_STATUS_SUCCESS, status);
+  EXPECT_EQ(0u, queue_.queue_size());
+}
+
+TEST_F(QuicDatagramQueueTest, SendDatagramAfterBuffering) {
+  EXPECT_CALL(*connection_, SendMessage(_, _, _))
+      .WillOnce(Return(MESSAGE_STATUS_BLOCKED));
+  MessageStatus initial_status =
+      queue_.SendOrQueueDatagram(CreateMemSlice("test"));
+  EXPECT_EQ(MESSAGE_STATUS_BLOCKED, initial_status);
+  EXPECT_EQ(1u, queue_.queue_size());
+
+  // Verify getting write blocked does not remove the datagram from the queue.
+  EXPECT_CALL(*connection_, SendMessage(_, _, _))
+      .WillOnce(Return(MESSAGE_STATUS_BLOCKED));
+  QuicOptional<MessageStatus> status = queue_.TrySendingNextDatagram();
+  ASSERT_TRUE(status.has_value());
+  EXPECT_EQ(MESSAGE_STATUS_BLOCKED, *status);
+  EXPECT_EQ(1u, queue_.queue_size());
+
+  EXPECT_CALL(*connection_, SendMessage(_, _, _))
+      .WillOnce(Return(MESSAGE_STATUS_SUCCESS));
+  status = queue_.TrySendingNextDatagram();
+  ASSERT_TRUE(status.has_value());
+  EXPECT_EQ(MESSAGE_STATUS_SUCCESS, *status);
+  EXPECT_EQ(0u, queue_.queue_size());
+}
+
+TEST_F(QuicDatagramQueueTest, EmptyBuffer) {
+  QuicOptional<MessageStatus> status = queue_.TrySendingNextDatagram();
+  EXPECT_FALSE(status.has_value());
+
+  size_t num_messages = queue_.SendDatagrams();
+  EXPECT_EQ(0u, num_messages);
+}
+
+TEST_F(QuicDatagramQueueTest, MultipleDatagrams) {
+  // Note that SendMessage() is called only once here, since all the remaining
+  // messages are automatically queued due to the queue being non-empty.
+  EXPECT_CALL(*connection_, SendMessage(_, _, _))
+      .WillOnce(Return(MESSAGE_STATUS_BLOCKED));
+  queue_.SendOrQueueDatagram(CreateMemSlice("a"));
+  queue_.SendOrQueueDatagram(CreateMemSlice("b"));
+  queue_.SendOrQueueDatagram(CreateMemSlice("c"));
+  queue_.SendOrQueueDatagram(CreateMemSlice("d"));
+  queue_.SendOrQueueDatagram(CreateMemSlice("e"));
+
+  EXPECT_CALL(*connection_, SendMessage(_, _, _))
+      .Times(5)
+      .WillRepeatedly(Return(MESSAGE_STATUS_SUCCESS));
+  size_t num_messages = queue_.SendDatagrams();
+  EXPECT_EQ(5u, num_messages);
+}
+
+TEST_F(QuicDatagramQueueTest, DefaultMaxTimeInQueue) {
+  EXPECT_EQ(QuicTime::Delta::Zero(),
+            connection_->sent_packet_manager().GetRttStats()->min_rtt());
+  EXPECT_EQ(QuicTime::Delta::FromMilliseconds(4), queue_.GetMaxTimeInQueue());
+
+  RttStats* stats =
+      const_cast<RttStats*>(connection_->sent_packet_manager().GetRttStats());
+  stats->UpdateRtt(QuicTime::Delta::FromMilliseconds(100),
+                   QuicTime::Delta::Zero(), helper_.GetClock()->Now());
+  EXPECT_EQ(QuicTime::Delta::FromMilliseconds(125), queue_.GetMaxTimeInQueue());
+}
+
+TEST_F(QuicDatagramQueueTest, Expiry) {
+  constexpr QuicTime::Delta expiry = QuicTime::Delta::FromMilliseconds(100);
+  queue_.SetMaxTimeInQueue(expiry);
+
+  EXPECT_CALL(*connection_, SendMessage(_, _, _))
+      .WillOnce(Return(MESSAGE_STATUS_BLOCKED));
+  queue_.SendOrQueueDatagram(CreateMemSlice("a"));
+  helper_.AdvanceTime(0.6 * expiry);
+  queue_.SendOrQueueDatagram(CreateMemSlice("b"));
+  helper_.AdvanceTime(0.6 * expiry);
+  queue_.SendOrQueueDatagram(CreateMemSlice("c"));
+
+  std::vector<std::string> messages;
+  EXPECT_CALL(*connection_, SendMessage(_, _, _))
+      .WillRepeatedly([&messages](QuicMessageId /*id*/,
+                                  QuicMemSliceSpan message, bool /*flush*/) {
+        messages.push_back(std::string(message.GetData(0)));
+        return MESSAGE_STATUS_SUCCESS;
+      });
+  EXPECT_EQ(2u, queue_.SendDatagrams());
+  EXPECT_THAT(messages, ElementsAre("b", "c"));
+}
+
+TEST_F(QuicDatagramQueueTest, ExpireAll) {
+  constexpr QuicTime::Delta expiry = QuicTime::Delta::FromMilliseconds(100);
+  queue_.SetMaxTimeInQueue(expiry);
+
+  EXPECT_CALL(*connection_, SendMessage(_, _, _))
+      .WillOnce(Return(MESSAGE_STATUS_BLOCKED));
+  queue_.SendOrQueueDatagram(CreateMemSlice("a"));
+  queue_.SendOrQueueDatagram(CreateMemSlice("b"));
+  queue_.SendOrQueueDatagram(CreateMemSlice("c"));
+
+  helper_.AdvanceTime(100 * expiry);
+  EXPECT_CALL(*connection_, SendMessage(_, _, _)).Times(0);
+  EXPECT_EQ(0u, queue_.SendDatagrams());
+}
+
+}  // namespace
+}  // namespace test
+}  // namespace quic