Add QuicDatagramQueue, a time-bound queue for outgoing QUIC DATAGRAM frames
This solves the problem outlined in https://github.com/WICG/web-transport/issues/93#issuecomment-569120267
The plan is to use this in QuicTransport and QBONE.
gfe-relnote: n/a (code not currently used)
PiperOrigin-RevId: 288024608
Change-Id: I1d414ceb82ab1f34e64cf7af84da836f080e3b8a
diff --git a/quic/core/quic_datagram_queue.cc b/quic/core/quic_datagram_queue.cc
new file mode 100644
index 0000000..d9ed53f
--- /dev/null
+++ b/quic/core/quic_datagram_queue.cc
@@ -0,0 +1,84 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/third_party/quiche/src/quic/core/quic_datagram_queue.h"
+
+#include "net/third_party/quiche/src/quic/core/quic_constants.h"
+#include "net/third_party/quiche/src/quic/core/quic_time.h"
+#include "net/third_party/quiche/src/quic/core/quic_types.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice_span.h"
+
+namespace quic {
+
+constexpr float kExpiryInMinRtts = 1.25;
+constexpr float kMinPacingWindows = 4;
+
+QuicDatagramQueue::QuicDatagramQueue(QuicSession* session)
+ : session_(session), clock_(session->connection()->clock()) {}
+
+MessageStatus QuicDatagramQueue::SendOrQueueDatagram(QuicMemSlice datagram) {
+ // If the queue is non-empty, always queue the daragram. This ensures that
+ // the datagrams are sent in the same order that they were sent by the
+ // application.
+ if (queue_.empty()) {
+ QuicMemSliceSpan span(&datagram);
+ MessageResult result = session_->SendMessage(span);
+ if (result.status != MESSAGE_STATUS_BLOCKED) {
+ return result.status;
+ }
+ }
+
+ queue_.emplace_back(Datagram{std::move(datagram),
+ clock_->ApproximateNow() + GetMaxTimeInQueue()});
+ return MESSAGE_STATUS_BLOCKED;
+}
+
+QuicOptional<MessageStatus> QuicDatagramQueue::TrySendingNextDatagram() {
+ RemoveExpiredDatagrams();
+ if (queue_.empty()) {
+ return QuicOptional<MessageStatus>();
+ }
+
+ QuicMemSliceSpan span(&queue_.front().datagram);
+ MessageResult result = session_->SendMessage(span);
+ if (result.status != MESSAGE_STATUS_BLOCKED) {
+ queue_.pop_front();
+ }
+ return result.status;
+}
+
+size_t QuicDatagramQueue::SendDatagrams() {
+ size_t num_datagrams = 0;
+ for (;;) {
+ QuicOptional<MessageStatus> status = TrySendingNextDatagram();
+ if (!status.has_value()) {
+ break;
+ }
+ if (*status == MESSAGE_STATUS_BLOCKED) {
+ break;
+ }
+ num_datagrams++;
+ }
+ return num_datagrams;
+}
+
+QuicTime::Delta QuicDatagramQueue::GetMaxTimeInQueue() const {
+ if (!max_time_in_queue_.IsZero()) {
+ return max_time_in_queue_;
+ }
+
+ const QuicTime::Delta min_rtt =
+ session_->connection()->sent_packet_manager().GetRttStats()->min_rtt();
+ return std::max(kExpiryInMinRtts * min_rtt,
+ kMinPacingWindows * kAlarmGranularity);
+}
+
+void QuicDatagramQueue::RemoveExpiredDatagrams() {
+ QuicTime now = clock_->ApproximateNow();
+ while (!queue_.empty() && queue_.front().expiry <= now) {
+ queue_.pop_front();
+ }
+}
+
+} // namespace quic
diff --git a/quic/core/quic_datagram_queue.h b/quic/core/quic_datagram_queue.h
new file mode 100644
index 0000000..2cb8576
--- /dev/null
+++ b/quic/core/quic_datagram_queue.h
@@ -0,0 +1,64 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_CORE_QUIC_DATAGRAM_QUEUE_H_
+#define QUICHE_QUIC_CORE_QUIC_DATAGRAM_QUEUE_H_
+
+#include "net/third_party/quiche/src/quic/core/quic_session.h"
+#include "net/third_party/quiche/src/quic/core/quic_time.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_containers.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice.h"
+
+namespace quic {
+
+// Provides a way to buffer QUIC datagrams (messages) in case they cannot
+// be sent due to congestion control. Datagrams are buffered for a limited
+// amount of time, and deleted after that time passes.
+class QUIC_EXPORT_PRIVATE QuicDatagramQueue {
+ public:
+ // |session| is not owned and must outlive this object.
+ explicit QuicDatagramQueue(QuicSession* session);
+
+ // Adds the datagram to the end of the queue. May send it immediately; if
+ // not, MESSAGE_STATUS_BLOCKED is returned.
+ MessageStatus SendOrQueueDatagram(QuicMemSlice datagram);
+
+ // Attempts to send a single datagram from the queue. Returns the result of
+ // SendMessage(), or nullopt if there were no unexpired datagrams to send.
+ QuicOptional<MessageStatus> TrySendingNextDatagram();
+
+ // Sends all of the unexpired datagrams until either the connection becomes
+ // write-blocked or the queue is empty. Returns the number of datagrams sent.
+ size_t SendDatagrams();
+
+ // Returns the amount of time a datagram is allowed to be in the queue before
+ // it is dropped. If not set explicitly using SetMaxTimeInQueue(), an
+ // RTT-based heuristic is used.
+ QuicTime::Delta GetMaxTimeInQueue() const;
+
+ void SetMaxTimeInQueue(QuicTime::Delta max_time_in_queue) {
+ max_time_in_queue_ = max_time_in_queue;
+ }
+
+ size_t queue_size() { return queue_.size(); }
+
+ private:
+ struct QUIC_EXPORT_PRIVATE Datagram {
+ QuicMemSlice datagram;
+ QuicTime expiry;
+ };
+
+ // Removes expired datagrams from the front of the queue.
+ void RemoveExpiredDatagrams();
+
+ QuicSession* session_; // Not owned.
+ const QuicClock* clock_;
+
+ QuicTime::Delta max_time_in_queue_ = QuicTime::Delta::Zero();
+ QuicDeque<Datagram> queue_;
+};
+
+} // namespace quic
+
+#endif // QUICHE_QUIC_CORE_QUIC_DATAGRAM_QUEUE_H_
diff --git a/quic/core/quic_datagram_queue_test.cc b/quic/core/quic_datagram_queue_test.cc
new file mode 100644
index 0000000..06bddd7
--- /dev/null
+++ b/quic/core/quic_datagram_queue_test.cc
@@ -0,0 +1,167 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/third_party/quiche/src/quic/core/quic_datagram_queue.h"
+
+#include "net/third_party/quiche/src/quic/core/quic_buffer_allocator.h"
+#include "net/third_party/quiche/src/quic/core/quic_time.h"
+#include "net/third_party/quiche/src/quic/core/quic_types.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_optional.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_test.h"
+#include "net/third_party/quiche/src/quic/test_tools/quic_test_utils.h"
+#include "net/third_party/quiche/src/common/platform/api/quiche_string_piece.h"
+
+namespace quic {
+namespace test {
+namespace {
+
+using testing::_;
+using testing::ElementsAre;
+using testing::Return;
+
+class EstablishedCryptoStream : public MockQuicCryptoStream {
+ public:
+ using MockQuicCryptoStream::MockQuicCryptoStream;
+
+ bool encryption_established() const override { return true; }
+};
+
+class QuicDatagramQueueTest : public QuicTest {
+ public:
+ QuicDatagramQueueTest()
+ : connection_(new MockQuicConnection(&helper_,
+ &alarm_factory_,
+ Perspective::IS_CLIENT)),
+ session_(connection_),
+ queue_(&session_) {
+ session_.SetCryptoStream(new EstablishedCryptoStream(&session_));
+ }
+
+ QuicMemSlice CreateMemSlice(quiche::QuicheStringPiece data) {
+ QuicUniqueBufferPtr buffer =
+ MakeUniqueBuffer(helper_.GetStreamSendBufferAllocator(), data.size());
+ memcpy(buffer.get(), data.data(), data.size());
+ return QuicMemSlice(std::move(buffer), data.size());
+ }
+
+ protected:
+ MockQuicConnectionHelper helper_;
+ MockAlarmFactory alarm_factory_;
+ MockQuicConnection* connection_; // Owned by |session_|.
+ MockQuicSession session_;
+ QuicDatagramQueue queue_;
+};
+
+TEST_F(QuicDatagramQueueTest, SendDatagramImmediately) {
+ EXPECT_CALL(*connection_, SendMessage(_, _, _))
+ .WillOnce(Return(MESSAGE_STATUS_SUCCESS));
+ MessageStatus status = queue_.SendOrQueueDatagram(CreateMemSlice("test"));
+ EXPECT_EQ(MESSAGE_STATUS_SUCCESS, status);
+ EXPECT_EQ(0u, queue_.queue_size());
+}
+
+TEST_F(QuicDatagramQueueTest, SendDatagramAfterBuffering) {
+ EXPECT_CALL(*connection_, SendMessage(_, _, _))
+ .WillOnce(Return(MESSAGE_STATUS_BLOCKED));
+ MessageStatus initial_status =
+ queue_.SendOrQueueDatagram(CreateMemSlice("test"));
+ EXPECT_EQ(MESSAGE_STATUS_BLOCKED, initial_status);
+ EXPECT_EQ(1u, queue_.queue_size());
+
+ // Verify getting write blocked does not remove the datagram from the queue.
+ EXPECT_CALL(*connection_, SendMessage(_, _, _))
+ .WillOnce(Return(MESSAGE_STATUS_BLOCKED));
+ QuicOptional<MessageStatus> status = queue_.TrySendingNextDatagram();
+ ASSERT_TRUE(status.has_value());
+ EXPECT_EQ(MESSAGE_STATUS_BLOCKED, *status);
+ EXPECT_EQ(1u, queue_.queue_size());
+
+ EXPECT_CALL(*connection_, SendMessage(_, _, _))
+ .WillOnce(Return(MESSAGE_STATUS_SUCCESS));
+ status = queue_.TrySendingNextDatagram();
+ ASSERT_TRUE(status.has_value());
+ EXPECT_EQ(MESSAGE_STATUS_SUCCESS, *status);
+ EXPECT_EQ(0u, queue_.queue_size());
+}
+
+TEST_F(QuicDatagramQueueTest, EmptyBuffer) {
+ QuicOptional<MessageStatus> status = queue_.TrySendingNextDatagram();
+ EXPECT_FALSE(status.has_value());
+
+ size_t num_messages = queue_.SendDatagrams();
+ EXPECT_EQ(0u, num_messages);
+}
+
+TEST_F(QuicDatagramQueueTest, MultipleDatagrams) {
+ // Note that SendMessage() is called only once here, since all the remaining
+ // messages are automatically queued due to the queue being non-empty.
+ EXPECT_CALL(*connection_, SendMessage(_, _, _))
+ .WillOnce(Return(MESSAGE_STATUS_BLOCKED));
+ queue_.SendOrQueueDatagram(CreateMemSlice("a"));
+ queue_.SendOrQueueDatagram(CreateMemSlice("b"));
+ queue_.SendOrQueueDatagram(CreateMemSlice("c"));
+ queue_.SendOrQueueDatagram(CreateMemSlice("d"));
+ queue_.SendOrQueueDatagram(CreateMemSlice("e"));
+
+ EXPECT_CALL(*connection_, SendMessage(_, _, _))
+ .Times(5)
+ .WillRepeatedly(Return(MESSAGE_STATUS_SUCCESS));
+ size_t num_messages = queue_.SendDatagrams();
+ EXPECT_EQ(5u, num_messages);
+}
+
+TEST_F(QuicDatagramQueueTest, DefaultMaxTimeInQueue) {
+ EXPECT_EQ(QuicTime::Delta::Zero(),
+ connection_->sent_packet_manager().GetRttStats()->min_rtt());
+ EXPECT_EQ(QuicTime::Delta::FromMilliseconds(4), queue_.GetMaxTimeInQueue());
+
+ RttStats* stats =
+ const_cast<RttStats*>(connection_->sent_packet_manager().GetRttStats());
+ stats->UpdateRtt(QuicTime::Delta::FromMilliseconds(100),
+ QuicTime::Delta::Zero(), helper_.GetClock()->Now());
+ EXPECT_EQ(QuicTime::Delta::FromMilliseconds(125), queue_.GetMaxTimeInQueue());
+}
+
+TEST_F(QuicDatagramQueueTest, Expiry) {
+ constexpr QuicTime::Delta expiry = QuicTime::Delta::FromMilliseconds(100);
+ queue_.SetMaxTimeInQueue(expiry);
+
+ EXPECT_CALL(*connection_, SendMessage(_, _, _))
+ .WillOnce(Return(MESSAGE_STATUS_BLOCKED));
+ queue_.SendOrQueueDatagram(CreateMemSlice("a"));
+ helper_.AdvanceTime(0.6 * expiry);
+ queue_.SendOrQueueDatagram(CreateMemSlice("b"));
+ helper_.AdvanceTime(0.6 * expiry);
+ queue_.SendOrQueueDatagram(CreateMemSlice("c"));
+
+ std::vector<std::string> messages;
+ EXPECT_CALL(*connection_, SendMessage(_, _, _))
+ .WillRepeatedly([&messages](QuicMessageId /*id*/,
+ QuicMemSliceSpan message, bool /*flush*/) {
+ messages.push_back(std::string(message.GetData(0)));
+ return MESSAGE_STATUS_SUCCESS;
+ });
+ EXPECT_EQ(2u, queue_.SendDatagrams());
+ EXPECT_THAT(messages, ElementsAre("b", "c"));
+}
+
+TEST_F(QuicDatagramQueueTest, ExpireAll) {
+ constexpr QuicTime::Delta expiry = QuicTime::Delta::FromMilliseconds(100);
+ queue_.SetMaxTimeInQueue(expiry);
+
+ EXPECT_CALL(*connection_, SendMessage(_, _, _))
+ .WillOnce(Return(MESSAGE_STATUS_BLOCKED));
+ queue_.SendOrQueueDatagram(CreateMemSlice("a"));
+ queue_.SendOrQueueDatagram(CreateMemSlice("b"));
+ queue_.SendOrQueueDatagram(CreateMemSlice("c"));
+
+ helper_.AdvanceTime(100 * expiry);
+ EXPECT_CALL(*connection_, SendMessage(_, _, _)).Times(0);
+ EXPECT_EQ(0u, queue_.SendDatagrams());
+}
+
+} // namespace
+} // namespace test
+} // namespace quic