Support datagrams in QuicTransport.
gfe-relnote: n/a (no functional change to production code)
PiperOrigin-RevId: 288493541
Change-Id: I6d2a791ae062701ad4b9fb650ab267566373675f
diff --git a/quic/core/quic_datagram_queue.cc b/quic/core/quic_datagram_queue.cc
index d9ed53f..073ee10 100644
--- a/quic/core/quic_datagram_queue.cc
+++ b/quic/core/quic_datagram_queue.cc
@@ -5,6 +5,7 @@
#include "net/third_party/quiche/src/quic/core/quic_datagram_queue.h"
#include "net/third_party/quiche/src/quic/core/quic_constants.h"
+#include "net/third_party/quiche/src/quic/core/quic_session.h"
#include "net/third_party/quiche/src/quic/core/quic_time.h"
#include "net/third_party/quiche/src/quic/core/quic_types.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice_span.h"
diff --git a/quic/core/quic_datagram_queue.h b/quic/core/quic_datagram_queue.h
index 2cb8576..712a56e 100644
--- a/quic/core/quic_datagram_queue.h
+++ b/quic/core/quic_datagram_queue.h
@@ -5,13 +5,16 @@
#ifndef QUICHE_QUIC_CORE_QUIC_DATAGRAM_QUEUE_H_
#define QUICHE_QUIC_CORE_QUIC_DATAGRAM_QUEUE_H_
-#include "net/third_party/quiche/src/quic/core/quic_session.h"
#include "net/third_party/quiche/src/quic/core/quic_time.h"
+#include "net/third_party/quiche/src/quic/core/quic_types.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_containers.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_optional.h"
namespace quic {
+class QuicSession;
+
// Provides a way to buffer QUIC datagrams (messages) in case they cannot
// be sent due to congestion control. Datagrams are buffered for a limited
// amount of time, and deleted after that time passes.
@@ -43,6 +46,8 @@
size_t queue_size() { return queue_.size(); }
+ bool empty() { return queue_.empty(); }
+
private:
struct QUIC_EXPORT_PRIVATE Datagram {
QuicMemSlice datagram;
diff --git a/quic/core/quic_session.cc b/quic/core/quic_session.cc
index c5fe37b..55ce02d 100644
--- a/quic/core/quic_session.cc
+++ b/quic/core/quic_session.cc
@@ -94,6 +94,7 @@
goaway_received_(false),
control_frame_manager_(this),
last_message_id_(0),
+ datagram_queue_(this),
closed_streams_clean_up_alarm_(nullptr),
supported_versions_(supported_versions),
use_http2_priority_write_scheduler_(false),
@@ -571,6 +572,7 @@
? write_blocked_streams_.NumBlockedSpecialStreams()
: write_blocked_streams_.NumBlockedStreams();
if (num_writes == 0 && !control_frame_manager_.WillingToWrite() &&
+ datagram_queue_.empty() &&
(!QuicVersionUsesCryptoFrames(transport_version()) ||
!GetCryptoStream()->HasBufferedCryptoFrames())) {
return;
@@ -591,6 +593,15 @@
if (control_frame_manager_.WillingToWrite()) {
control_frame_manager_.OnCanWrite();
}
+ // TODO(b/147146815): this makes all datagrams go before stream data. We
+ // should have a better priority scheme for this.
+ if (!datagram_queue_.empty()) {
+ size_t written = datagram_queue_.SendDatagrams();
+ QUIC_DVLOG(1) << ENDPOINT << "Sent " << written << " datagrams";
+ if (!datagram_queue_.empty()) {
+ return;
+ }
+ }
for (size_t i = 0; i < num_writes; ++i) {
if (!(write_blocked_streams_.HasWriteBlockedSpecialStream() ||
write_blocked_streams_.HasWriteBlockedDataStreams())) {
diff --git a/quic/core/quic_session.h b/quic/core/quic_session.h
index b818737..f10b548 100644
--- a/quic/core/quic_session.h
+++ b/quic/core/quic_session.h
@@ -19,6 +19,7 @@
#include "net/third_party/quiche/src/quic/core/quic_connection.h"
#include "net/third_party/quiche/src/quic/core/quic_control_frame_manager.h"
#include "net/third_party/quiche/src/quic/core/quic_crypto_stream.h"
+#include "net/third_party/quiche/src/quic/core/quic_datagram_queue.h"
#include "net/third_party/quiche/src/quic/core/quic_error_codes.h"
#include "net/third_party/quiche/src/quic/core/quic_packet_creator.h"
#include "net/third_party/quiche/src/quic/core/quic_packets.h"
@@ -616,6 +617,8 @@
return stream_id_manager_;
}
+ QuicDatagramQueue* datagram_queue() { return &datagram_queue_; }
+
// Processes the stream type information of |pending| depending on
// different kinds of sessions' own rules. Returns true if the pending stream
// is converted into a normal stream.
@@ -788,6 +791,9 @@
// Id of latest successfully sent message.
QuicMessageId last_message_id_;
+ // The buffer used to queue the DATAGRAM frames.
+ QuicDatagramQueue datagram_queue_;
+
// TODO(fayang): switch to linked_hash_set when chromium supports it. The bool
// is not used here.
// List of streams with pending retransmissions.
diff --git a/quic/quic_transport/quic_transport_client_session.cc b/quic/quic_transport/quic_transport_client_session.cc
index 7bc0473..d1f6e4c 100644
--- a/quic/quic_transport/quic_transport_client_session.cc
+++ b/quic/quic_transport/quic_transport_client_session.cc
@@ -7,9 +7,11 @@
#include <cstdint>
#include <limits>
#include <memory>
+#include <string>
#include <utility>
#include "url/gurl.h"
+#include "net/third_party/quiche/src/quic/core/quic_constants.h"
#include "net/third_party/quiche/src/quic/core/quic_crypto_client_stream.h"
#include "net/third_party/quiche/src/quic/core/quic_data_writer.h"
#include "net/third_party/quiche/src/quic/core/quic_error_codes.h"
@@ -18,6 +20,7 @@
#include "net/third_party/quiche/src/quic/core/quic_versions.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_bug_tracker.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_logging.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_optional.h"
#include "net/third_party/quiche/src/quic/quic_transport/quic_transport_protocol.h"
#include "net/third_party/quiche/src/quic/quic_transport/quic_transport_stream.h"
#include "net/third_party/quiche/src/common/platform/api/quiche_string_piece.h"
@@ -35,6 +38,9 @@
void OnProofVerifyDetailsAvailable(
const ProofVerifyDetails& /*verify_details*/) override {}
};
+
+constexpr float kIncomingDatagramBufferSizeInCwnds = 2;
+
} // namespace
QuicTransportClientSession::QuicTransportClientSession(
@@ -255,4 +261,27 @@
visitor_->OnSessionReady();
}
+void QuicTransportClientSession::OnMessageReceived(
+ quiche::QuicheStringPiece message) {
+ max_incoming_datagrams_ = std::max<size_t>(
+ max_incoming_datagrams_,
+ kIncomingDatagramBufferSizeInCwnds *
+ connection()->sent_packet_manager().GetCongestionWindowInBytes());
+ if (incoming_datagrams_.size() >= max_incoming_datagrams_) {
+ return;
+ }
+
+ incoming_datagrams_.push_back(std::string(message));
+ visitor_->OnIncomingDatagramAvailable();
+}
+
+QuicOptional<std::string> QuicTransportClientSession::ReadDatagram() {
+ if (incoming_datagrams_.empty()) {
+ return QuicOptional<std::string>();
+ }
+ std::string datagram = std::move(incoming_datagrams_.front());
+ incoming_datagrams_.pop_front();
+ return datagram;
+}
+
} // namespace quic
diff --git a/quic/quic_transport/quic_transport_client_session.h b/quic/quic_transport/quic_transport_client_session.h
index d9c794d..7566964 100644
--- a/quic/quic_transport/quic_transport_client_session.h
+++ b/quic/quic_transport/quic_transport_client_session.h
@@ -21,6 +21,7 @@
#include "net/third_party/quiche/src/quic/core/quic_versions.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_bug_tracker.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_containers.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_optional.h"
#include "net/third_party/quiche/src/quic/quic_transport/quic_transport_protocol.h"
#include "net/third_party/quiche/src/quic/quic_transport/quic_transport_session_interface.h"
#include "net/third_party/quiche/src/quic/quic_transport/quic_transport_stream.h"
@@ -46,6 +47,9 @@
// AcceptIncomingUnidirectionalStream().
virtual void OnIncomingBidirectionalStreamAvailable() = 0;
virtual void OnIncomingUnidirectionalStreamAvailable() = 0;
+
+ // Notifies the visitor when a new datagram has been received.
+ virtual void OnIncomingDatagramAvailable() = 0;
};
QuicTransportClientSession(QuicConnection* connection,
@@ -89,6 +93,7 @@
void OnCryptoHandshakeEvent(CryptoHandshakeEvent event) override;
void SetDefaultEncryptionLevel(EncryptionLevel level) override;
+ void OnMessageReceived(quiche::QuicheStringPiece message) override;
// Return the earliest incoming stream that has been received by the session
// but has not been accepted. Returns nullptr if there are no incoming
@@ -101,6 +106,9 @@
QuicTransportStream* OpenOutgoingBidirectionalStream();
QuicTransportStream* OpenOutgoingUnidirectionalStream();
+ using QuicSession::datagram_queue;
+ QuicOptional<std::string> ReadDatagram();
+
protected:
class QUIC_EXPORT_PRIVATE ClientIndication : public QuicStream {
public:
@@ -140,6 +148,11 @@
// before sending MAX_STREAMS.
QuicDeque<QuicTransportStream*> incoming_bidirectional_streams_;
QuicDeque<QuicTransportStream*> incoming_unidirectional_streams_;
+
+ // Buffer for the incoming datagrams.
+ QuicDeque<std::string> incoming_datagrams_;
+ // Maximum size of the |incoming_datagrams_| buffer. Can only go up.
+ size_t max_incoming_datagrams_ = 128;
};
} // namespace quic
diff --git a/quic/quic_transport/quic_transport_client_session_test.cc b/quic/quic_transport/quic_transport_client_session_test.cc
index 613ef17..3e08bf5 100644
--- a/quic/quic_transport/quic_transport_client_session_test.cc
+++ b/quic/quic_transport/quic_transport_client_session_test.cc
@@ -168,6 +168,17 @@
EXPECT_EQ(stream->id(), id);
}
+TEST_F(QuicTransportClientSessionTest, ReceiveDatagram) {
+ QuicOptional<std::string> datagram = session_->ReadDatagram();
+ EXPECT_FALSE(datagram.has_value());
+
+ EXPECT_CALL(visitor_, OnIncomingDatagramAvailable());
+ session_->OnMessageReceived("test");
+ datagram = session_->ReadDatagram();
+ ASSERT_TRUE(datagram.has_value());
+ EXPECT_EQ("test", *datagram);
+}
+
} // namespace
} // namespace test
} // namespace quic
diff --git a/quic/quic_transport/quic_transport_integration_test.cc b/quic/quic_transport/quic_transport_integration_test.cc
index f5c804d..e0a1193 100644
--- a/quic/quic_transport/quic_transport_integration_test.cc
+++ b/quic/quic_transport/quic_transport_integration_test.cc
@@ -12,11 +12,13 @@
#include "url/origin.h"
#include "net/third_party/quiche/src/quic/core/crypto/quic_crypto_client_config.h"
#include "net/third_party/quiche/src/quic/core/crypto/quic_crypto_server_config.h"
+#include "net/third_party/quiche/src/quic/core/quic_buffer_allocator.h"
#include "net/third_party/quiche/src/quic/core/quic_connection.h"
#include "net/third_party/quiche/src/quic/core/quic_error_codes.h"
#include "net/third_party/quiche/src/quic/core/quic_types.h"
#include "net/third_party/quiche/src/quic/core/quic_versions.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_flags.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_optional.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_test.h"
#include "net/third_party/quiche/src/quic/quic_transport/quic_transport_client_session.h"
#include "net/third_party/quiche/src/quic/quic_transport/quic_transport_server_session.h"
@@ -324,6 +326,58 @@
EXPECT_EQ(buffer, "Stream One");
}
+TEST_F(QuicTransportIntegrationTest, EchoDatagram) {
+ CreateDefaultEndpoints("/echo");
+ WireUpEndpoints();
+ RunHandshake();
+
+ client_->session()->datagram_queue()->SendOrQueueDatagram(
+ MemSliceFromString("test"));
+
+ bool datagram_received = false;
+ EXPECT_CALL(*client_->visitor(), OnIncomingDatagramAvailable())
+ .WillOnce(Assign(&datagram_received, true));
+ ASSERT_TRUE(simulator_.RunUntilOrTimeout(
+ [&datagram_received]() { return datagram_received; }, kDefaultTimeout));
+
+ QuicOptional<std::string> datagram = client_->session()->ReadDatagram();
+ ASSERT_TRUE(datagram.has_value());
+ EXPECT_EQ("test", *datagram);
+}
+
+// This test sets the datagram queue to an nearly-infinte queueing time, and
+// then sends 1000 datagrams. We expect to receive most of them back, since the
+// datagrams would be paced out by the congestion controller.
+TEST_F(QuicTransportIntegrationTest, EchoALotOfDatagrams) {
+ CreateDefaultEndpoints("/echo");
+ WireUpEndpoints();
+ RunHandshake();
+
+ // Set the datagrams to effectively never expire.
+ client_->session()->datagram_queue()->SetMaxTimeInQueue(10000 * kRtt);
+ for (int i = 0; i < 1000; i++) {
+ client_->session()->datagram_queue()->SendOrQueueDatagram(
+ MemSliceFromString(std::string(
+ client_->session()->GetGuaranteedLargestMessagePayload(), 'a')));
+ }
+
+ size_t received = 0;
+ EXPECT_CALL(*client_->visitor(), OnIncomingDatagramAvailable())
+ .WillRepeatedly([this, &received]() {
+ while (client_->session()->ReadDatagram().has_value()) {
+ received++;
+ }
+ });
+ ASSERT_TRUE(simulator_.RunUntilOrTimeout(
+ [this]() { return client_->session()->datagram_queue()->empty(); },
+ 3 * kServerBandwidth.TransferTime(1000 * kMaxOutgoingPacketSize)));
+ // Allow extra round-trips for the final flight of datagrams to arrive back.
+ simulator_.RunFor(2 * kRtt);
+
+ EXPECT_GT(received, 500u);
+ EXPECT_LT(received, 1000u);
+}
+
} // namespace
} // namespace test
} // namespace quic
diff --git a/quic/test_tools/quic_test_utils.cc b/quic/test_tools/quic_test_utils.cc
index ca89a55..edef5dd 100644
--- a/quic/test_tools/quic_test_utils.cc
+++ b/quic/test_tools/quic_test_utils.cc
@@ -17,9 +17,11 @@
#include "net/third_party/quiche/src/quic/core/crypto/null_encrypter.h"
#include "net/third_party/quiche/src/quic/core/crypto/quic_decrypter.h"
#include "net/third_party/quiche/src/quic/core/crypto/quic_encrypter.h"
+#include "net/third_party/quiche/src/quic/core/quic_buffer_allocator.h"
#include "net/third_party/quiche/src/quic/core/quic_data_writer.h"
#include "net/third_party/quiche/src/quic/core/quic_framer.h"
#include "net/third_party/quiche/src/quic/core/quic_packet_creator.h"
+#include "net/third_party/quiche/src/quic/core/quic_simple_buffer_allocator.h"
#include "net/third_party/quiche/src/quic/core/quic_utils.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_flags.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_logging.h"
@@ -1264,5 +1266,12 @@
return storage->ToSpan();
}
+QuicMemSlice MemSliceFromString(quiche::QuicheStringPiece data) {
+ static SimpleBufferAllocator* allocator = new SimpleBufferAllocator();
+ QuicUniqueBufferPtr buffer = MakeUniqueBuffer(allocator, data.size());
+ memcpy(buffer.get(), data.data(), data.size());
+ return QuicMemSlice(std::move(buffer), data.size());
+}
+
} // namespace test
} // namespace quic
diff --git a/quic/test_tools/quic_test_utils.h b/quic/test_tools/quic_test_utils.h
index faa505d..15d8111 100644
--- a/quic/test_tools/quic_test_utils.h
+++ b/quic/test_tools/quic_test_utils.h
@@ -1251,6 +1251,10 @@
quiche::QuicheStringPiece message_data,
QuicMemSliceStorage* storage);
+// Creates a MemSlice using a singleton trivial buffer allocator. Performs a
+// copy.
+QuicMemSlice MemSliceFromString(quiche::QuicheStringPiece data);
+
// Used to compare ReceivedPacketInfo.
MATCHER_P(ReceivedPacketInfoEquals, info, "") {
return info.ToString() == arg.ToString();
diff --git a/quic/test_tools/quic_transport_test_tools.h b/quic/test_tools/quic_transport_test_tools.h
index 9c5cef4..3d521ef 100644
--- a/quic/test_tools/quic_transport_test_tools.h
+++ b/quic/test_tools/quic_transport_test_tools.h
@@ -17,6 +17,7 @@
MOCK_METHOD0(OnSessionReady, void());
MOCK_METHOD0(OnIncomingBidirectionalStreamAvailable, void());
MOCK_METHOD0(OnIncomingUnidirectionalStreamAvailable, void());
+ MOCK_METHOD0(OnIncomingDatagramAvailable, void());
};
class MockServerVisitor : public QuicTransportServerSession::ServerVisitor {
diff --git a/quic/tools/quic_transport_simple_server_session.cc b/quic/tools/quic_transport_simple_server_session.cc
index 9fe3dfd..49c86e5 100644
--- a/quic/tools/quic_transport_simple_server_session.cc
+++ b/quic/tools/quic_transport_simple_server_session.cc
@@ -8,6 +8,7 @@
#include "url/gurl.h"
#include "url/origin.h"
+#include "net/third_party/quiche/src/quic/core/quic_buffer_allocator.h"
#include "net/third_party/quiche/src/quic/core/quic_types.h"
#include "net/third_party/quiche/src/quic/core/quic_versions.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_flags.h"
@@ -212,6 +213,18 @@
return false;
}
+void QuicTransportSimpleServerSession::OnMessageReceived(
+ quiche::QuicheStringPiece message) {
+ if (mode_ != ECHO) {
+ return;
+ }
+ QuicUniqueBufferPtr buffer = MakeUniqueBuffer(
+ connection()->helper()->GetStreamSendBufferAllocator(), message.size());
+ memcpy(buffer.get(), message.data(), message.size());
+ datagram_queue()->SendOrQueueDatagram(
+ QuicMemSlice(std::move(buffer), message.size()));
+}
+
void QuicTransportSimpleServerSession::MaybeEchoStreamsBack() {
while (!streams_to_echo_back_.empty() &&
CanOpenNextOutgoingUnidirectionalStream()) {
diff --git a/quic/tools/quic_transport_simple_server_session.h b/quic/tools/quic_transport_simple_server_session.h
index a2dd6be..ccdf28b 100644
--- a/quic/tools/quic_transport_simple_server_session.h
+++ b/quic/tools/quic_transport_simple_server_session.h
@@ -51,6 +51,7 @@
void OnCanCreateNewOutgoingStream(bool unidirectional) override;
bool CheckOrigin(url::Origin origin) override;
bool ProcessPath(const GURL& url) override;
+ void OnMessageReceived(quiche::QuicheStringPiece message) override;
void EchoStreamBack(const std::string& data) {
streams_to_echo_back_.push_back(data);