Support datagrams in QuicTransport. gfe-relnote: n/a (no functional change to production code) PiperOrigin-RevId: 288493541 Change-Id: I6d2a791ae062701ad4b9fb650ab267566373675f
diff --git a/quic/core/quic_datagram_queue.cc b/quic/core/quic_datagram_queue.cc index d9ed53f..073ee10 100644 --- a/quic/core/quic_datagram_queue.cc +++ b/quic/core/quic_datagram_queue.cc
@@ -5,6 +5,7 @@ #include "net/third_party/quiche/src/quic/core/quic_datagram_queue.h" #include "net/third_party/quiche/src/quic/core/quic_constants.h" +#include "net/third_party/quiche/src/quic/core/quic_session.h" #include "net/third_party/quiche/src/quic/core/quic_time.h" #include "net/third_party/quiche/src/quic/core/quic_types.h" #include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice_span.h"
diff --git a/quic/core/quic_datagram_queue.h b/quic/core/quic_datagram_queue.h index 2cb8576..712a56e 100644 --- a/quic/core/quic_datagram_queue.h +++ b/quic/core/quic_datagram_queue.h
@@ -5,13 +5,16 @@ #ifndef QUICHE_QUIC_CORE_QUIC_DATAGRAM_QUEUE_H_ #define QUICHE_QUIC_CORE_QUIC_DATAGRAM_QUEUE_H_ -#include "net/third_party/quiche/src/quic/core/quic_session.h" #include "net/third_party/quiche/src/quic/core/quic_time.h" +#include "net/third_party/quiche/src/quic/core/quic_types.h" #include "net/third_party/quiche/src/quic/platform/api/quic_containers.h" #include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_optional.h" namespace quic { +class QuicSession; + // Provides a way to buffer QUIC datagrams (messages) in case they cannot // be sent due to congestion control. Datagrams are buffered for a limited // amount of time, and deleted after that time passes. @@ -43,6 +46,8 @@ size_t queue_size() { return queue_.size(); } + bool empty() { return queue_.empty(); } + private: struct QUIC_EXPORT_PRIVATE Datagram { QuicMemSlice datagram;
diff --git a/quic/core/quic_session.cc b/quic/core/quic_session.cc index c5fe37b..55ce02d 100644 --- a/quic/core/quic_session.cc +++ b/quic/core/quic_session.cc
@@ -94,6 +94,7 @@ goaway_received_(false), control_frame_manager_(this), last_message_id_(0), + datagram_queue_(this), closed_streams_clean_up_alarm_(nullptr), supported_versions_(supported_versions), use_http2_priority_write_scheduler_(false), @@ -571,6 +572,7 @@ ? write_blocked_streams_.NumBlockedSpecialStreams() : write_blocked_streams_.NumBlockedStreams(); if (num_writes == 0 && !control_frame_manager_.WillingToWrite() && + datagram_queue_.empty() && (!QuicVersionUsesCryptoFrames(transport_version()) || !GetCryptoStream()->HasBufferedCryptoFrames())) { return; @@ -591,6 +593,15 @@ if (control_frame_manager_.WillingToWrite()) { control_frame_manager_.OnCanWrite(); } + // TODO(b/147146815): this makes all datagrams go before stream data. We + // should have a better priority scheme for this. + if (!datagram_queue_.empty()) { + size_t written = datagram_queue_.SendDatagrams(); + QUIC_DVLOG(1) << ENDPOINT << "Sent " << written << " datagrams"; + if (!datagram_queue_.empty()) { + return; + } + } for (size_t i = 0; i < num_writes; ++i) { if (!(write_blocked_streams_.HasWriteBlockedSpecialStream() || write_blocked_streams_.HasWriteBlockedDataStreams())) {
diff --git a/quic/core/quic_session.h b/quic/core/quic_session.h index b818737..f10b548 100644 --- a/quic/core/quic_session.h +++ b/quic/core/quic_session.h
@@ -19,6 +19,7 @@ #include "net/third_party/quiche/src/quic/core/quic_connection.h" #include "net/third_party/quiche/src/quic/core/quic_control_frame_manager.h" #include "net/third_party/quiche/src/quic/core/quic_crypto_stream.h" +#include "net/third_party/quiche/src/quic/core/quic_datagram_queue.h" #include "net/third_party/quiche/src/quic/core/quic_error_codes.h" #include "net/third_party/quiche/src/quic/core/quic_packet_creator.h" #include "net/third_party/quiche/src/quic/core/quic_packets.h" @@ -616,6 +617,8 @@ return stream_id_manager_; } + QuicDatagramQueue* datagram_queue() { return &datagram_queue_; } + // Processes the stream type information of |pending| depending on // different kinds of sessions' own rules. Returns true if the pending stream // is converted into a normal stream. @@ -788,6 +791,9 @@ // Id of latest successfully sent message. QuicMessageId last_message_id_; + // The buffer used to queue the DATAGRAM frames. + QuicDatagramQueue datagram_queue_; + // TODO(fayang): switch to linked_hash_set when chromium supports it. The bool // is not used here. // List of streams with pending retransmissions.
diff --git a/quic/quic_transport/quic_transport_client_session.cc b/quic/quic_transport/quic_transport_client_session.cc index 7bc0473..d1f6e4c 100644 --- a/quic/quic_transport/quic_transport_client_session.cc +++ b/quic/quic_transport/quic_transport_client_session.cc
@@ -7,9 +7,11 @@ #include <cstdint> #include <limits> #include <memory> +#include <string> #include <utility> #include "url/gurl.h" +#include "net/third_party/quiche/src/quic/core/quic_constants.h" #include "net/third_party/quiche/src/quic/core/quic_crypto_client_stream.h" #include "net/third_party/quiche/src/quic/core/quic_data_writer.h" #include "net/third_party/quiche/src/quic/core/quic_error_codes.h" @@ -18,6 +20,7 @@ #include "net/third_party/quiche/src/quic/core/quic_versions.h" #include "net/third_party/quiche/src/quic/platform/api/quic_bug_tracker.h" #include "net/third_party/quiche/src/quic/platform/api/quic_logging.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_optional.h" #include "net/third_party/quiche/src/quic/quic_transport/quic_transport_protocol.h" #include "net/third_party/quiche/src/quic/quic_transport/quic_transport_stream.h" #include "net/third_party/quiche/src/common/platform/api/quiche_string_piece.h" @@ -35,6 +38,9 @@ void OnProofVerifyDetailsAvailable( const ProofVerifyDetails& /*verify_details*/) override {} }; + +constexpr float kIncomingDatagramBufferSizeInCwnds = 2; + } // namespace QuicTransportClientSession::QuicTransportClientSession( @@ -255,4 +261,29 @@ visitor_->OnSessionReady(); } +void QuicTransportClientSession::OnMessageReceived( + quiche::QuicheStringPiece message) { + // |max_incoming_datagrams_| counts datagrams, so grow it from the congestion + // window measured in packets (TCP MSS units), not in bytes. + max_incoming_datagrams_ = std::max<size_t>( + max_incoming_datagrams_, + kIncomingDatagramBufferSizeInCwnds * + connection()->sent_packet_manager().GetCongestionWindowInTcpMss()); + if (incoming_datagrams_.size() >= max_incoming_datagrams_) { + return; + } + + incoming_datagrams_.push_back(std::string(message)); + visitor_->OnIncomingDatagramAvailable(); +} + +QuicOptional<std::string> QuicTransportClientSession::ReadDatagram() { + if (incoming_datagrams_.empty()) { + return QuicOptional<std::string>(); + } + std::string datagram = std::move(incoming_datagrams_.front()); + incoming_datagrams_.pop_front(); + return datagram; +} + } // namespace 
quic
diff --git a/quic/quic_transport/quic_transport_client_session.h b/quic/quic_transport/quic_transport_client_session.h index d9c794d..7566964 100644 --- a/quic/quic_transport/quic_transport_client_session.h +++ b/quic/quic_transport/quic_transport_client_session.h
@@ -21,6 +21,7 @@ #include "net/third_party/quiche/src/quic/core/quic_versions.h" #include "net/third_party/quiche/src/quic/platform/api/quic_bug_tracker.h" #include "net/third_party/quiche/src/quic/platform/api/quic_containers.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_optional.h" #include "net/third_party/quiche/src/quic/quic_transport/quic_transport_protocol.h" #include "net/third_party/quiche/src/quic/quic_transport/quic_transport_session_interface.h" #include "net/third_party/quiche/src/quic/quic_transport/quic_transport_stream.h" @@ -46,6 +47,9 @@ // AcceptIncomingUnidirectionalStream(). virtual void OnIncomingBidirectionalStreamAvailable() = 0; virtual void OnIncomingUnidirectionalStreamAvailable() = 0; + + // Notifies the visitor when a new datagram has been received. + virtual void OnIncomingDatagramAvailable() = 0; }; QuicTransportClientSession(QuicConnection* connection, @@ -89,6 +93,7 @@ void OnCryptoHandshakeEvent(CryptoHandshakeEvent event) override; void SetDefaultEncryptionLevel(EncryptionLevel level) override; + void OnMessageReceived(quiche::QuicheStringPiece message) override; // Return the earliest incoming stream that has been received by the session // but has not been accepted. Returns nullptr if there are no incoming @@ -101,6 +106,9 @@ QuicTransportStream* OpenOutgoingBidirectionalStream(); QuicTransportStream* OpenOutgoingUnidirectionalStream(); + using QuicSession::datagram_queue; + QuicOptional<std::string> ReadDatagram(); + protected: class QUIC_EXPORT_PRIVATE ClientIndication : public QuicStream { public: @@ -140,6 +148,11 @@ // before sending MAX_STREAMS. QuicDeque<QuicTransportStream*> incoming_bidirectional_streams_; QuicDeque<QuicTransportStream*> incoming_unidirectional_streams_; + + // Buffer for the incoming datagrams. + QuicDeque<std::string> incoming_datagrams_; + // Maximum size of the |incoming_datagrams_| buffer. Can only go up. + size_t max_incoming_datagrams_ = 128; }; } // namespace quic
diff --git a/quic/quic_transport/quic_transport_client_session_test.cc b/quic/quic_transport/quic_transport_client_session_test.cc index 613ef17..3e08bf5 100644 --- a/quic/quic_transport/quic_transport_client_session_test.cc +++ b/quic/quic_transport/quic_transport_client_session_test.cc
@@ -168,6 +168,17 @@ EXPECT_EQ(stream->id(), id); } +TEST_F(QuicTransportClientSessionTest, ReceiveDatagram) { + QuicOptional<std::string> datagram = session_->ReadDatagram(); + EXPECT_FALSE(datagram.has_value()); + + EXPECT_CALL(visitor_, OnIncomingDatagramAvailable()); + session_->OnMessageReceived("test"); + datagram = session_->ReadDatagram(); + ASSERT_TRUE(datagram.has_value()); + EXPECT_EQ("test", *datagram); +} + } // namespace } // namespace test } // namespace quic
diff --git a/quic/quic_transport/quic_transport_integration_test.cc b/quic/quic_transport/quic_transport_integration_test.cc index f5c804d..e0a1193 100644 --- a/quic/quic_transport/quic_transport_integration_test.cc +++ b/quic/quic_transport/quic_transport_integration_test.cc
@@ -12,11 +12,13 @@ #include "url/origin.h" #include "net/third_party/quiche/src/quic/core/crypto/quic_crypto_client_config.h" #include "net/third_party/quiche/src/quic/core/crypto/quic_crypto_server_config.h" +#include "net/third_party/quiche/src/quic/core/quic_buffer_allocator.h" #include "net/third_party/quiche/src/quic/core/quic_connection.h" #include "net/third_party/quiche/src/quic/core/quic_error_codes.h" #include "net/third_party/quiche/src/quic/core/quic_types.h" #include "net/third_party/quiche/src/quic/core/quic_versions.h" #include "net/third_party/quiche/src/quic/platform/api/quic_flags.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_optional.h" #include "net/third_party/quiche/src/quic/platform/api/quic_test.h" #include "net/third_party/quiche/src/quic/quic_transport/quic_transport_client_session.h" #include "net/third_party/quiche/src/quic/quic_transport/quic_transport_server_session.h" @@ -324,6 +326,58 @@ EXPECT_EQ(buffer, "Stream One"); } +TEST_F(QuicTransportIntegrationTest, EchoDatagram) { + CreateDefaultEndpoints("/echo"); + WireUpEndpoints(); + RunHandshake(); + + client_->session()->datagram_queue()->SendOrQueueDatagram( + MemSliceFromString("test")); + + bool datagram_received = false; + EXPECT_CALL(*client_->visitor(), OnIncomingDatagramAvailable()) + .WillOnce(Assign(&datagram_received, true)); + ASSERT_TRUE(simulator_.RunUntilOrTimeout( + [&datagram_received]() { return datagram_received; }, kDefaultTimeout)); + + QuicOptional<std::string> datagram = client_->session()->ReadDatagram(); + ASSERT_TRUE(datagram.has_value()); + EXPECT_EQ("test", *datagram); +} + +// This test sets the datagram queue to a nearly-infinite queueing time, and +// then sends 1000 datagrams. We expect to receive most of them back, since the +// datagrams would be paced out by the congestion controller. 
+TEST_F(QuicTransportIntegrationTest, EchoALotOfDatagrams) { + CreateDefaultEndpoints("/echo"); + WireUpEndpoints(); + RunHandshake(); + + // Set the datagrams to effectively never expire. + client_->session()->datagram_queue()->SetMaxTimeInQueue(10000 * kRtt); + for (int i = 0; i < 1000; i++) { + client_->session()->datagram_queue()->SendOrQueueDatagram( + MemSliceFromString(std::string( + client_->session()->GetGuaranteedLargestMessagePayload(), 'a'))); + } + + size_t received = 0; + EXPECT_CALL(*client_->visitor(), OnIncomingDatagramAvailable()) + .WillRepeatedly([this, &received]() { + while (client_->session()->ReadDatagram().has_value()) { + received++; + } + }); + ASSERT_TRUE(simulator_.RunUntilOrTimeout( + [this]() { return client_->session()->datagram_queue()->empty(); }, + 3 * kServerBandwidth.TransferTime(1000 * kMaxOutgoingPacketSize))); + // Allow extra round-trips for the final flight of datagrams to arrive back. + simulator_.RunFor(2 * kRtt); + + EXPECT_GT(received, 500u); + EXPECT_LT(received, 1000u); +} + } // namespace } // namespace test } // namespace quic
diff --git a/quic/test_tools/quic_test_utils.cc b/quic/test_tools/quic_test_utils.cc index ca89a55..edef5dd 100644 --- a/quic/test_tools/quic_test_utils.cc +++ b/quic/test_tools/quic_test_utils.cc
@@ -17,9 +17,11 @@ #include "net/third_party/quiche/src/quic/core/crypto/null_encrypter.h" #include "net/third_party/quiche/src/quic/core/crypto/quic_decrypter.h" #include "net/third_party/quiche/src/quic/core/crypto/quic_encrypter.h" +#include "net/third_party/quiche/src/quic/core/quic_buffer_allocator.h" #include "net/third_party/quiche/src/quic/core/quic_data_writer.h" #include "net/third_party/quiche/src/quic/core/quic_framer.h" #include "net/third_party/quiche/src/quic/core/quic_packet_creator.h" +#include "net/third_party/quiche/src/quic/core/quic_simple_buffer_allocator.h" #include "net/third_party/quiche/src/quic/core/quic_utils.h" #include "net/third_party/quiche/src/quic/platform/api/quic_flags.h" #include "net/third_party/quiche/src/quic/platform/api/quic_logging.h" @@ -1264,5 +1266,12 @@ return storage->ToSpan(); } +QuicMemSlice MemSliceFromString(quiche::QuicheStringPiece data) { + static SimpleBufferAllocator* allocator = new SimpleBufferAllocator(); + QuicUniqueBufferPtr buffer = MakeUniqueBuffer(allocator, data.size()); + memcpy(buffer.get(), data.data(), data.size()); + return QuicMemSlice(std::move(buffer), data.size()); +} + } // namespace test } // namespace quic
diff --git a/quic/test_tools/quic_test_utils.h b/quic/test_tools/quic_test_utils.h index faa505d..15d8111 100644 --- a/quic/test_tools/quic_test_utils.h +++ b/quic/test_tools/quic_test_utils.h
@@ -1251,6 +1251,10 @@ quiche::QuicheStringPiece message_data, QuicMemSliceStorage* storage); +// Creates a MemSlice using a singleton trivial buffer allocator. Performs a +// copy. +QuicMemSlice MemSliceFromString(quiche::QuicheStringPiece data); + // Used to compare ReceivedPacketInfo. MATCHER_P(ReceivedPacketInfoEquals, info, "") { return info.ToString() == arg.ToString();
diff --git a/quic/test_tools/quic_transport_test_tools.h b/quic/test_tools/quic_transport_test_tools.h index 9c5cef4..3d521ef 100644 --- a/quic/test_tools/quic_transport_test_tools.h +++ b/quic/test_tools/quic_transport_test_tools.h
@@ -17,6 +17,7 @@ MOCK_METHOD0(OnSessionReady, void()); MOCK_METHOD0(OnIncomingBidirectionalStreamAvailable, void()); MOCK_METHOD0(OnIncomingUnidirectionalStreamAvailable, void()); + MOCK_METHOD0(OnIncomingDatagramAvailable, void()); }; class MockServerVisitor : public QuicTransportServerSession::ServerVisitor {
diff --git a/quic/tools/quic_transport_simple_server_session.cc b/quic/tools/quic_transport_simple_server_session.cc index 9fe3dfd..49c86e5 100644 --- a/quic/tools/quic_transport_simple_server_session.cc +++ b/quic/tools/quic_transport_simple_server_session.cc
@@ -8,6 +8,7 @@ #include "url/gurl.h" #include "url/origin.h" +#include "net/third_party/quiche/src/quic/core/quic_buffer_allocator.h" #include "net/third_party/quiche/src/quic/core/quic_types.h" #include "net/third_party/quiche/src/quic/core/quic_versions.h" #include "net/third_party/quiche/src/quic/platform/api/quic_flags.h" @@ -212,6 +213,18 @@ return false; } +void QuicTransportSimpleServerSession::OnMessageReceived( + quiche::QuicheStringPiece message) { + if (mode_ != ECHO) { + return; + } + QuicUniqueBufferPtr buffer = MakeUniqueBuffer( + connection()->helper()->GetStreamSendBufferAllocator(), message.size()); + memcpy(buffer.get(), message.data(), message.size()); + datagram_queue()->SendOrQueueDatagram( + QuicMemSlice(std::move(buffer), message.size())); +} + void QuicTransportSimpleServerSession::MaybeEchoStreamsBack() { while (!streams_to_echo_back_.empty() && CanOpenNextOutgoingUnidirectionalStream()) {
diff --git a/quic/tools/quic_transport_simple_server_session.h b/quic/tools/quic_transport_simple_server_session.h index a2dd6be..ccdf28b 100644 --- a/quic/tools/quic_transport_simple_server_session.h +++ b/quic/tools/quic_transport_simple_server_session.h
@@ -51,6 +51,7 @@ void OnCanCreateNewOutgoingStream(bool unidirectional) override; bool CheckOrigin(url::Origin origin) override; bool ProcessPath(const GURL& url) override; + void OnMessageReceived(quiche::QuicheStringPiece message) override; void EchoStreamBack(const std::string& data) { streams_to_echo_back_.push_back(data);