Support datagrams in QuicTransport.

gfe-relnote: n/a (no functional change to production code)
PiperOrigin-RevId: 288493541
Change-Id: I6d2a791ae062701ad4b9fb650ab267566373675f
diff --git a/quic/core/quic_datagram_queue.cc b/quic/core/quic_datagram_queue.cc
index d9ed53f..073ee10 100644
--- a/quic/core/quic_datagram_queue.cc
+++ b/quic/core/quic_datagram_queue.cc
@@ -5,6 +5,7 @@
 #include "net/third_party/quiche/src/quic/core/quic_datagram_queue.h"
 
 #include "net/third_party/quiche/src/quic/core/quic_constants.h"
+#include "net/third_party/quiche/src/quic/core/quic_session.h"
 #include "net/third_party/quiche/src/quic/core/quic_time.h"
 #include "net/third_party/quiche/src/quic/core/quic_types.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice_span.h"
diff --git a/quic/core/quic_datagram_queue.h b/quic/core/quic_datagram_queue.h
index 2cb8576..712a56e 100644
--- a/quic/core/quic_datagram_queue.h
+++ b/quic/core/quic_datagram_queue.h
@@ -5,13 +5,16 @@
 #ifndef QUICHE_QUIC_CORE_QUIC_DATAGRAM_QUEUE_H_
 #define QUICHE_QUIC_CORE_QUIC_DATAGRAM_QUEUE_H_
 
-#include "net/third_party/quiche/src/quic/core/quic_session.h"
 #include "net/third_party/quiche/src/quic/core/quic_time.h"
