Change SendOrQueueMessage to take a QuicMemSliceSpan.
QuicMemSliceSpan now has a `ConsumeAll` method in its API, which allows us to
move the mem slices into a queue of messages to send. This is now the same
mechanism used to store mem slice spans in a stream or message's send buffer.
It ensures that mem slices are properly ref-counted so that the contents are not
deleted after storing them.
This should enable zero-copy writes down the line. However, currently, we still
have two copies:
- One in the application layer, where we create mem slices by copying data
- One in the session, when converting mem slices back into a mem slice span
Nothing stops us from fixing the former, but it requires a bit of refactoring at
the application layer.
The latter requires an API for building a QuicMemSliceSpan out of QuicMemSlices
without copying them. This should be possible, but doesn't seem to exist today.
gfe-relnote: n/a (Quartc only)
PiperOrigin-RevId: 247442584
Change-Id: Ie1bc1ad2f878ee312a5e13b048fd2233864c3d18
diff --git a/quic/quartc/quartc_session.cc b/quic/quartc/quartc_session.cc
index 4d36305..59bb2c2 100644
--- a/quic/quartc/quartc_session.cc
+++ b/quic/quartc/quartc_session.cc
@@ -41,14 +41,15 @@
GetNextOutgoingBidirectionalStreamId(), QuicStream::kDefaultPriority));
}
-bool QuartcSession::SendOrQueueMessage(std::string message) {
+bool QuartcSession::SendOrQueueMessage(QuicMemSliceSpan message) {
if (!CanSendMessage()) {
QUIC_LOG(ERROR) << "Quic session does not support SendMessage";
return false;
}
- if (message.size() > GetCurrentLargestMessagePayload()) {
- QUIC_LOG(ERROR) << "Message is too big, message_size=" << message.size()
+ if (message.total_length() > GetCurrentLargestMessagePayload()) {
+ QUIC_LOG(ERROR) << "Message is too big, message_size="
+ << message.total_length()
<< ", GetCurrentLargestMessagePayload="
<< GetCurrentLargestMessagePayload();
return false;
@@ -56,7 +57,9 @@
// There may be other messages in send queue, so we have to add message
// to the queue and call queue processing helper.
- send_message_queue_.emplace_back(std::move(message));
+ message.ConsumeAll([this](QuicMemSlice slice) {
+ send_message_queue_.emplace_back(std::move(slice));
+ });
ProcessSendMessageQueue();
@@ -74,7 +77,7 @@
send_message_queue_.front().length());
MessageResult result = SendMessage(storage.ToSpan());
- const size_t message_size = send_message_queue_.front().size();
+ const size_t message_size = send_message_queue_.front().length();
// Handle errors.
switch (result.status) {
diff --git a/quic/quartc/quartc_session.h b/quic/quartc/quartc_session.h
index 9da306e..8e28add 100644
--- a/quic/quartc/quartc_session.h
+++ b/quic/quartc/quartc_session.h
@@ -48,7 +48,7 @@
// support SendMessage API. Other unexpected errors during send will not be
// returned, because messages can be sent later if connection is congestion
// controlled.
- bool SendOrQueueMessage(std::string message);
+ bool SendOrQueueMessage(QuicMemSliceSpan message);
// Returns largest message payload acceptable in SendQuartcMessage.
QuicPacketLength GetCurrentLargestMessagePayload() const {
@@ -190,7 +190,7 @@
// Queue of pending messages sent by SendQuartcMessage that were not sent
// yet or blocked by congestion control. Messages are queued in the order
// of sent by SendOrQueueMessage().
- QuicDeque<std::string> send_message_queue_;
+ QuicDeque<QuicMemSlice> send_message_queue_;
};
class QuartcClientSession : public QuartcSession,
diff --git a/quic/quartc/quartc_session_test.cc b/quic/quartc/quartc_session_test.cc
index c6488a1..75fd20a 100644
--- a/quic/quartc/quartc_session_test.cc
+++ b/quic/quartc/quartc_session_test.cc
@@ -34,6 +34,11 @@
static QuicByteCount kDefaultMaxPacketSize = 1200;
+test::QuicTestMemSliceVector CreateMemSliceVector(QuicStringPiece data) {
+ return test::QuicTestMemSliceVector(
+ {std::pair<char*, size_t>(const_cast<char*>(data.data()), data.size())});
+}
+
class QuartcSessionTest : public QuicTest {
public:
~QuartcSessionTest() override {}
@@ -130,9 +135,7 @@
outgoing_stream->SetDelegate(server_stream_delegate_.get());
// Send a test message from peer 1 to peer 2.
- char kTestMessage[] = "Hello";
- test::QuicTestMemSliceVector data(
- {std::make_pair(kTestMessage, strlen(kTestMessage))});
+ test::QuicTestMemSliceVector data = CreateMemSliceVector("Hello");
outgoing_stream->WriteMemSlices(data.span(), /*fin=*/false);
RunTasks();
@@ -144,17 +147,15 @@
EXPECT_EQ(incoming->id(), stream_id);
EXPECT_TRUE(client_peer_->ShouldKeepConnectionAlive());
- EXPECT_EQ(client_stream_delegate_->data()[stream_id], kTestMessage);
+ EXPECT_EQ(client_stream_delegate_->data()[stream_id], "Hello");
// Send a test message from peer 2 to peer 1.
- char kTestResponse[] = "Response";
- test::QuicTestMemSliceVector response(
- {std::make_pair(kTestResponse, strlen(kTestResponse))});
+ test::QuicTestMemSliceVector response = CreateMemSliceVector("Response");
incoming->WriteMemSlices(response.span(), /*fin=*/false);
RunTasks();
// Wait for peer 1 to receive messages.
ASSERT_TRUE(server_stream_delegate_->has_data());
- EXPECT_EQ(server_stream_delegate_->data()[stream_id], kTestResponse);
+ EXPECT_EQ(server_stream_delegate_->data()[stream_id], "Response");
}
// Test sending/receiving of messages for two directions.
@@ -163,7 +164,9 @@
ASSERT_TRUE(client_peer_->CanSendMessage());
// Send message from peer 1 to peer 2.
- ASSERT_TRUE(server_peer_->SendOrQueueMessage("Message from server"));
+ test::QuicTestMemSliceVector message =
+ CreateMemSliceVector("Message from server");
+ ASSERT_TRUE(server_peer_->SendOrQueueMessage(message.span()));
// First message in each direction should not be queued.
EXPECT_EQ(server_peer_->send_message_queue_size(), 0u);
@@ -175,7 +178,8 @@
testing::ElementsAre("Message from server"));
// Send message from peer 2 to peer 1.
- ASSERT_TRUE(client_peer_->SendOrQueueMessage("Message from client"));
+ message = CreateMemSliceVector("Message from client");
+ ASSERT_TRUE(client_peer_->SendOrQueueMessage(message.span()));
// First message in each direction should not be queued.
EXPECT_EQ(client_peer_->send_message_queue_size(), 0u);
@@ -212,7 +216,8 @@
while (peer_sending->send_message_queue_size() < queue_size) {
sent_messages.push_back(
QuicStrCat("Sending message, index=", sent_messages.size()));
- ASSERT_TRUE(peer_sending->SendOrQueueMessage(sent_messages.back()));
+ ASSERT_TRUE(peer_sending->SendOrQueueMessage(
+ CreateMemSliceVector(sent_messages.back()).span()));
}
// Wait for peer 2 to receive all messages.
@@ -231,12 +236,15 @@
// Send message of maximum allowed length.
std::string message_max_long =
std::string(server_peer_->GetCurrentLargestMessagePayload(), 'A');
- ASSERT_TRUE(server_peer_->SendOrQueueMessage(message_max_long));
+ test::QuicTestMemSliceVector message =
+ CreateMemSliceVector(message_max_long);
+ ASSERT_TRUE(server_peer_->SendOrQueueMessage(message.span()));
// Send long message which should fail.
std::string message_too_long =
std::string(server_peer_->GetCurrentLargestMessagePayload() + 1, 'B');
- ASSERT_FALSE(server_peer_->SendOrQueueMessage(message_too_long));
+ message = CreateMemSliceVector(message_too_long);
+ ASSERT_FALSE(server_peer_->SendOrQueueMessage(message.span()));
// Wait for peer 2 to receive message.
RunTasks();
@@ -375,9 +383,7 @@
QuartcStream* stream = client_peer_->CreateOutgoingBidirectionalStream();
stream->SetDelegate(client_stream_delegate_.get());
- char kClientMessage[] = "Hello";
- test::QuicTestMemSliceVector stream_data(
- {std::make_pair(kClientMessage, strlen(kClientMessage))});
+ test::QuicTestMemSliceVector stream_data = CreateMemSliceVector("Hello");
stream->WriteMemSlices(stream_data.span(), /*fin=*/false);
RunTasks();
@@ -412,15 +418,13 @@
client_filter_->set_packets_to_drop(1);
- char kClientMessage[] = "Hello";
- test::QuicTestMemSliceVector stream_data(
- {std::make_pair(kClientMessage, strlen(kClientMessage))});
+ test::QuicTestMemSliceVector stream_data = CreateMemSliceVector("Hello");
stream->WriteMemSlices(stream_data.span(), /*fin=*/false);
RunTasks();
// Stream data should make it despite packet loss.
ASSERT_TRUE(server_stream_delegate_->has_data());
- EXPECT_EQ(server_stream_delegate_->data()[stream_id], kClientMessage);
+ EXPECT_EQ(server_stream_delegate_->data()[stream_id], "Hello");
}
TEST_F(QuartcSessionTest, StreamRetransmissionDisabled) {
@@ -450,9 +454,7 @@
client_filter_->set_packets_to_drop(1);
- char kMessage[] = "Hello";
- test::QuicTestMemSliceVector stream_data(
- {std::make_pair(kMessage, strlen(kMessage))});
+ test::QuicTestMemSliceVector stream_data = CreateMemSliceVector("Hello");
stream->WriteMemSlices(stream_data.span(), /*fin=*/false);
simulator_.RunFor(QuicTime::Delta::FromMilliseconds(1));
@@ -460,9 +462,8 @@
QuartcStream* stream_1 = client_peer_->CreateOutgoingBidirectionalStream();
stream_1->SetDelegate(client_stream_delegate_.get());
- char kMessage1[] = "Second message";
- test::QuicTestMemSliceVector stream_data_1(
- {std::make_pair(kMessage1, strlen(kMessage1))});
+ test::QuicTestMemSliceVector stream_data_1 =
+ CreateMemSliceVector("Second message");
stream_1->WriteMemSlices(stream_data_1.span(), /*fin=*/false);
RunTasks();
diff --git a/quic/quartc/test/bidi_test_runner.cc b/quic/quartc/test/bidi_test_runner.cc
index 2e9ba87..07ad80e 100644
--- a/quic/quartc/test/bidi_test_runner.cc
+++ b/quic/quartc/test/bidi_test_runner.cc
@@ -90,10 +90,12 @@
bool BidiTestRunner::RunTest(QuicTime::Delta test_duration) {
client_peer_ = QuicMakeUnique<QuartcPeer>(
simulator_->GetClock(), simulator_->GetAlarmFactory(),
- simulator_->GetRandomGenerator(), client_configs_);
+ simulator_->GetRandomGenerator(),
+ simulator_->GetStreamSendBufferAllocator(), client_configs_);
server_peer_ = QuicMakeUnique<QuartcPeer>(
simulator_->GetClock(), simulator_->GetAlarmFactory(),
- simulator_->GetRandomGenerator(), server_configs_);
+ simulator_->GetRandomGenerator(),
+ simulator_->GetStreamSendBufferAllocator(), server_configs_);
QuartcEndpoint::Delegate* server_delegate = server_peer_.get();
if (server_interceptor_) {
diff --git a/quic/quartc/test/quartc_peer.cc b/quic/quartc/test/quartc_peer.cc
index 3918c33..9cecd18 100644
--- a/quic/quartc/test/quartc_peer.cc
+++ b/quic/quartc/test/quartc_peer.cc
@@ -4,6 +4,7 @@
#include "net/third_party/quiche/src/quic/quartc/test/quartc_peer.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice_storage.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_string_piece.h"
namespace quic {
@@ -12,10 +13,12 @@
QuartcPeer::QuartcPeer(const QuicClock* clock,
QuicAlarmFactory* alarm_factory,
QuicRandom* random,
+ QuicBufferAllocator* buffer_allocator,
const std::vector<QuartcDataSource::Config>& configs)
: clock_(clock),
alarm_factory_(alarm_factory),
random_(random),
+ buffer_allocator_(buffer_allocator),
enabled_(false),
session_(nullptr),
configs_(configs) {}
@@ -118,7 +121,9 @@
// Further packetization is not required, as sources are configured to produce
// frames that fit within message payloads.
DCHECK_LE(length, session_->GetCurrentLargestMessagePayload());
- session_->SendOrQueueMessage(std::string(data, length));
+ struct iovec iov = {const_cast<char*>(data), length};
+ QuicMemSliceStorage storage(&iov, 1, buffer_allocator_, length);
+ session_->SendOrQueueMessage(storage.ToSpan());
}
} // namespace test
diff --git a/quic/quartc/test/quartc_peer.h b/quic/quartc/test/quartc_peer.h
index 68cb9ee..8a62ba7 100644
--- a/quic/quartc/test/quartc_peer.h
+++ b/quic/quartc/test/quartc_peer.h
@@ -48,6 +48,7 @@
QuartcPeer(const QuicClock* clock,
QuicAlarmFactory* alarm_factory,
QuicRandom* random,
+ QuicBufferAllocator* buffer_allocator,
const std::vector<QuartcDataSource::Config>& configs);
QuartcPeer(QuartcPeer&) = delete;
QuartcPeer& operator=(QuartcPeer&) = delete;
@@ -94,6 +95,7 @@
const QuicClock* clock_;
QuicAlarmFactory* alarm_factory_;
QuicRandom* random_;
+ QuicBufferAllocator* buffer_allocator_;
// Whether the peer is currently sending.
bool enabled_;
diff --git a/quic/quartc/test/quartc_peer_test.cc b/quic/quartc/test/quartc_peer_test.cc
index 3eca703..fe76acf 100644
--- a/quic/quartc/test/quartc_peer_test.cc
+++ b/quic/quartc/test/quartc_peer_test.cc
@@ -40,10 +40,12 @@
void CreatePeers(const std::vector<QuartcDataSource::Config>& configs) {
client_peer_ = QuicMakeUnique<QuartcPeer>(
simulator_.GetClock(), simulator_.GetAlarmFactory(),
- simulator_.GetRandomGenerator(), configs);
+ simulator_.GetRandomGenerator(),
+ simulator_.GetStreamSendBufferAllocator(), configs);
server_peer_ = QuicMakeUnique<QuartcPeer>(
simulator_.GetClock(), simulator_.GetAlarmFactory(),
- simulator_.GetRandomGenerator(), configs);
+ simulator_.GetRandomGenerator(),
+ simulator_.GetStreamSendBufferAllocator(), configs);
}
void Connect() {