QRTP: Implement Datagram Transport
1. Add datagram transport interface (for now experimental in media_transport/api, but later will be moved to WebRTC)
2. Implement QuartcDatagramTransport and QuartcDatagramTransportTest.
3. Add support for datagram fusing
4. Add an OnMessageSent notification that fires when a datagram is actually sent (a datagram may be queued first if the connection is congestion controlled). This notification is needed to get accurate sent timestamps for WebRTC congestion control.
Full prototype: see cl/241055581
gfe-relnote: n/a (Quartc only)
PiperOrigin-RevId: 248007007
Change-Id: Idfb717b91a251b148b1dc970d0217e28705248ff
diff --git a/quic/quartc/quartc_fakes.h b/quic/quartc/quartc_fakes.h
index ddff641..7048a5e 100644
--- a/quic/quartc/quartc_fakes.h
+++ b/quic/quartc/quartc_fakes.h
@@ -79,24 +79,34 @@
incoming_messages_.emplace_back(message);
}
+ void OnMessageSent(int64_t datagram_id) override {
+ sent_datagram_ids_.push_back(datagram_id);
+ }
+
void OnCongestionControlChange(QuicBandwidth bandwidth_estimate,
QuicBandwidth pacing_rate,
QuicTime::Delta latest_rtt) override {}
- QuartcStream* last_incoming_stream() { return last_incoming_stream_; }
+ QuartcStream* last_incoming_stream() const { return last_incoming_stream_; }
// Returns all received messages.
- const std::vector<std::string>& incoming_messages() {
+ const std::vector<std::string>& incoming_messages() const {
return incoming_messages_;
}
- bool connected() { return connected_; }
+ // Returns all sent datagram ids in the order sent.
+ const std::vector<int64_t>& sent_datagram_ids() const {
+ return sent_datagram_ids_;
+ }
+
+ bool connected() const { return connected_; }
QuicTime writable_time() const { return writable_time_; }
QuicTime crypto_handshake_time() const { return crypto_handshake_time_; }
private:
QuartcStream* last_incoming_stream_;
std::vector<std::string> incoming_messages_;
+ std::vector<int64_t> sent_datagram_ids_;
bool connected_ = true;
QuartcStream::Delegate* stream_delegate_;
QuicTime writable_time_ = QuicTime::Zero();
diff --git a/quic/quartc/quartc_session.cc b/quic/quartc/quartc_session.cc
index a62fc10..a4c897d 100644
--- a/quic/quartc/quartc_session.cc
+++ b/quic/quartc/quartc_session.cc
@@ -41,7 +41,8 @@
GetNextOutgoingBidirectionalStreamId(), QuicStream::kDefaultPriority));
}
-bool QuartcSession::SendOrQueueMessage(QuicMemSliceSpan message) {
+bool QuartcSession::SendOrQueueMessage(QuicMemSliceSpan message,
+ int64_t datagram_id) {
if (!CanSendMessage()) {
QUIC_LOG(ERROR) << "Quic session does not support SendMessage";
return false;
@@ -57,8 +58,8 @@
// There may be other messages in send queue, so we have to add message
// to the queue and call queue processing helper.
- message.ConsumeAll([this](QuicMemSlice slice) {
- send_message_queue_.emplace_back(std::move(slice));
+ message.ConsumeAll([this, datagram_id](QuicMemSlice slice) {
+ send_message_queue_.emplace_back(std::move(slice), datagram_id);
});
ProcessSendMessageQueue();
@@ -70,15 +71,19 @@
QuicConnection::ScopedPacketFlusher flusher(
connection(), QuicConnection::AckBundling::NO_ACK);
while (!send_message_queue_.empty()) {
- const size_t message_size = send_message_queue_.front().length();
- MessageResult result =
- SendMessage(QuicMemSliceSpan(&send_message_queue_.front()));
+ QueuedMessage& it = send_message_queue_.front();
+ const size_t message_size = it.message.length();
+ MessageResult result = SendMessage(QuicMemSliceSpan(&it.message));
// Handle errors.
switch (result.status) {
case MESSAGE_STATUS_SUCCESS:
QUIC_VLOG(1) << "Quartc message sent, message_id=" << result.message_id
<< ", message_size=" << message_size;
+
+ // Notify that datagram was sent.
+ session_delegate_->OnMessageSent(it.datagram_id);
+
break;
// If connection is congestion controlled or not writable yet, stop
diff --git a/quic/quartc/quartc_session.h b/quic/quartc/quartc_session.h
index 8e28add..840f69d 100644
--- a/quic/quartc/quartc_session.h
+++ b/quic/quartc/quartc_session.h
@@ -48,7 +48,18 @@
// support SendMessage API. Other unexpected errors during send will not be
// returned, because messages can be sent later if connection is congestion
// controlled.
- bool SendOrQueueMessage(QuicMemSliceSpan message);
+ //
+ // |datagram_id| is used to notify when message was sent in
+ // Delegate::OnMessageSent.
+ //
+ // TODO(sukhanov): We cannot use the QUIC message ID for notifications,
+ // because QUIC does not take ownership of messages and if the connection
+ // is congestion controlled, the message is not sent and does not get a
+ // message ID until it is sent successfully. It also creates a problem of
+ // flow control between messages and streams if they are used together. We
+ // discussed it with the QUIC team and there are multiple solutions, but for
+ // now we have to use our own datagram identification.
+ bool SendOrQueueMessage(QuicMemSliceSpan message, int64_t datagram_id);
// Returns largest message payload acceptable in SendQuartcMessage.
QuicPacketLength GetCurrentLargestMessagePayload() const {
@@ -129,6 +140,22 @@
// Called when message (sent as SendMessage) is received.
virtual void OnMessageReceived(QuicStringPiece message) = 0;
+ // Called when message is sent to QUIC.
+ //
+ // Takes into account delay due to congestion control, but does not take
+ // into account any additional socket delays.
+ //
+ // The passed |datagram_id| is the same one given to SendOrQueueMessage.
+ //
+ // TODO(sukhanov): We can take into account socket delay, but it's not clear
+ // if it's worth doing if we eventually plan to move congestion control to
+ // QUIC in QRTP model. If we need to do it, mellem@ thinks it's fairly
+ // straightforward: QUIC does not know about socket delay, but ICE does. We
+ // can tell ICE the QUIC packet number for each packet sent, and it will
+ // echo it back to us when the packet actually goes out. We just need to
+ // plumb that signal up to RTP's congestion control.
+ virtual void OnMessageSent(int64_t datagram_id) = 0;
+
// TODO(zhihuang): Add proof verification.
};
@@ -171,6 +198,14 @@
std::unique_ptr<QuartcStream> stream,
spdy::SpdyPriority priority);
+ // Holds message until it's sent.
+ struct QueuedMessage {
+ QueuedMessage(QuicMemSlice the_message, int64_t the_datagram_id)
+ : message(std::move(the_message)), datagram_id(the_datagram_id) {}
+ QuicMemSlice message;
+ int64_t datagram_id;
+ };
+
void ProcessSendMessageQueue();
// Take ownership of the QuicConnection. Note: if |connection_| changes,
@@ -190,7 +225,7 @@
// Queue of pending messages sent by SendQuartcMessage that were not sent
// yet or blocked by congestion control. Messages are queued in the order
// of sent by SendOrQueueMessage().
- QuicDeque<QuicMemSlice> send_message_queue_;
+ QuicDeque<QueuedMessage> send_message_queue_;
};
class QuartcClientSession : public QuartcSession,
diff --git a/quic/quartc/quartc_session_test.cc b/quic/quartc/quartc_session_test.cc
index 75fd20a..e150e03 100644
--- a/quic/quartc/quartc_session_test.cc
+++ b/quic/quartc/quartc_session_test.cc
@@ -163,10 +163,14 @@
ASSERT_TRUE(server_peer_->CanSendMessage());
ASSERT_TRUE(client_peer_->CanSendMessage());
+ int64_t server_datagram_id = 111;
+ int64_t client_datagram_id = 222;
+
// Send message from peer 1 to peer 2.
test::QuicTestMemSliceVector message =
CreateMemSliceVector("Message from server");
- ASSERT_TRUE(server_peer_->SendOrQueueMessage(message.span()));
+ ASSERT_TRUE(
+ server_peer_->SendOrQueueMessage(message.span(), server_datagram_id));
// First message in each direction should not be queued.
EXPECT_EQ(server_peer_->send_message_queue_size(), 0u);
@@ -177,9 +181,13 @@
EXPECT_THAT(client_session_delegate_->incoming_messages(),
testing::ElementsAre("Message from server"));
+ EXPECT_THAT(server_session_delegate_->sent_datagram_ids(),
+ testing::ElementsAre(server_datagram_id));
+
// Send message from peer 2 to peer 1.
message = CreateMemSliceVector("Message from client");
- ASSERT_TRUE(client_peer_->SendOrQueueMessage(message.span()));
+ ASSERT_TRUE(
+ client_peer_->SendOrQueueMessage(message.span(), client_datagram_id));
// First message in each direction should not be queued.
EXPECT_EQ(client_peer_->send_message_queue_size(), 0u);
@@ -189,6 +197,9 @@
EXPECT_THAT(server_session_delegate_->incoming_messages(),
testing::ElementsAre("Message from client"));
+
+ EXPECT_THAT(client_session_delegate_->sent_datagram_ids(),
+ testing::ElementsAre(client_datagram_id));
}
// Test for sending multiple messages that also result in queueing.
@@ -207,23 +218,34 @@
direction_from_server ? client_session_delegate_.get()
: server_session_delegate_.get();
+ FakeQuartcSessionDelegate* const delegate_sending =
+ direction_from_server ? server_session_delegate_.get()
+ : client_session_delegate_.get();
+
// There should be no messages in the queue before we start sending.
EXPECT_EQ(peer_sending->send_message_queue_size(), 0u);
// Send messages from peer 1 to peer 2 until required number of messages
// are queued in unsent message queue.
std::vector<std::string> sent_messages;
+ std::vector<int64_t> sent_datagram_ids;
+ int64_t current_datagram_id = 0;
while (peer_sending->send_message_queue_size() < queue_size) {
sent_messages.push_back(
QuicStrCat("Sending message, index=", sent_messages.size()));
ASSERT_TRUE(peer_sending->SendOrQueueMessage(
- CreateMemSliceVector(sent_messages.back()).span()));
+ CreateMemSliceVector(sent_messages.back()).span(),
+ current_datagram_id));
+
+ sent_datagram_ids.push_back(current_datagram_id);
+ ++current_datagram_id;
}
// Wait for peer 2 to receive all messages.
RunTasks();
EXPECT_EQ(delegate_receiving->incoming_messages(), sent_messages);
+ EXPECT_EQ(delegate_sending->sent_datagram_ids(), sent_datagram_ids);
}
// Test sending long messages:
@@ -238,13 +260,15 @@
std::string(server_peer_->GetCurrentLargestMessagePayload(), 'A');
test::QuicTestMemSliceVector message =
CreateMemSliceVector(message_max_long);
- ASSERT_TRUE(server_peer_->SendOrQueueMessage(message.span()));
+ ASSERT_TRUE(
+ server_peer_->SendOrQueueMessage(message.span(), /*datagram_id=*/0));
// Send long message which should fail.
std::string message_too_long =
std::string(server_peer_->GetCurrentLargestMessagePayload() + 1, 'B');
message = CreateMemSliceVector(message_too_long);
- ASSERT_FALSE(server_peer_->SendOrQueueMessage(message.span()));
+ ASSERT_FALSE(
+ server_peer_->SendOrQueueMessage(message.span(), /*datagram_id=*/0));
// Wait for peer 2 to receive message.
RunTasks();
diff --git a/quic/quartc/test/quartc_peer.cc b/quic/quartc/test/quartc_peer.cc
index 9cecd18..6857858 100644
--- a/quic/quartc/test/quartc_peer.cc
+++ b/quic/quartc/test/quartc_peer.cc
@@ -123,7 +123,7 @@
DCHECK_LE(length, session_->GetCurrentLargestMessagePayload());
struct iovec iov = {const_cast<char*>(data), length};
QuicMemSliceStorage storage(&iov, 1, buffer_allocator_, length);
- session_->SendOrQueueMessage(storage.ToSpan());
+ session_->SendOrQueueMessage(storage.ToSpan(), /*datagram_id=*/0);
}
} // namespace test
diff --git a/quic/quartc/test/quartc_peer.h b/quic/quartc/test/quartc_peer.h
index 8a62ba7..65d85fa 100644
--- a/quic/quartc/test/quartc_peer.h
+++ b/quic/quartc/test/quartc_peer.h
@@ -87,6 +87,7 @@
const std::string& error_details,
ConnectionCloseSource source) override;
void OnMessageReceived(QuicStringPiece message) override;
+ void OnMessageSent(int64_t datagram_id) override {}
// QuartcDataSource::Delegate overrides.
void OnDataProduced(const char* data, size_t length) override;