+#include "net/third_party/quiche/src/quic/core/quic_types.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_containers.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_optional.h"
 
 namespace quic {
 
+class QuicSession;
+
 // Provides a way to buffer QUIC datagrams (messages) in case they cannot
 // be sent due to congestion control.  Datagrams are buffered for a limited
 // amount of time, and deleted after that time passes.
@@ -43,6 +46,8 @@
 
   size_t queue_size() { return queue_.size(); }
 
+  bool empty() { return queue_.empty(); }
+
  private:
   struct QUIC_EXPORT_PRIVATE Datagram {
     QuicMemSlice datagram;
diff --git a/quic/core/quic_session.cc b/quic/core/quic_session.cc
index c5fe37b..55ce02d 100644
--- a/quic/core/quic_session.cc
+++ b/quic/core/quic_session.cc
@@ -94,6 +94,7 @@
       goaway_received_(false),
       control_frame_manager_(this),
       last_message_id_(0),
+      datagram_queue_(this),
       closed_streams_clean_up_alarm_(nullptr),
       supported_versions_(supported_versions),
       use_http2_priority_write_scheduler_(false),
@@ -571,6 +572,7 @@
                           ? write_blocked_streams_.NumBlockedSpecialStreams()
                           : write_blocked_streams_.NumBlockedStreams();
   if (num_writes == 0 && !control_frame_manager_.WillingToWrite() &&
+      datagram_queue_.empty() &&
       (!QuicVersionUsesCryptoFrames(transport_version()) ||
        !GetCryptoStream()->HasBufferedCryptoFrames())) {
     return;
@@ -591,6 +593,15 @@
   if (control_frame_manager_.WillingToWrite()) {
     control_frame_manager_.OnCanWrite();
   }
+  // TODO(b/147146815): this makes all datagrams go before stream data.  We
+  // should have a better priority scheme for this.
+  if (!datagram_queue_.empty()) {
+    size_t written = datagram_queue_.SendDatagrams();
+    QUIC_DVLOG(1) << ENDPOINT << "Sent " << written << " datagrams";
+    if (!datagram_queue_.empty()) {
+      return;
+    }
+  }
   for (size_t i = 0; i < num_writes; ++i) {
     if (!(write_blocked_streams_.HasWriteBlockedSpecialStream() ||
           write_blocked_streams_.HasWriteBlockedDataStreams())) {
diff --git a/quic/core/quic_session.h b/quic/core/quic_session.h
index b818737..f10b548 100644
--- a/quic/core/quic_session.h
+++ b/quic/core/quic_session.h
@@ -19,6 +19,7 @@
 #include "net/third_party/quiche/src/quic/core/quic_connection.h"
 #include "net/third_party/quiche/src/quic/core/quic_control_frame_manager.h"
 #include "net/third_party/quiche/src/quic/core/quic_crypto_stream.h"
+#include "net/third_party/quiche/src/quic/core/quic_datagram_queue.h"
 #include "net/third_party/quiche/src/quic/core/quic_error_codes.h"
 #include "net/third_party/quiche/src/quic/core/quic_packet_creator.h"
 #include "net/third_party/quiche/src/quic/core/quic_packets.h"
@@ -616,6 +617,8 @@
     return stream_id_manager_;
   }
 
+  QuicDatagramQueue* datagram_queue() { return &datagram_queue_; }
+
   // Processes the stream type information of |pending| depending on
   // different kinds of sessions' own rules. Returns true if the pending stream
   // is converted into a normal stream.
@@ -788,6 +791,9 @@
   // Id of latest successfully sent message.
   QuicMessageId last_message_id_;
 
+  // The buffer used to queue the DATAGRAM frames.
+  QuicDatagramQueue datagram_queue_;
+
   // TODO(fayang): switch to linked_hash_set when chromium supports it. The bool
   // is not used here.
   // List of streams with pending retransmissions.
diff --git a/quic/quic_transport/quic_transport_client_session.cc b/quic/quic_transport/quic_transport_client_session.cc
index 7bc0473..d1f6e4c 100644
--- a/quic/quic_transport/quic_transport_client_session.cc
+++ b/quic/quic_transport/quic_transport_client_session.cc
@@ -7,9 +7,11 @@
 #include <cstdint>
 #include <limits>
 #include <memory>
+#include <string>
 #include <utility>
 
 #include "url/gurl.h"
+#include "net/third_party/quiche/src/quic/core/quic_constants.h"
 #include "net/third_party/quiche/src/quic/core/quic_crypto_client_stream.h"
 #include "net/third_party/quiche/src/quic/core/quic_data_writer.h"
 #include "net/third_party/quiche/src/quic/core/quic_error_codes.h"
@@ -18,6 +20,7 @@
 #include "net/third_party/quiche/src/quic/core/quic_versions.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_bug_tracker.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_logging.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_optional.h"
 #include "net/third_party/quiche/src/quic/quic_transport/quic_transport_protocol.h"
 #include "net/third_party/quiche/src/quic/quic_transport/quic_transport_stream.h"
 #include "net/third_party/quiche/src/common/platform/api/quiche_string_piece.h"
@@ -35,6 +38,9 @@
   void OnProofVerifyDetailsAvailable(
       const ProofVerifyDetails& /*verify_details*/) override {}
 };
+
+constexpr float kIncomingDatagramBufferSizeInCwnds = 2;
+
 }  // namespace
 
 QuicTransportClientSession::QuicTransportClientSession(
@@ -255,4 +261,27 @@
   visitor_->OnSessionReady();
 }
 
+void QuicTransportClientSession::OnMessageReceived(
+    quiche::QuicheStringPiece message) {
+  max_incoming_datagrams_ = std::max<size_t>(
+      max_incoming_datagrams_,
+      kIncomingDatagramBufferSizeInCwnds *
+          connection()->sent_packet_manager().GetCongestionWindowInBytes());
+  if (incoming_datagrams_.size() >= max_incoming_datagrams_) {
+    return;
+  }
+
+  incoming_datagrams_.push_back(std::string(message));
+  visitor_->OnIncomingDatagramAvailable();
+}
+
+QuicOptional<std::string> QuicTransportClientSession::ReadDatagram() {
+  if (incoming_datagrams_.empty()) {
+    return QuicOptional<std::string>();
+  }
+  std::string datagram = std::move(incoming_datagrams_.front());
+  incoming_datagrams_.pop_front();
+  return datagram;
+}
+
 }  // namespace quic
diff --git a/quic/quic_transport/quic_transport_client_session.h b/quic/quic_transport/quic_transport_client_session.h
index d9c794d..7566964 100644
--- a/quic/quic_transport/quic_transport_client_session.h
+++ b/quic/quic_transport/quic_transport_client_session.h
@@ -21,6 +21,7 @@
 #include "net/third_party/quiche/src/quic/core/quic_versions.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_bug_tracker.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_containers.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_optional.h"
 #include "net/third_party/quiche/src/quic/quic_transport/quic_transport_protocol.h"
 #include "net/third_party/quiche/src/quic/quic_transport/quic_transport_session_interface.h"
 #include "net/third_party/quiche/src/quic/quic_transport/quic_transport_stream.h"
@@ -46,6 +47,9 @@
     // AcceptIncomingUnidirectionalStream().
     virtual void OnIncomingBidirectionalStreamAvailable() = 0;
     virtual void OnIncomingUnidirectionalStreamAvailable() = 0;
+
+    // Notifies the visitor when a new datagram has been received.
+    virtual void OnIncomingDatagramAvailable() = 0;
   };
 
   QuicTransportClientSession(QuicConnection* connection,
@@ -89,6 +93,7 @@
 
   void OnCryptoHandshakeEvent(CryptoHandshakeEvent event) override;
   void SetDefaultEncryptionLevel(EncryptionLevel level) override;
+  void OnMessageReceived(quiche::QuicheStringPiece message) override;
 
   // Return the earliest incoming stream that has been received by the session
   // but has not been accepted.  Returns nullptr if there are no incoming
@@ -101,6 +106,9 @@
   QuicTransportStream* OpenOutgoingBidirectionalStream();
   QuicTransportStream* OpenOutgoingUnidirectionalStream();
 
+  using QuicSession::datagram_queue;
+  QuicOptional<std::string> ReadDatagram();
+
  protected:
   class QUIC_EXPORT_PRIVATE ClientIndication : public QuicStream {
    public:
@@ -140,6 +148,11 @@
   // before sending MAX_STREAMS.
   QuicDeque<QuicTransportStream*> incoming_bidirectional_streams_;
   QuicDeque<QuicTransportStream*> incoming_unidirectional_streams_;
+
+  // Buffer for the incoming datagrams.
+  QuicDeque<std::string> incoming_datagrams_;
+  // Maximum size of the |incoming_datagrams_| buffer.  Can only go up.
+  size_t max_incoming_datagrams_ = 128;
 };
 
 }  // namespace quic
