Use absl::Span<QuicMemSlice> for QUIC datagrams.

This also introduces a single-slice version of SendMessage() and some related functions, and updates some of the comments.

PiperOrigin-RevId: 384722430
diff --git a/quic/core/frames/quic_frames_test.cc b/quic/core/frames/quic_frames_test.cc
index c11be9e..038924e 100644
--- a/quic/core/frames/quic_frames_test.cc
+++ b/quic/core/frames/quic_frames_test.cc
@@ -543,10 +543,8 @@
 
 TEST_F(QuicFramesTest, CopyQuicFrames) {
   QuicFrames frames;
-  SimpleBufferAllocator allocator;
-  QuicMemSliceStorage storage(nullptr, 0, nullptr, 0);
   QuicMessageFrame* message_frame =
-      new QuicMessageFrame(1, MakeSpan(&allocator, "message", &storage));
+      new QuicMessageFrame(1, MemSliceFromString("message"));
   // Construct a frame list.
   for (uint8_t i = 0; i < NUM_FRAME_TYPES; ++i) {
     switch (i) {
@@ -626,7 +624,7 @@
     }
   }
 
-  QuicFrames copy = CopyQuicFrames(&allocator, frames);
+  QuicFrames copy = CopyQuicFrames(SimpleBufferAllocator::Get(), frames);
   ASSERT_EQ(NUM_FRAME_TYPES, copy.size());
   for (uint8_t i = 0; i < NUM_FRAME_TYPES; ++i) {
     EXPECT_EQ(i, copy[i].type);
diff --git a/quic/core/frames/quic_message_frame.cc b/quic/core/frames/quic_message_frame.cc
index 054303a..1ac0de6 100644
--- a/quic/core/frames/quic_message_frame.cc
+++ b/quic/core/frames/quic_message_frame.cc
@@ -14,13 +14,18 @@
     : message_id(message_id), data(nullptr), message_length(0) {}
 
 QuicMessageFrame::QuicMessageFrame(QuicMessageId message_id,
-                                   QuicMemSliceSpan span)
+                                   absl::Span<QuicMemSlice> span)
     : message_id(message_id), data(nullptr), message_length(0) {
-  span.ConsumeAll([&](QuicMemSlice slice) {
+  for (QuicMemSlice& slice : span) {
+    if (slice.empty()) {
+      continue;
+    }
     message_length += slice.length();
     message_data.push_back(std::move(slice));
-  });
+  }
 }
+QuicMessageFrame::QuicMessageFrame(QuicMessageId message_id, QuicMemSlice slice)
+    : QuicMessageFrame(message_id, absl::MakeSpan(&slice, 1)) {}
 
 QuicMessageFrame::QuicMessageFrame(const char* data, QuicPacketLength length)
     : message_id(0), data(data), message_length(length) {}
diff --git a/quic/core/frames/quic_message_frame.h b/quic/core/frames/quic_message_frame.h
index 3064ddb..1d090c9 100644
--- a/quic/core/frames/quic_message_frame.h
+++ b/quic/core/frames/quic_message_frame.h
@@ -6,11 +6,11 @@
 #define QUICHE_QUIC_CORE_FRAMES_QUIC_MESSAGE_FRAME_H_
 
 #include "absl/container/inlined_vector.h"
+#include "absl/types/span.h"
 #include "quic/core/quic_types.h"
 #include "quic/platform/api/quic_containers.h"
 #include "quic/platform/api/quic_export.h"
 #include "quic/platform/api/quic_mem_slice.h"
-#include "quic/platform/api/quic_mem_slice_span.h"
 
 namespace quic {
 
@@ -19,7 +19,8 @@
 struct QUIC_EXPORT_PRIVATE QuicMessageFrame {
   QuicMessageFrame() = default;
   explicit QuicMessageFrame(QuicMessageId message_id);
-  QuicMessageFrame(QuicMessageId message_id, QuicMemSliceSpan span);
+  QuicMessageFrame(QuicMessageId message_id, absl::Span<QuicMemSlice> span);
+  QuicMessageFrame(QuicMessageId message_id, QuicMemSlice slice);
   QuicMessageFrame(const char* data, QuicPacketLength length);
 
   QuicMessageFrame(const QuicMessageFrame& other) = delete;
diff --git a/quic/core/http/end_to_end_test.cc b/quic/core/http/end_to_end_test.cc
index df52e0f..913b5b2 100644
--- a/quic/core/http/end_to_end_test.cc
+++ b/quic/core/http/end_to_end_test.cc
@@ -4738,20 +4738,15 @@
   ASSERT_LT(0, client_session->GetCurrentLargestMessagePayload());
 
   std::string message_string(kMaxOutgoingPacketSize, 'a');
-  absl::string_view message_buffer(message_string);
   QuicRandom* random =
       QuicConnectionPeer::GetHelper(client_connection)->GetRandomGenerator();
-  QuicMemSliceStorage storage(nullptr, 0, nullptr, 0);
   {
     QuicConnection::ScopedPacketFlusher flusher(client_session->connection());
     // Verify the largest message gets successfully sent.
     EXPECT_EQ(MessageResult(MESSAGE_STATUS_SUCCESS, 1),
-              client_session->SendMessage(MakeSpan(
-                  client_connection->helper()->GetStreamSendBufferAllocator(),
-                  absl::string_view(
-                      message_buffer.data(),
-                      client_session->GetCurrentLargestMessagePayload()),
-                  &storage)));
+              client_session->SendMessage(MemSliceFromString(absl::string_view(
+                  message_string.data(),
+                  client_session->GetCurrentLargestMessagePayload()))));
     // Send more messages with size (0, largest_payload] until connection is
     // write blocked.
     const int kTestMaxNumberOfMessages = 100;
@@ -4760,9 +4755,8 @@
           random->RandUint64() %
               client_session->GetGuaranteedLargestMessagePayload() +
           1;
-      MessageResult result = client_session->SendMessage(MakeSpan(
-          client_connection->helper()->GetStreamSendBufferAllocator(),
-          absl::string_view(message_buffer.data(), message_length), &storage));
+      MessageResult result = client_session->SendMessage(MemSliceFromString(
+          absl::string_view(message_string.data(), message_length)));
       if (result.status == MESSAGE_STATUS_BLOCKED) {
         // Connection is write blocked.
         break;
@@ -4774,12 +4768,9 @@
   client_->WaitForDelayedAcks();
   EXPECT_EQ(MESSAGE_STATUS_TOO_LARGE,
             client_session
-                ->SendMessage(MakeSpan(
-                    client_connection->helper()->GetStreamSendBufferAllocator(),
-                    absl::string_view(
-                        message_buffer.data(),
-                        client_session->GetCurrentLargestMessagePayload() + 1),
-                    &storage))
+                ->SendMessage(MemSliceFromString(absl::string_view(
+                    message_string.data(),
+                    client_session->GetCurrentLargestMessagePayload() + 1)))
                 .status);
   EXPECT_THAT(client_->connection_error(), IsQuicNoError());
 }
diff --git a/quic/core/quic_connection.cc b/quic/core/quic_connection.cc
index bb84ef7..9defbe3 100644
--- a/quic/core/quic_connection.cc
+++ b/quic/core/quic_connection.cc
@@ -5862,14 +5862,14 @@
 }
 
 MessageStatus QuicConnection::SendMessage(QuicMessageId message_id,
-                                          QuicMemSliceSpan message,
+                                          absl::Span<QuicMemSlice> message,
                                           bool flush) {
   if (!VersionSupportsMessageFrames(transport_version())) {
     QUIC_BUG(quic_bug_10511_38)
         << "MESSAGE frame is not supported for version " << transport_version();
     return MESSAGE_STATUS_UNSUPPORTED;
   }
-  if (message.total_length() > GetCurrentLargestMessagePayload()) {
+  if (MemSliceSpanTotalSize(message) > GetCurrentLargestMessagePayload()) {
     return MESSAGE_STATUS_TOO_LARGE;
   }
   if (!connected_ || (!flush && !CanWrite(HAS_RETRANSMITTABLE_DATA))) {
diff --git a/quic/core/quic_connection.h b/quic/core/quic_connection.h
index 19faa19..9a02675 100644
--- a/quic/core/quic_connection.h
+++ b/quic/core/quic_connection.h
@@ -987,7 +987,7 @@
   // If |flush| is false, this will return a MESSAGE_STATUS_BLOCKED
   // when the connection is deemed unwritable.
   virtual MessageStatus SendMessage(QuicMessageId message_id,
-                                    QuicMemSliceSpan message,
+                                    absl::Span<QuicMemSlice> message,
                                     bool flush);
 
   // Returns the largest payload that will fit into a single MESSAGE frame.
diff --git a/quic/core/quic_connection_test.cc b/quic/core/quic_connection_test.cc
index 865daa5..5e6c8ee 100644
--- a/quic/core/quic_connection_test.cc
+++ b/quic/core/quic_connection_test.cc
@@ -1103,12 +1103,9 @@
 
   MessageStatus SendMessage(absl::string_view message) {
     connection_.SetDefaultEncryptionLevel(ENCRYPTION_FORWARD_SECURE);
-    QuicMemSliceStorage storage(nullptr, 0, nullptr, 0);
-    return connection_.SendMessage(
-        1,
-        MakeSpan(connection_.helper()->GetStreamSendBufferAllocator(), message,
-                 &storage),
-        false);
+    QuicMemSlice slice(QuicBuffer::Copy(
+        connection_.helper()->GetStreamSendBufferAllocator(), message));
+    return connection_.SendMessage(1, absl::MakeSpan(&slice, 1), false);
   }
 
   void ProcessAckPacket(uint64_t packet_number, QuicAckFrame* frame) {
@@ -8780,8 +8777,7 @@
     connection_.SetFromConfig(config);
   }
   std::string message(connection_.GetCurrentLargestMessagePayload() * 2, 'a');
-  absl::string_view message_data(message);
-  QuicMemSliceStorage storage(nullptr, 0, nullptr, 0);
+  QuicMemSlice slice;
   {
     QuicConnection::ScopedPacketFlusher flusher(&connection_);
     connection_.SendStreamData3();
@@ -8789,36 +8785,23 @@
     // get sent, one contains stream frame, and the other only contains the
     // message frame.
     EXPECT_CALL(*send_algorithm_, OnPacketSent(_, _, _, _, _)).Times(2);
+    slice = MemSliceFromString(absl::string_view(
+        message.data(), connection_.GetCurrentLargestMessagePayload()));
     EXPECT_EQ(MESSAGE_STATUS_SUCCESS,
-              connection_.SendMessage(
-                  1,
-                  MakeSpan(connection_.helper()->GetStreamSendBufferAllocator(),
-                           absl::string_view(
-                               message_data.data(),
-                               connection_.GetCurrentLargestMessagePayload()),
-                           &storage),
-                  false));
+              connection_.SendMessage(1, absl::MakeSpan(&slice, 1), false));
   }
   // Fail to send a message if connection is congestion control blocked.
   EXPECT_CALL(*send_algorithm_, CanSend(_)).WillOnce(Return(false));
+  slice = MemSliceFromString("message");
   EXPECT_EQ(MESSAGE_STATUS_BLOCKED,
-            connection_.SendMessage(
-                2,
-                MakeSpan(connection_.helper()->GetStreamSendBufferAllocator(),
-                         "message", &storage),
-                false));
+            connection_.SendMessage(2, absl::MakeSpan(&slice, 1), false));
 
   // Always fail to send a message which cannot fit into one packet.
   EXPECT_CALL(*send_algorithm_, OnPacketSent(_, _, _, _, _)).Times(0);
+  slice = MemSliceFromString(absl::string_view(
+      message.data(), connection_.GetCurrentLargestMessagePayload() + 1));
   EXPECT_EQ(MESSAGE_STATUS_TOO_LARGE,
-            connection_.SendMessage(
-                3,
-                MakeSpan(connection_.helper()->GetStreamSendBufferAllocator(),
-                         absl::string_view(
-                             message_data.data(),
-                             connection_.GetCurrentLargestMessagePayload() + 1),
-                         &storage),
-                false));
+            connection_.SendMessage(3, absl::MakeSpan(&slice, 1), false));
 }
 
 TEST_P(QuicConnectionTest, GetCurrentLargestMessagePayload) {
diff --git a/quic/core/quic_datagram_queue.cc b/quic/core/quic_datagram_queue.cc
index d4513e4..e6f3e6f 100644
--- a/quic/core/quic_datagram_queue.cc
+++ b/quic/core/quic_datagram_queue.cc
@@ -4,6 +4,7 @@
 
 #include "quic/core/quic_datagram_queue.h"
 
+#include "absl/types/span.h"
 #include "quic/core/quic_constants.h"
 #include "quic/core/quic_session.h"
 #include "quic/core/quic_time.h"
@@ -30,8 +31,7 @@
   // the datagrams are sent in the same order that they were sent by the
   // application.
   if (queue_.empty()) {
-    QuicMemSliceSpan span(&datagram);
-    MessageResult result = session_->SendMessage(span);
+    MessageResult result = session_->SendMessage(absl::MakeSpan(&datagram, 1));
     if (result.status != MESSAGE_STATUS_BLOCKED) {
       if (observer_) {
         observer_->OnDatagramProcessed(result.status);
@@ -51,8 +51,8 @@
     return absl::nullopt;
   }
 
-  QuicMemSliceSpan span(&queue_.front().datagram);
-  MessageResult result = session_->SendMessage(span);
+  MessageResult result =
+      session_->SendMessage(absl::MakeSpan(&queue_.front().datagram, 1));
   if (result.status != MESSAGE_STATUS_BLOCKED) {
     queue_.pop_front();
     if (observer_) {
diff --git a/quic/core/quic_datagram_queue_test.cc b/quic/core/quic_datagram_queue_test.cc
index f47bbb8..68a876b 100644
--- a/quic/core/quic_datagram_queue_test.cc
+++ b/quic/core/quic_datagram_queue_test.cc
@@ -176,8 +176,9 @@
   std::vector<std::string> messages;
   EXPECT_CALL(*connection_, SendMessage(_, _, _))
       .WillRepeatedly([&messages](QuicMessageId /*id*/,
-                                  QuicMemSliceSpan message, bool /*flush*/) {
-        messages.push_back(std::string(message.GetData(0)));
+                                  absl::Span<QuicMemSlice> message,
+                                  bool /*flush*/) {
+        messages.push_back(std::string(message[0].AsStringView()));
         return MESSAGE_STATUS_SUCCESS;
       });
   EXPECT_EQ(2u, queue_.SendDatagrams());
diff --git a/quic/core/quic_framer_test.cc b/quic/core/quic_framer_test.cc
index edc4ace..f36bf5c 100644
--- a/quic/core/quic_framer_test.cc
+++ b/quic/core/quic_framer_test.cc
@@ -8947,10 +8947,9 @@
   header.reset_flag = false;
   header.version_flag = false;
   header.packet_number = kPacketNumber;
-  QuicMemSliceStorage storage(nullptr, 0, nullptr, 0);
 
-  QuicMessageFrame frame(1, MakeSpan(&allocator_, "message", &storage));
-  QuicMessageFrame frame2(2, MakeSpan(&allocator_, "message2", &storage));
+  QuicMessageFrame frame(1, MemSliceFromString("message"));
+  QuicMessageFrame frame2(2, MemSliceFromString("message2"));
   QuicFrames frames = {QuicFrame(&frame), QuicFrame(&frame2)};
 
   // clang-format off
diff --git a/quic/core/quic_packet_creator.cc b/quic/core/quic_packet_creator.cc
index 9c4e71e..3be1067 100644
--- a/quic/core/quic_packet_creator.cc
+++ b/quic/core/quic_packet_creator.cc
@@ -1597,14 +1597,14 @@
   next_transmission_type_ = type;
 }
 
-MessageStatus QuicPacketCreator::AddMessageFrame(QuicMessageId message_id,
-                                                 QuicMemSliceSpan message) {
+MessageStatus QuicPacketCreator::AddMessageFrame(
+    QuicMessageId message_id, absl::Span<QuicMemSlice> message) {
   QUIC_BUG_IF(quic_bug_10752_33, !flusher_attached_)
       << ENDPOINT
       << "Packet flusher is not attached when "
          "generator tries to add message frame.";
   MaybeBundleAckOpportunistically();
-  const QuicByteCount message_length = message.total_length();
+  const QuicByteCount message_length = MemSliceSpanTotalSize(message);
   if (message_length > GetCurrentLargestMessagePayload()) {
     return MESSAGE_STATUS_TOO_LARGE;
   }
@@ -1619,6 +1619,8 @@
     delete frame;
     return MESSAGE_STATUS_INTERNAL_ERROR;
   }
+  QUICHE_DCHECK_EQ(MemSliceSpanTotalSize(message),
+                   0u);  // Ensure the old slices are empty.
   return MESSAGE_STATUS_SUCCESS;
 }
 
diff --git a/quic/core/quic_packet_creator.h b/quic/core/quic_packet_creator.h
index d4a5805..abf24f6 100644
--- a/quic/core/quic_packet_creator.h
+++ b/quic/core/quic_packet_creator.h
@@ -417,7 +417,7 @@
 
   // Tries to add a message frame containing |message| and returns the status.
   MessageStatus AddMessageFrame(QuicMessageId message_id,
-                                QuicMemSliceSpan message);
+                                absl::Span<QuicMemSlice> message);
 
   // Returns the largest payload that will fit into a single MESSAGE frame.
   QuicPacketLength GetCurrentLargestMessagePayload() const;
diff --git a/quic/core/quic_packet_creator_test.cc b/quic/core/quic_packet_creator_test.cc
index 57d1018..a4e7a0a 100644
--- a/quic/core/quic_packet_creator_test.cc
+++ b/quic/core/quic_packet_creator_test.cc
@@ -1767,25 +1767,24 @@
       .Times(3)
       .WillRepeatedly(
           Invoke(this, &QuicPacketCreatorTest::ClearSerializedPacketForTests));
-  QuicMemSliceStorage storage(nullptr, 0, nullptr, 0);
   // Verify that there is enough room for the largest message payload.
   EXPECT_TRUE(creator_.HasRoomForMessageFrame(
       creator_.GetCurrentLargestMessagePayload()));
-  std::string message(creator_.GetCurrentLargestMessagePayload(), 'a');
+  std::string large_message(creator_.GetCurrentLargestMessagePayload(), 'a');
   QuicMessageFrame* message_frame =
-      new QuicMessageFrame(1, MakeSpan(&allocator_, message, &storage));
+      new QuicMessageFrame(1, MemSliceFromString(large_message));
   EXPECT_TRUE(creator_.AddFrame(QuicFrame(message_frame), NOT_RETRANSMISSION));
   EXPECT_TRUE(creator_.HasPendingFrames());
   creator_.FlushCurrentPacket();
 
   QuicMessageFrame* frame2 =
-      new QuicMessageFrame(2, MakeSpan(&allocator_, "message", &storage));
+      new QuicMessageFrame(2, MemSliceFromString("message"));
   EXPECT_TRUE(creator_.AddFrame(QuicFrame(frame2), NOT_RETRANSMISSION));
   EXPECT_TRUE(creator_.HasPendingFrames());
   // Verify if a new frame is added, 1 byte message length will be added.
   EXPECT_EQ(1u, creator_.ExpansionOnNewFrame());
   QuicMessageFrame* frame3 =
-      new QuicMessageFrame(3, MakeSpan(&allocator_, "message2", &storage));
+      new QuicMessageFrame(3, MemSliceFromString("message2"));
   EXPECT_TRUE(creator_.AddFrame(QuicFrame(frame3), NOT_RETRANSMISSION));
   EXPECT_EQ(1u, creator_.ExpansionOnNewFrame());
   creator_.FlushCurrentPacket();
@@ -1798,14 +1797,14 @@
       stream_id, &iov_, 1u, iov_.iov_len, 0u, 0u, false, false,
       NOT_RETRANSMISSION, &frame));
   QuicMessageFrame* frame4 =
-      new QuicMessageFrame(4, MakeSpan(&allocator_, "message", &storage));
+      new QuicMessageFrame(4, MemSliceFromString("message"));
   EXPECT_TRUE(creator_.AddFrame(QuicFrame(frame4), NOT_RETRANSMISSION));
   EXPECT_TRUE(creator_.HasPendingFrames());
   // Verify there is not enough room for largest payload.
   EXPECT_FALSE(creator_.HasRoomForMessageFrame(
       creator_.GetCurrentLargestMessagePayload()));
   // Add largest message will causes the flush of the stream frame.
-  QuicMessageFrame frame5(5, MakeSpan(&allocator_, message, &storage));
+  QuicMessageFrame frame5(5, MemSliceFromString(large_message));
   EXPECT_FALSE(creator_.AddFrame(QuicFrame(&frame5), NOT_RETRANSMISSION));
   EXPECT_FALSE(creator_.HasPendingFrames());
 }
@@ -1818,8 +1817,6 @@
     creator_.SetMaxDatagramFrameSize(kMaxAcceptedDatagramFrameSize);
   }
   std::string message_data(kDefaultMaxPacketSize, 'a');
