Add QuicDatagramQueue, a time-bound queue for outgoing QUIC DATAGRAM frames This solves the problem outlined in https://github.com/WICG/web-transport/issues/93#issuecomment-569120267 The plan is to use this in QuicTransport and QBONE. gfe-relnote: n/a (code not currently used) PiperOrigin-RevId: 288024608 Change-Id: I1d414ceb82ab1f34e64cf7af84da836f080e3b8a
diff --git a/quic/core/quic_datagram_queue.cc b/quic/core/quic_datagram_queue.cc new file mode 100644 index 0000000..d9ed53f --- /dev/null +++ b/quic/core/quic_datagram_queue.cc
@@ -0,0 +1,84 @@ +// Copyright 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/third_party/quiche/src/quic/core/quic_datagram_queue.h" + +#include "net/third_party/quiche/src/quic/core/quic_constants.h" +#include "net/third_party/quiche/src/quic/core/quic_time.h" +#include "net/third_party/quiche/src/quic/core/quic_types.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice_span.h" + +namespace quic { + +constexpr float kExpiryInMinRtts = 1.25; +constexpr float kMinPacingWindows = 4; + +QuicDatagramQueue::QuicDatagramQueue(QuicSession* session) + : session_(session), clock_(session->connection()->clock()) {} + +MessageStatus QuicDatagramQueue::SendOrQueueDatagram(QuicMemSlice datagram) { + // If the queue is non-empty, always queue the daragram. This ensures that + // the datagrams are sent in the same order that they were sent by the + // application. + if (queue_.empty()) { + QuicMemSliceSpan span(&datagram); + MessageResult result = session_->SendMessage(span); + if (result.status != MESSAGE_STATUS_BLOCKED) { + return result.status; + } + } + + queue_.emplace_back(Datagram{std::move(datagram), + clock_->ApproximateNow() + GetMaxTimeInQueue()}); + return MESSAGE_STATUS_BLOCKED; +} + +QuicOptional<MessageStatus> QuicDatagramQueue::TrySendingNextDatagram() { + RemoveExpiredDatagrams(); + if (queue_.empty()) { + return QuicOptional<MessageStatus>(); + } + + QuicMemSliceSpan span(&queue_.front().datagram); + MessageResult result = session_->SendMessage(span); + if (result.status != MESSAGE_STATUS_BLOCKED) { + queue_.pop_front(); + } + return result.status; +} + +size_t QuicDatagramQueue::SendDatagrams() { + size_t num_datagrams = 0; + for (;;) { + QuicOptional<MessageStatus> status = TrySendingNextDatagram(); + if (!status.has_value()) { + break; + } + if (*status == MESSAGE_STATUS_BLOCKED) { + break; + } + num_datagrams++; + } + return num_datagrams; +} + +QuicTime::Delta QuicDatagramQueue::GetMaxTimeInQueue() const { + if (!max_time_in_queue_.IsZero()) { + return max_time_in_queue_; + } + + const QuicTime::Delta min_rtt = + session_->connection()->sent_packet_manager().GetRttStats()->min_rtt(); + return std::max(kExpiryInMinRtts * min_rtt, + kMinPacingWindows * kAlarmGranularity); +} + +void QuicDatagramQueue::RemoveExpiredDatagrams() { + QuicTime now = clock_->ApproximateNow(); + while (!queue_.empty() && queue_.front().expiry <= now) { + queue_.pop_front(); + } +} + +} // namespace quic
diff --git a/quic/core/quic_datagram_queue.h b/quic/core/quic_datagram_queue.h new file mode 100644 index 0000000..2cb8576 --- /dev/null +++ b/quic/core/quic_datagram_queue.h
@@ -0,0 +1,64 @@ +// Copyright 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef QUICHE_QUIC_CORE_QUIC_DATAGRAM_QUEUE_H_ +#define QUICHE_QUIC_CORE_QUIC_DATAGRAM_QUEUE_H_ + +#include "net/third_party/quiche/src/quic/core/quic_session.h" +#include "net/third_party/quiche/src/quic/core/quic_time.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_containers.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice.h" + +namespace quic { + +// Provides a way to buffer QUIC datagrams (messages) in case they cannot +// be sent due to congestion control. Datagrams are buffered for a limited +// amount of time, and deleted after that time passes. +class QUIC_EXPORT_PRIVATE QuicDatagramQueue { + public: + // |session| is not owned and must outlive this object. + explicit QuicDatagramQueue(QuicSession* session); + + // Adds the datagram to the end of the queue. May send it immediately; if + // not, MESSAGE_STATUS_BLOCKED is returned. + MessageStatus SendOrQueueDatagram(QuicMemSlice datagram); + + // Attempts to send a single datagram from the queue. Returns the result of + // SendMessage(), or nullopt if there were no unexpired datagrams to send. + QuicOptional<MessageStatus> TrySendingNextDatagram(); + + // Sends all of the unexpired datagrams until either the connection becomes + // write-blocked or the queue is empty. Returns the number of datagrams sent. + size_t SendDatagrams(); + + // Returns the amount of time a datagram is allowed to be in the queue before + // it is dropped. If not set explicitly using SetMaxTimeInQueue(), an + // RTT-based heuristic is used. + QuicTime::Delta GetMaxTimeInQueue() const; + + void SetMaxTimeInQueue(QuicTime::Delta max_time_in_queue) { + max_time_in_queue_ = max_time_in_queue; + } + + size_t queue_size() { return queue_.size(); } + + private: + struct QUIC_EXPORT_PRIVATE Datagram { + QuicMemSlice datagram; + QuicTime expiry; + }; + + // Removes expired datagrams from the front of the queue. + void RemoveExpiredDatagrams(); + + QuicSession* session_; // Not owned. + const QuicClock* clock_; + + QuicTime::Delta max_time_in_queue_ = QuicTime::Delta::Zero(); + QuicDeque<Datagram> queue_; +}; + +} // namespace quic + +#endif // QUICHE_QUIC_CORE_QUIC_DATAGRAM_QUEUE_H_
diff --git a/quic/core/quic_datagram_queue_test.cc b/quic/core/quic_datagram_queue_test.cc new file mode 100644 index 0000000..06bddd7 --- /dev/null +++ b/quic/core/quic_datagram_queue_test.cc
@@ -0,0 +1,167 @@ +// Copyright 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/third_party/quiche/src/quic/core/quic_datagram_queue.h" + +#include "net/third_party/quiche/src/quic/core/quic_buffer_allocator.h" +#include "net/third_party/quiche/src/quic/core/quic_time.h" +#include "net/third_party/quiche/src/quic/core/quic_types.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_optional.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_test.h" +#include "net/third_party/quiche/src/quic/test_tools/quic_test_utils.h" +#include "net/third_party/quiche/src/common/platform/api/quiche_string_piece.h" + +namespace quic { +namespace test { +namespace { + +using testing::_; +using testing::ElementsAre; +using testing::Return; + +class EstablishedCryptoStream : public MockQuicCryptoStream { + public: + using MockQuicCryptoStream::MockQuicCryptoStream; + + bool encryption_established() const override { return true; } +}; + +class QuicDatagramQueueTest : public QuicTest { + public: + QuicDatagramQueueTest() + : connection_(new MockQuicConnection(&helper_, + &alarm_factory_, + Perspective::IS_CLIENT)), + session_(connection_), + queue_(&session_) { + session_.SetCryptoStream(new EstablishedCryptoStream(&session_)); + } + + QuicMemSlice CreateMemSlice(quiche::QuicheStringPiece data) { + QuicUniqueBufferPtr buffer = + MakeUniqueBuffer(helper_.GetStreamSendBufferAllocator(), data.size()); + memcpy(buffer.get(), data.data(), data.size()); + return QuicMemSlice(std::move(buffer), data.size()); + } + + protected: + MockQuicConnectionHelper helper_; + MockAlarmFactory alarm_factory_; + MockQuicConnection* connection_; // Owned by |session_|. + MockQuicSession session_; + QuicDatagramQueue queue_; +}; + +TEST_F(QuicDatagramQueueTest, SendDatagramImmediately) { + EXPECT_CALL(*connection_, SendMessage(_, _, _)) + .WillOnce(Return(MESSAGE_STATUS_SUCCESS)); + MessageStatus status = queue_.SendOrQueueDatagram(CreateMemSlice("test")); + EXPECT_EQ(MESSAGE_STATUS_SUCCESS, status); + EXPECT_EQ(0u, queue_.queue_size()); +} + +TEST_F(QuicDatagramQueueTest, SendDatagramAfterBuffering) { + EXPECT_CALL(*connection_, SendMessage(_, _, _)) + .WillOnce(Return(MESSAGE_STATUS_BLOCKED)); + MessageStatus initial_status = + queue_.SendOrQueueDatagram(CreateMemSlice("test")); + EXPECT_EQ(MESSAGE_STATUS_BLOCKED, initial_status); + EXPECT_EQ(1u, queue_.queue_size()); + + // Verify getting write blocked does not remove the datagram from the queue. + EXPECT_CALL(*connection_, SendMessage(_, _, _)) + .WillOnce(Return(MESSAGE_STATUS_BLOCKED)); + QuicOptional<MessageStatus> status = queue_.TrySendingNextDatagram(); + ASSERT_TRUE(status.has_value()); + EXPECT_EQ(MESSAGE_STATUS_BLOCKED, *status); + EXPECT_EQ(1u, queue_.queue_size()); + + EXPECT_CALL(*connection_, SendMessage(_, _, _)) + .WillOnce(Return(MESSAGE_STATUS_SUCCESS)); + status = queue_.TrySendingNextDatagram(); + ASSERT_TRUE(status.has_value()); + EXPECT_EQ(MESSAGE_STATUS_SUCCESS, *status); + EXPECT_EQ(0u, queue_.queue_size()); +} + +TEST_F(QuicDatagramQueueTest, EmptyBuffer) { + QuicOptional<MessageStatus> status = queue_.TrySendingNextDatagram(); + EXPECT_FALSE(status.has_value()); + + size_t num_messages = queue_.SendDatagrams(); + EXPECT_EQ(0u, num_messages); +} + +TEST_F(QuicDatagramQueueTest, MultipleDatagrams) { + // Note that SendMessage() is called only once here, since all the remaining + // messages are automatically queued due to the queue being non-empty. + EXPECT_CALL(*connection_, SendMessage(_, _, _)) + .WillOnce(Return(MESSAGE_STATUS_BLOCKED)); + queue_.SendOrQueueDatagram(CreateMemSlice("a")); + queue_.SendOrQueueDatagram(CreateMemSlice("b")); + queue_.SendOrQueueDatagram(CreateMemSlice("c")); + queue_.SendOrQueueDatagram(CreateMemSlice("d")); + queue_.SendOrQueueDatagram(CreateMemSlice("e")); + + EXPECT_CALL(*connection_, SendMessage(_, _, _)) + .Times(5) + .WillRepeatedly(Return(MESSAGE_STATUS_SUCCESS)); + size_t num_messages = queue_.SendDatagrams(); + EXPECT_EQ(5u, num_messages); +} + +TEST_F(QuicDatagramQueueTest, DefaultMaxTimeInQueue) { + EXPECT_EQ(QuicTime::Delta::Zero(), + connection_->sent_packet_manager().GetRttStats()->min_rtt()); + EXPECT_EQ(QuicTime::Delta::FromMilliseconds(4), queue_.GetMaxTimeInQueue()); + + RttStats* stats = + const_cast<RttStats*>(connection_->sent_packet_manager().GetRttStats()); + stats->UpdateRtt(QuicTime::Delta::FromMilliseconds(100), + QuicTime::Delta::Zero(), helper_.GetClock()->Now()); + EXPECT_EQ(QuicTime::Delta::FromMilliseconds(125), queue_.GetMaxTimeInQueue()); +} + +TEST_F(QuicDatagramQueueTest, Expiry) { + constexpr QuicTime::Delta expiry = QuicTime::Delta::FromMilliseconds(100); + queue_.SetMaxTimeInQueue(expiry); + + EXPECT_CALL(*connection_, SendMessage(_, _, _)) + .WillOnce(Return(MESSAGE_STATUS_BLOCKED)); + queue_.SendOrQueueDatagram(CreateMemSlice("a")); + helper_.AdvanceTime(0.6 * expiry); + queue_.SendOrQueueDatagram(CreateMemSlice("b")); + helper_.AdvanceTime(0.6 * expiry); + queue_.SendOrQueueDatagram(CreateMemSlice("c")); + + std::vector<std::string> messages; + EXPECT_CALL(*connection_, SendMessage(_, _, _)) + .WillRepeatedly([&messages](QuicMessageId /*id*/, + QuicMemSliceSpan message, bool /*flush*/) { + messages.push_back(std::string(message.GetData(0))); + return MESSAGE_STATUS_SUCCESS; + }); + EXPECT_EQ(2u, queue_.SendDatagrams()); + EXPECT_THAT(messages, ElementsAre("b", "c")); +} + +TEST_F(QuicDatagramQueueTest, ExpireAll) { + constexpr QuicTime::Delta expiry = QuicTime::Delta::FromMilliseconds(100); + queue_.SetMaxTimeInQueue(expiry); + + EXPECT_CALL(*connection_, SendMessage(_, _, _)) + .WillOnce(Return(MESSAGE_STATUS_BLOCKED)); + queue_.SendOrQueueDatagram(CreateMemSlice("a")); + queue_.SendOrQueueDatagram(CreateMemSlice("b")); + queue_.SendOrQueueDatagram(CreateMemSlice("c")); + + helper_.AdvanceTime(100 * expiry); + EXPECT_CALL(*connection_, SendMessage(_, _, _)).Times(0); + EXPECT_EQ(0u, queue_.SendDatagrams()); +} + +} // namespace +} // namespace test +} // namespace quic