Change SendOrQueueMessage to take a QuicMemSliceSpan.

QuicMemSliceSpan now has a `ConsumeAll` method in its API, which allows us to
move the mem slices into a queue of messages to send.  This is now the same
mechanism used to store mem slice spans in a stream or message's send buffer.
It ensures that mem slices are properly ref-counted so that the contents are not
deleted after storing them.

This should enable zero-copy writes down the line.  However, currently, we still
have two copies:
 - One in the application layer, where we create mem slices by copying data
 - One in the session, when converting mem slices back into a mem slice span

Nothing stops us from fixing the former, but it requires a bit of refactoring at
the application layer.

The latter requires an API for building a QuicMemSliceSpan out of QuicMemSlices
without copying them.  This should be possible, but doesn't seem to exist today.

gfe-relnote: n/a (Quartc only)
PiperOrigin-RevId: 247442584
Change-Id: Ie1bc1ad2f878ee312a5e13b048fd2233864c3d18
diff --git a/quic/quartc/quartc_session_test.cc b/quic/quartc/quartc_session_test.cc
index c6488a1..75fd20a 100644
--- a/quic/quartc/quartc_session_test.cc
+++ b/quic/quartc/quartc_session_test.cc
@@ -34,6 +34,11 @@
 
 static QuicByteCount kDefaultMaxPacketSize = 1200;
 
+test::QuicTestMemSliceVector CreateMemSliceVector(QuicStringPiece data) {
+  return test::QuicTestMemSliceVector(
+      {std::pair<char*, size_t>(const_cast<char*>(data.data()), data.size())});
+}
+
 class QuartcSessionTest : public QuicTest {
  public:
   ~QuartcSessionTest() override {}
@@ -130,9 +135,7 @@
     outgoing_stream->SetDelegate(server_stream_delegate_.get());
 
     // Send a test message from peer 1 to peer 2.
-    char kTestMessage[] = "Hello";
-    test::QuicTestMemSliceVector data(
-        {std::make_pair(kTestMessage, strlen(kTestMessage))});
+    test::QuicTestMemSliceVector data = CreateMemSliceVector("Hello");
     outgoing_stream->WriteMemSlices(data.span(), /*fin=*/false);
     RunTasks();
 
@@ -144,17 +147,15 @@
     EXPECT_EQ(incoming->id(), stream_id);
     EXPECT_TRUE(client_peer_->ShouldKeepConnectionAlive());
 
-    EXPECT_EQ(client_stream_delegate_->data()[stream_id], kTestMessage);
+    EXPECT_EQ(client_stream_delegate_->data()[stream_id], "Hello");
     // Send a test message from peer 2 to peer 1.
-    char kTestResponse[] = "Response";
-    test::QuicTestMemSliceVector response(
-        {std::make_pair(kTestResponse, strlen(kTestResponse))});
+    test::QuicTestMemSliceVector response = CreateMemSliceVector("Response");
     incoming->WriteMemSlices(response.span(), /*fin=*/false);
     RunTasks();
     // Wait for peer 1 to receive messages.
     ASSERT_TRUE(server_stream_delegate_->has_data());
 
-    EXPECT_EQ(server_stream_delegate_->data()[stream_id], kTestResponse);
+    EXPECT_EQ(server_stream_delegate_->data()[stream_id], "Response");
   }
 
   // Test sending/receiving of messages for two directions.
@@ -163,7 +164,9 @@
     ASSERT_TRUE(client_peer_->CanSendMessage());
 
     // Send message from peer 1 to peer 2.
-    ASSERT_TRUE(server_peer_->SendOrQueueMessage("Message from server"));
+    test::QuicTestMemSliceVector message =
+        CreateMemSliceVector("Message from server");
+    ASSERT_TRUE(server_peer_->SendOrQueueMessage(message.span()));
 
     // First message in each direction should not be queued.
     EXPECT_EQ(server_peer_->send_message_queue_size(), 0u);
@@ -175,7 +178,8 @@
                 testing::ElementsAre("Message from server"));
 
     // Send message from peer 2 to peer 1.