diff --git a/quic/quic_transport/quic_transport_client_session_test.cc b/quic/quic_transport/quic_transport_client_session_test.cc
index 613ef17..3e08bf5 100644
--- a/quic/quic_transport/quic_transport_client_session_test.cc
+++ b/quic/quic_transport/quic_transport_client_session_test.cc
@@ -168,6 +168,17 @@
   EXPECT_EQ(stream->id(), id);
 }
 
+TEST_F(QuicTransportClientSessionTest, ReceiveDatagram) {
+  QuicOptional<std::string> datagram = session_->ReadDatagram();
+  EXPECT_FALSE(datagram.has_value());
+
+  EXPECT_CALL(visitor_, OnIncomingDatagramAvailable());
+  session_->OnMessageReceived("test");
+  datagram = session_->ReadDatagram();
+  ASSERT_TRUE(datagram.has_value());
+  EXPECT_EQ("test", *datagram);
+}
+
 }  // namespace
 }  // namespace test
 }  // namespace quic
diff --git a/quic/quic_transport/quic_transport_integration_test.cc b/quic/quic_transport/quic_transport_integration_test.cc
index f5c804d..e0a1193 100644
--- a/quic/quic_transport/quic_transport_integration_test.cc
+++ b/quic/quic_transport/quic_transport_integration_test.cc
@@ -12,11 +12,13 @@
 #include "url/origin.h"
 #include "net/third_party/quiche/src/quic/core/crypto/quic_crypto_client_config.h"
 #include "net/third_party/quiche/src/quic/core/crypto/quic_crypto_server_config.h"
