Change SendOrQueueMessage to take a QuicMemSliceSpan.

QuicMemSliceSpan now has a `ConsumeAll` method in its API, which allows us to
move the mem slices into a queue of messages to send.  This is now the same
mechanism used to store mem slice spans in a stream's or message's send buffer.
It ensures that mem slices are properly ref-counted so that their contents are
not deleted after the slices have been stored.

This should enable zero-copy writes down the line.  However, currently, we still
copy the data twice:
 - Once in the application layer, where we create mem slices by copying data
 - Once in the session, when converting mem slices back into a mem slice span

Nothing stops us from fixing the former, but it requires a bit of refactoring at
the application layer.

The latter requires an API for building a QuicMemSliceSpan out of QuicMemSlices
without copying them.  Such an API should be possible, but doesn't seem to exist
today.

gfe-relnote: n/a (Quartc only)
PiperOrigin-RevId: 247442584
Change-Id: Ie1bc1ad2f878ee312a5e13b048fd2233864c3d18
diff --git a/quic/quartc/quartc_session.cc b/quic/quartc/quartc_session.cc
index 4d36305..59bb2c2 100644
--- a/quic/quartc/quartc_session.cc
+++ b/quic/quartc/quartc_session.cc
@@ -41,14 +41,15 @@
       GetNextOutgoingBidirectionalStreamId(), QuicStream::kDefaultPriority));
 }
 