-    ASSERT_TRUE(client_peer_->SendOrQueueMessage("Message from client"));
+    message = CreateMemSliceVector("Message from client");
+    ASSERT_TRUE(client_peer_->SendOrQueueMessage(message.span()));
 
     // First message in each direction should not be queued.
     EXPECT_EQ(client_peer_->send_message_queue_size(), 0u);
@@ -212,7 +216,8 @@
     while (peer_sending->send_message_queue_size() < queue_size) {
       sent_messages.push_back(
           QuicStrCat("Sending message, index=", sent_messages.size()));
-      ASSERT_TRUE(peer_sending->SendOrQueueMessage(sent_messages.back()));
+      ASSERT_TRUE(peer_sending->SendOrQueueMessage(
+          CreateMemSliceVector(sent_messages.back()).span()));
     }
 
     // Wait for peer 2 to receive all messages.
@@ -231,12 +236,15 @@
     // Send message of maximum allowed length.
     std::string message_max_long =
         std::string(server_peer_->GetCurrentLargestMessagePayload(), 'A');
-    ASSERT_TRUE(server_peer_->SendOrQueueMessage(message_max_long));
+    test::QuicTestMemSliceVector message =
+        CreateMemSliceVector(message_max_long);
+    ASSERT_TRUE(server_peer_->SendOrQueueMessage(message.span()));
 
     // Send long message which should fail.
     std::string message_too_long =
         std::string(server_peer_->GetCurrentLargestMessagePayload() + 1, 'B');
-    ASSERT_FALSE(server_peer_->SendOrQueueMessage(message_too_long));
+    message = CreateMemSliceVector(message_too_long);
+    ASSERT_FALSE(server_peer_->SendOrQueueMessage(message.span()));
 
     // Wait for peer 2 to receive message.
     RunTasks();
@@ -375,9 +383,7 @@
   QuartcStream* stream = client_peer_->CreateOutgoingBidirectionalStream();
   stream->SetDelegate(client_stream_delegate_.get());
 
-  char kClientMessage[] = "Hello";
-  test::QuicTestMemSliceVector stream_data(
-      {std::make_pair(kClientMessage, strlen(kClientMessage))});
+  test::QuicTestMemSliceVector stream_data = CreateMemSliceVector("Hello");
   stream->WriteMemSlices(stream_data.span(), /*fin=*/false);
   RunTasks();
 
@@ -412,15 +418,13 @@
 
   client_filter_->set_packets_to_drop(1);
 
-  char kClientMessage[] = "Hello";
-  test::QuicTestMemSliceVector stream_data(
-      {std::make_pair(kClientMessage, strlen(kClientMessage))});
+  test::QuicTestMemSliceVector stream_data = CreateMemSliceVector("Hello");
   stream->WriteMemSlices(stream_data.span(), /*fin=*/false);
   RunTasks();
 
   // Stream data should make it despite packet loss.
   ASSERT_TRUE(server_stream_delegate_->has_data());
-  EXPECT_EQ(server_stream_delegate_->data()[stream_id], kClientMessage);
+  EXPECT_EQ(server_stream_delegate_->data()[stream_id], "Hello");
 }
 
 TEST_F(QuartcSessionTest, StreamRetransmissionDisabled) {
@@ -450,9 +454,7 @@
 
   client_filter_->set_packets_to_drop(1);
 
-  char kMessage[] = "Hello";
-  test::QuicTestMemSliceVector stream_data(
-      {std::make_pair(kMessage, strlen(kMessage))});
+  test::QuicTestMemSliceVector stream_data = CreateMemSliceVector("Hello");
   stream->WriteMemSlices(stream_data.span(), /*fin=*/false);
   simulator_.RunFor(QuicTime::Delta::FromMilliseconds(1));
 
@@ -460,9 +462,8 @@
   QuartcStream* stream_1 = client_peer_->CreateOutgoingBidirectionalStream();
   stream_1->SetDelegate(client_stream_delegate_.get());
 
-  char kMessage1[] = "Second message";
-  test::QuicTestMemSliceVector stream_data_1(
-      {std::make_pair(kMessage1, strlen(kMessage1))});
+  test::QuicTestMemSliceVector stream_data_1 =
+      CreateMemSliceVector("Second message");
   stream_1->WriteMemSlices(stream_data_1.span(), /*fin=*/false);
   RunTasks();