+#include "net/third_party/quiche/src/quic/core/quic_buffer_allocator.h"
 #include "net/third_party/quiche/src/quic/core/quic_connection.h"
 #include "net/third_party/quiche/src/quic/core/quic_error_codes.h"
 #include "net/third_party/quiche/src/quic/core/quic_types.h"
 #include "net/third_party/quiche/src/quic/core/quic_versions.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_flags.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_optional.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_test.h"
 #include "net/third_party/quiche/src/quic/quic_transport/quic_transport_client_session.h"
 #include "net/third_party/quiche/src/quic/quic_transport/quic_transport_server_session.h"
@@ -324,6 +326,58 @@
   EXPECT_EQ(buffer, "Stream One");
 }
 
+TEST_F(QuicTransportIntegrationTest, EchoDatagram) {
+  CreateDefaultEndpoints("/echo");
+  WireUpEndpoints();
+  RunHandshake();
+
+  client_->session()->datagram_queue()->SendOrQueueDatagram(
+      MemSliceFromString("test"));
+
+  bool datagram_received = false;
+  EXPECT_CALL(*client_->visitor(), OnIncomingDatagramAvailable())
+      .WillOnce(Assign(&datagram_received, true));
+  ASSERT_TRUE(simulator_.RunUntilOrTimeout(
+      [&datagram_received]() { return datagram_received; }, kDefaultTimeout));
+
+  QuicOptional<std::string> datagram = client_->session()->ReadDatagram();
+  ASSERT_TRUE(datagram.has_value());
+  EXPECT_EQ("test", *datagram);
+}
+
+// This test sets the datagram queue to an nearly-infinte queueing time, and
+// then sends 1000 datagrams.  We expect to receive most of them back, since the
+// datagrams would be paced out by the congestion controller.
+TEST_F(QuicTransportIntegrationTest, EchoALotOfDatagrams) {
+  CreateDefaultEndpoints("/echo");
+  WireUpEndpoints();
+  RunHandshake();
+
+  // Set the datagrams to effectively never expire.
+  client_->session()->datagram_queue()->SetMaxTimeInQueue(10000 * kRtt);
+  for (int i = 0; i < 1000; i++) {
+    client_->session()->datagram_queue()->SendOrQueueDatagram(
+        MemSliceFromString(std::string(
+            client_->session()->GetGuaranteedLargestMessagePayload(), 'a')));
+  }
+
+  size_t received = 0;
+  EXPECT_CALL(*client_->visitor(), OnIncomingDatagramAvailable())
+      .WillRepeatedly([this, &received]() {
+        while (client_->session()->ReadDatagram().has_value()) {
+          received++;
+        }
+      });
+  ASSERT_TRUE(simulator_.RunUntilOrTimeout(
+      [this]() { return client_->session()->datagram_queue()->empty(); },
+      3 * kServerBandwidth.TransferTime(1000 * kMaxOutgoingPacketSize)));
+  // Allow extra round-trips for the final flight of datagrams to arrive back.
+  simulator_.RunFor(2 * kRtt);
+
+  EXPECT_GT(received, 500u);
+  EXPECT_LT(received, 1000u);
+}
+
 }  // namespace
 }  // namespace test
 }  // namespace quic
diff --git a/quic/test_tools/quic_test_utils.cc b/quic/test_tools/quic_test_utils.cc
index ca89a55..edef5dd 100644
--- a/quic/test_tools/quic_test_utils.cc
+++ b/quic/test_tools/quic_test_utils.cc
@@ -17,9 +17,11 @@
 #include "net/third_party/quiche/src/quic/core/crypto/null_encrypter.h"
 #include "net/third_party/quiche/src/quic/core/crypto/quic_decrypter.h"
 #include "net/third_party/quiche/src/quic/core/crypto/quic_encrypter.h"
+#include "net/third_party/quiche/src/quic/core/quic_buffer_allocator.h"
 #include "net/third_party/quiche/src/quic/core/quic_data_writer.h"
 #include "net/third_party/quiche/src/quic/core/quic_framer.h"
 #include "net/third_party/quiche/src/quic/core/quic_packet_creator.h"
+#include "net/third_party/quiche/src/quic/core/quic_simple_buffer_allocator.h"
 #include "net/third_party/quiche/src/quic/core/quic_utils.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_flags.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_logging.h"
@@ -1264,5 +1266,12 @@
   return storage->ToSpan();
 }
 