-bool QuartcSession::SendOrQueueMessage(std::string message) {
+bool QuartcSession::SendOrQueueMessage(QuicMemSliceSpan message) {
   if (!CanSendMessage()) {
     QUIC_LOG(ERROR) << "Quic session does not support SendMessage";
     return false;
   }
 
-  if (message.size() > GetCurrentLargestMessagePayload()) {
-    QUIC_LOG(ERROR) << "Message is too big, message_size=" << message.size()
+  if (message.total_length() > GetCurrentLargestMessagePayload()) {
+    QUIC_LOG(ERROR) << "Message is too big, message_size="
+                    << message.total_length()
                     << ", GetCurrentLargestMessagePayload="
                     << GetCurrentLargestMessagePayload();
     return false;
@@ -56,7 +57,9 @@
 
   // There may be other messages in send queue, so we have to add message
   // to the queue and call queue processing helper.
-  send_message_queue_.emplace_back(std::move(message));
+  message.ConsumeAll([this](QuicMemSlice slice) {
+    send_message_queue_.emplace_back(std::move(slice));
+  });
 
   ProcessSendMessageQueue();
 
@@ -74,7 +77,7 @@
         send_message_queue_.front().length());
     MessageResult result = SendMessage(storage.ToSpan());
 
-    const size_t message_size = send_message_queue_.front().size();
+    const size_t message_size = send_message_queue_.front().length();
 
     // Handle errors.
     switch (result.status) {
diff --git a/quic/quartc/quartc_session.h b/quic/quartc/quartc_session.h
index 9da306e..8e28add 100644
--- a/quic/quartc/quartc_session.h
+++ b/quic/quartc/quartc_session.h
@@ -48,7 +48,7 @@
   // support SendMessage API. Other unexpected errors during send will not be
   // returned, because messages can be sent later if connection is congestion
   // controlled.
-  bool SendOrQueueMessage(std::string message);
+  bool SendOrQueueMessage(QuicMemSliceSpan message);
 
   // Returns largest message payload acceptable in SendQuartcMessage.
   QuicPacketLength GetCurrentLargestMessagePayload() const {
@@ -190,7 +190,7 @@
   // Queue of pending messages sent by SendQuartcMessage that were not sent
   // yet or blocked by congestion control. Messages are queued in the order
   // of sent by SendOrQueueMessage().
-  QuicDeque<std::string> send_message_queue_;
+  QuicDeque<QuicMemSlice> send_message_queue_;
 };
 
 class QuartcClientSession : public QuartcSession,
diff --git a/quic/quartc/quartc_session_test.cc b/quic/quartc/quartc_session_test.cc
index c6488a1..75fd20a 100644
--- a/quic/quartc/quartc_session_test.cc
+++ b/quic/quartc/quartc_session_test.cc
@@ -34,6 +34,11 @@
 
 static QuicByteCount kDefaultMaxPacketSize = 1200;
 
+test::QuicTestMemSliceVector CreateMemSliceVector(QuicStringPiece data) {
+  return test::QuicTestMemSliceVector(
+      {std::pair<char*, size_t>(const_cast<char*>(data.data()), data.size())});
+}
+
 class QuartcSessionTest : public QuicTest {
  public:
   ~QuartcSessionTest() override {}
@@ -130,9 +135,7 @@
     outgoing_stream->SetDelegate(server_stream_delegate_.get());
 
     // Send a test message from peer 1 to peer 2.
-    char kTestMessage[] = "Hello";
-    test::QuicTestMemSliceVector data(
-        {std::make_pair(kTestMessage, strlen(kTestMessage))});
+    test::QuicTestMemSliceVector data = CreateMemSliceVector("Hello");
     outgoing_stream->WriteMemSlices(data.span(), /*fin=*/false);
     RunTasks();
 
@@ -144,17 +147,15 @@
     EXPECT_EQ(incoming->id(), stream_id);
     EXPECT_TRUE(client_peer_->ShouldKeepConnectionAlive());
 
-    EXPECT_EQ(client_stream_delegate_->data()[stream_id], kTestMessage);
+    EXPECT_EQ(client_stream_delegate_->data()[stream_id], "Hello");
     // Send a test message from peer 2 to peer 1.
-    char kTestResponse[] = "Response";
-    test::QuicTestMemSliceVector response(
-        {std::make_pair(kTestResponse, strlen(kTestResponse))});
+    test::QuicTestMemSliceVector response = CreateMemSliceVector("Response");
     incoming->WriteMemSlices(response.span(), /*fin=*/false);
     RunTasks();
     // Wait for peer 1 to receive messages.
     ASSERT_TRUE(server_stream_delegate_->has_data());
 
-    EXPECT_EQ(server_stream_delegate_->data()[stream_id], kTestResponse);
+    EXPECT_EQ(server_stream_delegate_->data()[stream_id], "Response");
   }
 
   // Test sending/receiving of messages for two directions.
@@ -163,7 +164,9 @@
     ASSERT_TRUE(client_peer_->CanSendMessage());
 
     // Send message from peer 1 to peer 2.
-    ASSERT_TRUE(server_peer_->SendOrQueueMessage("Message from server"));
+    test::QuicTestMemSliceVector message =
+        CreateMemSliceVector("Message from server");
+    ASSERT_TRUE(server_peer_->SendOrQueueMessage(message.span()));
 
     // First message in each direction should not be queued.
     EXPECT_EQ(server_peer_->send_message_queue_size(), 0u);
@@ -175,7 +178,8 @@
                 testing::ElementsAre("Message from server"));
 
     // Send message from peer 2 to peer 1.
-    ASSERT_TRUE(client_peer_->SendOrQueueMessage("Message from client"));
+    message = CreateMemSliceVector("Message from client");
+    ASSERT_TRUE(client_peer_->SendOrQueueMessage(message.span()));
 
     // First message in each direction should not be queued.
     EXPECT_EQ(client_peer_->send_message_queue_size(), 0u);
@@ -212,7 +216,8 @@
     while (peer_sending->send_message_queue_size() < queue_size) {
       sent_messages.push_back(
           QuicStrCat("Sending message, index=", sent_messages.size()));
-      ASSERT_TRUE(peer_sending->SendOrQueueMessage(sent_messages.back()));
+      ASSERT_TRUE(peer_sending->SendOrQueueMessage(
+          CreateMemSliceVector(sent_messages.back()).span()));
     }
 
     // Wait for peer 2 to receive all messages.
@@ -231,12 +236,15 @@
     // Send message of maximum allowed length.
     std::string message_max_long =
         std::string(server_peer_->GetCurrentLargestMessagePayload(), 'A');
-    ASSERT_TRUE(server_peer_->SendOrQueueMessage(message_max_long));
+    test::QuicTestMemSliceVector message =
+        CreateMemSliceVector(message_max_long);
+    ASSERT_TRUE(server_peer_->SendOrQueueMessage(message.span()));
 
     // Send long message which should fail.
     std::string message_too_long =
         std::string(server_peer_->GetCurrentLargestMessagePayload() + 1, 'B');
-    ASSERT_FALSE(server_peer_->SendOrQueueMessage(message_too_long));
+    message = CreateMemSliceVector(message_too_long);
+    ASSERT_FALSE(server_peer_->SendOrQueueMessage(message.span()));
 
     // Wait for peer 2 to receive message.
     RunTasks();
@@ -375,9 +383,7 @@
   QuartcStream* stream = client_peer_->CreateOutgoingBidirectionalStream();
   stream->SetDelegate(client_stream_delegate_.get());
 
-  char kClientMessage[] = "Hello";
-  test::QuicTestMemSliceVector stream_data(
-      {std::make_pair(kClientMessage, strlen(kClientMessage))});
+  test::QuicTestMemSliceVector stream_data = CreateMemSliceVector("Hello");
   stream->WriteMemSlices(stream_data.span(), /*fin=*/false);
   RunTasks();
 
@@ -412,15 +418,13 @@
 
   client_filter_->set_packets_to_drop(1);
 
-  char kClientMessage[] = "Hello";
-  test::QuicTestMemSliceVector stream_data(
-      {std::make_pair(kClientMessage, strlen(kClientMessage))});
+  test::QuicTestMemSliceVector stream_data = CreateMemSliceVector("Hello");
   stream->WriteMemSlices(stream_data.span(), /*fin=*/false);
   RunTasks();
 
   // Stream data should make it despite packet loss.
   ASSERT_TRUE(server_stream_delegate_->has_data());
-  EXPECT_EQ(server_stream_delegate_->data()[stream_id], kClientMessage);
+  EXPECT_EQ(server_stream_delegate_->data()[stream_id], "Hello");
 }
 
 TEST_F(QuartcSessionTest, StreamRetransmissionDisabled) {
@@ -450,9 +454,7 @@
 
   client_filter_->set_packets_to_drop(1);
 
-  char kMessage[] = "Hello";
-  test::QuicTestMemSliceVector stream_data(
-      {std::make_pair(kMessage, strlen(kMessage))});
+  test::QuicTestMemSliceVector stream_data = CreateMemSliceVector("Hello");
   stream->WriteMemSlices(stream_data.span(), /*fin=*/false);
   simulator_.RunFor(QuicTime::Delta::FromMilliseconds(1));
 
@@ -460,9 +462,8 @@
   QuartcStream* stream_1 = client_peer_->CreateOutgoingBidirectionalStream();
   stream_1->SetDelegate(client_stream_delegate_.get());
 
-  char kMessage1[] = "Second message";
-  test::QuicTestMemSliceVector stream_data_1(
-      {std::make_pair(kMessage1, strlen(kMessage1))});
+  test::QuicTestMemSliceVector stream_data_1 =
+      CreateMemSliceVector("Second message");
   stream_1->WriteMemSlices(stream_data_1.span(), /*fin=*/false);
   RunTasks();
 
diff --git a/quic/quartc/test/bidi_test_runner.cc b/quic/quartc/test/bidi_test_runner.cc
index 2e9ba87..07ad80e 100644
--- a/quic/quartc/test/bidi_test_runner.cc
+++ b/quic/quartc/test/bidi_test_runner.cc
@@ -90,10 +90,12 @@
 bool BidiTestRunner::RunTest(QuicTime::Delta test_duration) {
   client_peer_ = QuicMakeUnique<QuartcPeer>(
       simulator_->GetClock(), simulator_->GetAlarmFactory(),
-      simulator_->GetRandomGenerator(), client_configs_);
+      simulator_->GetRandomGenerator(),
+      simulator_->GetStreamSendBufferAllocator(), client_configs_);
   server_peer_ = QuicMakeUnique<QuartcPeer>(
       simulator_->GetClock(), simulator_->GetAlarmFactory(),
-      simulator_->GetRandomGenerator(), server_configs_);
+      simulator_->GetRandomGenerator(),
+      simulator_->GetStreamSendBufferAllocator(), server_configs_);
 
   QuartcEndpoint::Delegate* server_delegate = server_peer_.get();
   if (server_interceptor_) {
diff --git a/quic/quartc/test/quartc_peer.cc b/quic/quartc/test/quartc_peer.cc
index 3918c33..9cecd18 100644
--- a/quic/quartc/test/quartc_peer.cc
+++ b/quic/quartc/test/quartc_peer.cc
@@ -4,6 +4,7 @@
 
 #include "net/third_party/quiche/src/quic/quartc/test/quartc_peer.h"
 
+#include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice_storage.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_string_piece.h"
 
 namespace quic {
@@ -12,10 +13,12 @@
 QuartcPeer::QuartcPeer(const QuicClock* clock,
                        QuicAlarmFactory* alarm_factory,
                        QuicRandom* random,
+                       QuicBufferAllocator* buffer_allocator,
                        const std::vector<QuartcDataSource::Config>& configs)
     : clock_(clock),
       alarm_factory_(alarm_factory),
       random_(random),
+      buffer_allocator_(buffer_allocator),
       enabled_(false),
       session_(nullptr),
       configs_(configs) {}
@@ -118,7 +121,9 @@
   // Further packetization is not required, as sources are configured to produce
   // frames that fit within message payloads.
   DCHECK_LE(length, session_->GetCurrentLargestMessagePayload());
-  session_->SendOrQueueMessage(std::string(data, length));
+  struct iovec iov = {const_cast<char*>(data), length};
+  QuicMemSliceStorage storage(&iov, 1, buffer_allocator_, length);
+  session_->SendOrQueueMessage(storage.ToSpan());
 }
 
 }  // namespace test
diff --git a/quic/quartc/test/quartc_peer.h b/quic/quartc/test/quartc_peer.h
index 68cb9ee..8a62ba7 100644
--- a/quic/quartc/test/quartc_peer.h
+++ b/quic/quartc/test/quartc_peer.h
@@ -48,6 +48,7 @@
   QuartcPeer(const QuicClock* clock,
              QuicAlarmFactory* alarm_factory,
              QuicRandom* random,
+             QuicBufferAllocator* buffer_allocator,
              const std::vector<QuartcDataSource::Config>& configs);
   QuartcPeer(QuartcPeer&) = delete;
   QuartcPeer& operator=(QuartcPeer&) = delete;
@@ -94,6 +95,7 @@
   const QuicClock* clock_;
   QuicAlarmFactory* alarm_factory_;
   QuicRandom* random_;
+  QuicBufferAllocator* buffer_allocator_;
 
   // Whether the peer is currently sending.
   bool enabled_;
diff --git a/quic/quartc/test/quartc_peer_test.cc b/quic/quartc/test/quartc_peer_test.cc
index 3eca703..fe76acf 100644
--- a/quic/quartc/test/quartc_peer_test.cc
+++ b/quic/quartc/test/quartc_peer_test.cc
@@ -40,10 +40,12 @@
   void CreatePeers(const std::vector<QuartcDataSource::Config>& configs) {
     client_peer_ = QuicMakeUnique<QuartcPeer>(
         simulator_.GetClock(), simulator_.GetAlarmFactory(),
-        simulator_.GetRandomGenerator(), configs);
+        simulator_.GetRandomGenerator(),
+        simulator_.GetStreamSendBufferAllocator(), configs);
     server_peer_ = QuicMakeUnique<QuartcPeer>(
         simulator_.GetClock(), simulator_.GetAlarmFactory(),
-        simulator_.GetRandomGenerator(), configs);
+        simulator_.GetRandomGenerator(),
+        simulator_.GetStreamSendBufferAllocator(), configs);
   }
 
   void Connect() {