-  absl::string_view message_buffer(message_data);
-  QuicMemSliceStorage storage(nullptr, 0, nullptr, 0);
   // Test all possible encryption levels of message frames.
   for (EncryptionLevel level :
        {ENCRYPTION_ZERO_RTT, ENCRYPTION_FORWARD_SECURE}) {
@@ -1828,10 +1825,9 @@
     for (size_t message_size = 0;
          message_size <= creator_.GetCurrentLargestMessagePayload();
          ++message_size) {
-      QuicMessageFrame* frame = new QuicMessageFrame(
-          0, MakeSpan(&allocator_,
-                      absl::string_view(message_buffer.data(), message_size),
-                      &storage));
+      QuicMessageFrame* frame =
+          new QuicMessageFrame(0, MemSliceFromString(absl::string_view(
+                                      message_data.data(), message_size)));
       EXPECT_TRUE(creator_.AddFrame(QuicFrame(frame), NOT_RETRANSMISSION));
       EXPECT_TRUE(creator_.HasPendingFrames());
 
@@ -2450,12 +2446,13 @@
   }
 
   MessageStatus AddMessageFrame(QuicMessageId message_id,
-                                QuicMemSliceSpan message) {
+                                QuicMemSlice message) {
     if (!has_ack() && delegate_->ShouldGeneratePacket(NO_RETRANSMITTABLE_DATA,
                                                       NOT_HANDSHAKE)) {
       EXPECT_CALL(*delegate_, MaybeBundleAckOpportunistically()).Times(1);
     }
-    return QuicPacketCreator::AddMessageFrame(message_id, message);
+    return QuicPacketCreator::AddMessageFrame(message_id,
+                                              absl::MakeSpan(&message, 1));
   }
 
   size_t ConsumeCryptoData(EncryptionLevel level,
@@ -3792,7 +3789,6 @@
   if (framer_.version().UsesTls()) {
     creator_.SetMaxDatagramFrameSize(kMaxAcceptedDatagramFrameSize);
   }
-  quic::QuicMemSliceStorage storage(nullptr, 0, nullptr, 0);
   delegate_.SetCanWriteAnything();
   EXPECT_CALL(delegate_, OnSerializedPacket(_))
       .WillOnce(
@@ -3802,30 +3798,23 @@
   creator_.ConsumeData(QuicUtils::GetFirstBidirectionalStreamId(
                            framer_.transport_version(), Perspective::IS_CLIENT),
                        &iov_, 1u, iov_.iov_len, 0, FIN);
-  EXPECT_EQ(
-      MESSAGE_STATUS_SUCCESS,
-      creator_.AddMessageFrame(1, MakeSpan(&allocator_, "message", &storage)));
+  EXPECT_EQ(MESSAGE_STATUS_SUCCESS,
+            creator_.AddMessageFrame(1, MemSliceFromString("message")));
   EXPECT_TRUE(creator_.HasPendingFrames());
   EXPECT_TRUE(creator_.HasPendingRetransmittableFrames());
 
   // Add a message which causes the flush of current packet.
-  EXPECT_EQ(
-      MESSAGE_STATUS_SUCCESS,
-      creator_.AddMessageFrame(
-          2,
-          MakeSpan(&allocator_,
-                   std::string(creator_.GetCurrentLargestMessagePayload(), 'a'),
-                   &storage)));
+  EXPECT_EQ(MESSAGE_STATUS_SUCCESS,
+            creator_.AddMessageFrame(
+                2, MemSliceFromString(std::string(
+                       creator_.GetCurrentLargestMessagePayload(), 'a'))));
   EXPECT_TRUE(creator_.HasPendingRetransmittableFrames());
 
   // Failed to send messages which cannot fit into one packet.
-  EXPECT_EQ(
-      MESSAGE_STATUS_TOO_LARGE,
-      creator_.AddMessageFrame(
-          3, MakeSpan(&allocator_,
-                      std::string(
-                          creator_.GetCurrentLargestMessagePayload() + 10, 'a'),
-                      &storage)));
+  EXPECT_EQ(MESSAGE_STATUS_TOO_LARGE,
+            creator_.AddMessageFrame(
+                3, MemSliceFromString(std::string(
+                       creator_.GetCurrentLargestMessagePayload() + 10, 'a'))));
 }
 
 TEST_F(QuicPacketCreatorMultiplePacketsTest, ConnectionId) {
diff --git a/quic/core/quic_sent_packet_manager_test.cc b/quic/core/quic_sent_packet_manager_test.cc
index b2d071e..e457e09 100644
--- a/quic/core/quic_sent_packet_manager_test.cc
+++ b/quic/core/quic_sent_packet_manager_test.cc
@@ -4637,8 +4637,7 @@
   QuicMessageFrame* message_frame = nullptr;
   {
     QuicMemSlice slice(MakeUniqueBuffer(&allocator_, 1024), 1024);
-    message_frame =
-        new QuicMessageFrame(/*message_id=*/1, QuicMemSliceSpan(&slice));
+    message_frame = new QuicMessageFrame(/*message_id=*/1, std::move(slice));
     EXPECT_FALSE(message_frame->message_data.empty());
     EXPECT_EQ(message_frame->message_length, 1024);
 
diff --git a/quic/core/quic_session.cc b/quic/core/quic_session.cc
index b9dd7e7..f4b88a3 100644
--- a/quic/core/quic_session.cc
+++ b/quic/core/quic_session.cc
@@ -2468,11 +2468,16 @@
   connection_->SetTransmissionType(type);
 }
 
-MessageResult QuicSession::SendMessage(QuicMemSliceSpan message) {
+MessageResult QuicSession::SendMessage(absl::Span<QuicMemSlice> message) {
   return SendMessage(message, /*flush=*/false);
 }
 
-MessageResult QuicSession::SendMessage(QuicMemSliceSpan message, bool flush) {
+MessageResult QuicSession::SendMessage(QuicMemSlice message) {
+  return SendMessage(absl::MakeSpan(&message, 1), /*flush=*/false);
+}
+
+MessageResult QuicSession::SendMessage(absl::Span<QuicMemSlice> message,
+                                       bool flush) {
   QUICHE_DCHECK(connection_->connected())
       << ENDPOINT << "Try to write messages when connection is closed.";
   if (!IsEncryptionEstablished()) {
diff --git a/quic/core/quic_session.h b/quic/core/quic_session.h
index 3fb03b9..d48f0d5 100644
--- a/quic/core/quic_session.h
+++ b/quic/core/quic_session.h
@@ -17,6 +17,7 @@
 #include "absl/container/flat_hash_map.h"
 #include "absl/strings/string_view.h"
 #include "absl/types/optional.h"
+#include "absl/types/span.h"
 #include "quic/core/crypto/tls_connection.h"
 #include "quic/core/frames/quic_ack_frequency_frame.h"
 #include "quic/core/handshaker_delegate_interface.h"
@@ -38,6 +39,7 @@
 #include "quic/core/uber_quic_stream_id_manager.h"
 #include "quic/platform/api/quic_export.h"
 #include "quic/platform/api/quic_flags.h"
+#include "quic/platform/api/quic_mem_slice.h"
 #include "quic/platform/api/quic_socket_address.h"
 #include "common/quiche_linked_hash_map.h"
 
@@ -209,31 +211,39 @@
                                 const QuicSocketAddress& peer_address,
                                 const QuicReceivedPacket& packet);
 
-  // Called by application to send |message|. Data copy can be avoided if
-  // |message| is provided in reference counted memory.
-  // Please note, |message| provided in reference counted memory would be moved
-  // internally when message is successfully sent. Thereafter, it would be
-  // undefined behavior if callers try to access the slices through their own
-  // copy of the span object.
-  // Returns the message result which includes the message status and message ID
-  // (valid if the write succeeds). SendMessage flushes a message packet even it
-  // is not full. If the application wants to bundle other data in the same
-  // packet, please consider adding a packet flusher around the SendMessage
-  // and/or WritevData calls.
+  // Sends |message| as a QUIC DATAGRAM frame (QUIC MESSAGE frame in gQUIC).
+  // See <https://datatracker.ietf.org/doc/html/draft-ietf-quic-datagram> for
+  // more details.
   //
-  // OnMessageAcked and OnMessageLost are called when a particular message gets
-  // acked or lost.
+  // Returns a MessageResult struct which includes the status of the write
+  // operation and a message ID.  The message ID (not sent on the wire) can be
+  // used to track the message; OnMessageAcked and OnMessageLost are called when
+  // a specific message gets acked or lost.
+  //
+  // If the write operation is successful, all of the slices in |message| are
+  // consumed, leaving them empty.  If MESSAGE_STATUS_INTERNAL_ERROR is
+  // returned, the slices in question may or may not be consumed; it is no
+  // longer safe to access those.  For all other status codes, |message| is kept
+  // intact.
   //
   // Note that SendMessage will fail with status = MESSAGE_STATUS_BLOCKED
-  // if connection is congestion control blocked or underlying socket is write
-  // blocked. In this case the caller can retry sending message again when
+  // if the connection is congestion control blocked or the underlying socket is
+  // write blocked. In this case the caller can retry sending message again when
   // connection becomes available, for example after getting OnCanWrite()
   // callback.
-  MessageResult SendMessage(QuicMemSliceSpan message);
+  //
+  // SendMessage flushes the current packet even it is not full; if the
+  // application needs to bundle other data in the same packet, consider using
+  // QuicConnection::ScopedPacketFlusher around the relevant write operations.
+  MessageResult SendMessage(absl::Span<QuicMemSlice> message);
 
   // Same as above SendMessage, except caller can specify if the given |message|
   // should be flushed even if the underlying connection is deemed unwritable.
-  MessageResult SendMessage(QuicMemSliceSpan message, bool flush);
+  MessageResult SendMessage(absl::Span<QuicMemSlice> message, bool flush);
+
+  // Single-slice version of SendMessage().  Unlike the version above, this
+  // version always takes ownership of the slice.
+  MessageResult SendMessage(QuicMemSlice message);
 
   // Called when message with |message_id| gets acked.
   virtual void OnMessageAcked(QuicMessageId message_id,
diff --git a/quic/core/quic_session_test.cc b/quic/core/quic_session_test.cc
index a79f5ba..ba5b975 100644
--- a/quic/core/quic_session_test.cc
+++ b/quic/core/quic_session_test.cc
@@ -2450,36 +2450,26 @@
 TEST_P(QuicSessionTestServer, SendMessage) {
   // Cannot send message when encryption is not established.
   EXPECT_FALSE(session_.OneRttKeysAvailable());
-  quic::QuicMemSliceStorage storage(nullptr, 0, nullptr, 0);
   EXPECT_EQ(MessageResult(MESSAGE_STATUS_ENCRYPTION_NOT_ESTABLISHED, 0),
-            session_.SendMessage(
-                MakeSpan(connection_->helper()->GetStreamSendBufferAllocator(),
-                         "", &storage)));
+            session_.SendMessage(MemSliceFromString("")));
 
   CompleteHandshake();
   EXPECT_TRUE(session_.OneRttKeysAvailable());
 
-  absl::string_view message;
   EXPECT_CALL(*connection_, SendMessage(1, _, false))
       .WillOnce(Return(MESSAGE_STATUS_SUCCESS));
   EXPECT_EQ(MessageResult(MESSAGE_STATUS_SUCCESS, 1),
-            session_.SendMessage(
-                MakeSpan(connection_->helper()->GetStreamSendBufferAllocator(),
-                         message, &storage)));
+            session_.SendMessage(MemSliceFromString("")));
   // Verify message_id increases.
   EXPECT_CALL(*connection_, SendMessage(2, _, false))
       .WillOnce(Return(MESSAGE_STATUS_TOO_LARGE));
   EXPECT_EQ(MessageResult(MESSAGE_STATUS_TOO_LARGE, 0),
-            session_.SendMessage(
-                MakeSpan(connection_->helper()->GetStreamSendBufferAllocator(),
-                         message, &storage)));
+            session_.SendMessage(MemSliceFromString("")));
   // Verify unsent message does not consume a message_id.
   EXPECT_CALL(*connection_, SendMessage(2, _, false))
       .WillOnce(Return(MESSAGE_STATUS_SUCCESS));
   EXPECT_EQ(MessageResult(MESSAGE_STATUS_SUCCESS, 2),
-            session_.SendMessage(
-                MakeSpan(connection_->helper()->GetStreamSendBufferAllocator(),
-                         message, &storage)));
+            session_.SendMessage(MemSliceFromString("")));
 
   QuicMessageFrame frame(1);
   QuicMessageFrame frame2(2);
diff --git a/quic/masque/masque_compression_engine.cc b/quic/masque/masque_compression_engine.cc
index e4471dd..035b9f9 100644
--- a/quic/masque/masque_compression_engine.cc
+++ b/quic/masque/masque_compression_engine.cc
@@ -280,9 +280,8 @@
     return;
   }
 
-  QuicMemSlice slice(std::move(buffer));
   MessageResult message_result =
-      masque_session_->SendMessage(QuicMemSliceSpan(&slice));
+      masque_session_->SendMessage(QuicMemSlice(std::move(buffer)));
 
   QUIC_DVLOG(1) << "Sent packet compressed with flow ID " << flow_id
                 << " and got message result " << message_result;
diff --git a/quic/qbone/qbone_session_base.cc b/quic/qbone/qbone_session_base.cc
index e984196..2f2f31e 100644
--- a/quic/qbone/qbone_session_base.cc
+++ b/quic/qbone/qbone_session_base.cc
@@ -140,11 +140,9 @@
   }
 
   if (send_packets_as_messages_) {
-    QuicUniqueBufferPtr buffer = MakeUniqueBuffer(
-        connection()->helper()->GetStreamSendBufferAllocator(), packet.size());
-    memcpy(buffer.get(), packet.data(), packet.size());
-    QuicMemSlice slice(std::move(buffer), packet.size());
-    switch (SendMessage(QuicMemSliceSpan(&slice), /*flush=*/true).status) {
+    QuicMemSlice slice(QuicBuffer::Copy(
+        connection()->helper()->GetStreamSendBufferAllocator(), packet));
+    switch (SendMessage(absl::MakeSpan(&slice, 1), /*flush=*/true).status) {
       case MESSAGE_STATUS_SUCCESS:
         break;
       case MESSAGE_STATUS_TOO_LARGE: {
diff --git a/quic/test_tools/quic_test_utils.cc b/quic/test_tools/quic_test_utils.cc
index 3dd3e33..8081c36 100644
--- a/quic/test_tools/quic_test_utils.cc
+++ b/quic/test_tools/quic_test_utils.cc
@@ -1323,6 +1323,10 @@
 }
 
 QuicMemSlice MemSliceFromString(absl::string_view data) {
+  if (data.empty()) {
+    return QuicMemSlice();
+  }
+
   static SimpleBufferAllocator* allocator = new SimpleBufferAllocator();
   return QuicMemSlice(QuicBuffer::Copy(allocator, data));
 }
diff --git a/quic/test_tools/quic_test_utils.h b/quic/test_tools/quic_test_utils.h
index eab0624..ccd5039 100644
--- a/quic/test_tools/quic_test_utils.h
+++ b/quic/test_tools/quic_test_utils.h
@@ -763,10 +763,8 @@
               (QuicStreamId, QuicRstStreamErrorCode),
               (override));
   MOCK_METHOD(bool, SendControlFrame, (const QuicFrame& frame), (override));
-  MOCK_METHOD(MessageStatus,
-              SendMessage,
-              (QuicMessageId, QuicMemSliceSpan, bool),
-              (override));
+  MOCK_METHOD(MessageStatus, SendMessage,
+              (QuicMessageId, absl::Span<QuicMemSlice>, bool), (override));
   MOCK_METHOD(bool,
               SendPathChallenge,
               (const QuicPathFrameBuffer&,