+QuicMemSlice MemSliceFromString(quiche::QuicheStringPiece data) {
+  static SimpleBufferAllocator* allocator = new SimpleBufferAllocator();
+  QuicUniqueBufferPtr buffer = MakeUniqueBuffer(allocator, data.size());
+  memcpy(buffer.get(), data.data(), data.size());
+  return QuicMemSlice(std::move(buffer), data.size());
+}
+
 }  // namespace test
 }  // namespace quic
diff --git a/quic/test_tools/quic_test_utils.h b/quic/test_tools/quic_test_utils.h
index faa505d..15d8111 100644
--- a/quic/test_tools/quic_test_utils.h
+++ b/quic/test_tools/quic_test_utils.h
@@ -1251,6 +1251,10 @@
                           quiche::QuicheStringPiece message_data,
                           QuicMemSliceStorage* storage);
 
+// Creates a MemSlice using a singleton trivial buffer allocator.  Performs a
+// copy.
+QuicMemSlice MemSliceFromString(quiche::QuicheStringPiece data);
+
 // Used to compare ReceivedPacketInfo.
 MATCHER_P(ReceivedPacketInfoEquals, info, "") {
   return info.ToString() == arg.ToString();
diff --git a/quic/test_tools/quic_transport_test_tools.h b/quic/test_tools/quic_transport_test_tools.h
index 9c5cef4..3d521ef 100644
--- a/quic/test_tools/quic_transport_test_tools.h
+++ b/quic/test_tools/quic_transport_test_tools.h
@@ -17,6 +17,7 @@
   MOCK_METHOD0(OnSessionReady, void());
   MOCK_METHOD0(OnIncomingBidirectionalStreamAvailable, void());
   MOCK_METHOD0(OnIncomingUnidirectionalStreamAvailable, void());
+  MOCK_METHOD0(OnIncomingDatagramAvailable, void());
 };
 
 class MockServerVisitor : public QuicTransportServerSession::ServerVisitor {
diff --git a/quic/tools/quic_transport_simple_server_session.cc b/quic/tools/quic_transport_simple_server_session.cc
index 9fe3dfd..49c86e5 100644
--- a/quic/tools/quic_transport_simple_server_session.cc
+++ b/quic/tools/quic_transport_simple_server_session.cc
@@ -8,6 +8,7 @@
 
 #include "url/gurl.h"
 #include "url/origin.h"
+#include "net/third_party/quiche/src/quic/core/quic_buffer_allocator.h"
 #include "net/third_party/quiche/src/quic/core/quic_types.h"
 #include "net/third_party/quiche/src/quic/core/quic_versions.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_flags.h"
@@ -212,6 +213,18 @@
   return false;
 }
 
+void QuicTransportSimpleServerSession::OnMessageReceived(
+    quiche::QuicheStringPiece message) {
+  if (mode_ != ECHO) {
+    return;
+  }
+  QuicUniqueBufferPtr buffer = MakeUniqueBuffer(
+      connection()->helper()->GetStreamSendBufferAllocator(), message.size());
+  memcpy(buffer.get(), message.data(), message.size());
+  datagram_queue()->SendOrQueueDatagram(
+      QuicMemSlice(std::move(buffer), message.size()));
+}
+
 void QuicTransportSimpleServerSession::MaybeEchoStreamsBack() {
   while (!streams_to_echo_back_.empty() &&
          CanOpenNextOutgoingUnidirectionalStream()) {
diff --git a/quic/tools/quic_transport_simple_server_session.h b/quic/tools/quic_transport_simple_server_session.h
index a2dd6be..ccdf28b 100644
--- a/quic/tools/quic_transport_simple_server_session.h
+++ b/quic/tools/quic_transport_simple_server_session.h
@@ -51,6 +51,7 @@
   void OnCanCreateNewOutgoingStream(bool unidirectional) override;
   bool CheckOrigin(url::Origin origin) override;
   bool ProcessPath(const GURL& url) override;
+  void OnMessageReceived(quiche::QuicheStringPiece message) override;
 
   void EchoStreamBack(const std::string& data) {
     streams_to_echo_back_.push_back(data);