Propagate receive_timestamp and lost message frames from QUIC to WebRTC.

WebRTC needs to know when messages are lost or acked (with receive timestamps)
in order to synthesize RTCP feedback packets.  This change propagates receive
timestamps and lost message notifications from QUIC to the datagram sink.

gfe-relnote: n/a (Quartc only)
PiperOrigin-RevId: 250968409
Change-Id: Ie452f1eaee265f8688c3dc2b2de41c5b3976db9a
diff --git a/quic/quartc/quartc_endpoint.cc b/quic/quartc/quartc_endpoint.cc
index 29f0d67..a77595f 100644
--- a/quic/quartc/quartc_endpoint.cc
+++ b/quic/quartc/quartc_endpoint.cc
@@ -133,8 +133,13 @@
   delegate_->OnMessageSent(datagram_id);
 }
 
-void QuartcClientEndpoint::OnMessageAcked(int64_t datagram_id) {
-  delegate_->OnMessageAcked(datagram_id);
+void QuartcClientEndpoint::OnMessageAcked(int64_t datagram_id,
+                                          QuicTime receive_timestamp) {
+  delegate_->OnMessageAcked(datagram_id, receive_timestamp);
+}
+
+void QuartcClientEndpoint::OnMessageLost(int64_t datagram_id) {
+  delegate_->OnMessageLost(datagram_id);
 }
 
 QuartcServerEndpoint::QuartcServerEndpoint(
diff --git a/quic/quartc/quartc_endpoint.h b/quic/quartc/quartc_endpoint.h
index da480fe..df6f056 100644
--- a/quic/quartc/quartc_endpoint.h
+++ b/quic/quartc/quartc_endpoint.h
@@ -88,7 +88,8 @@
                           ConnectionCloseSource source) override;
   void OnMessageReceived(QuicStringPiece message) override;
   void OnMessageSent(int64_t datagram_id) override;
-  void OnMessageAcked(int64_t datagram_id) override;
+  void OnMessageAcked(int64_t datagram_id, QuicTime receive_timestamp) override;
+  void OnMessageLost(int64_t datagram_id) override;
 
  private:
   friend class CreateSessionDelegate;
diff --git a/quic/quartc/quartc_fakes.h b/quic/quartc/quartc_fakes.h
index 937142c..abf2a43 100644
--- a/quic/quartc/quartc_fakes.h
+++ b/quic/quartc/quartc_fakes.h
@@ -63,8 +63,13 @@
     sent_datagram_ids_.push_back(datagram_id);
   }
 
-  void OnMessageAcked(int64_t datagram_id) {
-    acked_datagram_ids_.push_back(datagram_id);
+  void OnMessageAcked(int64_t datagram_id, QuicTime receive_timestamp) {
+    acked_datagram_id_to_receive_timestamp_.emplace(datagram_id,
+                                                    receive_timestamp);
+  }
+
+  void OnMessageLost(int64_t datagram_id) {
+    lost_datagram_ids_.push_back(datagram_id);
   }
 
   void OnCongestionControlChange(QuicBandwidth bandwidth_estimate,
@@ -88,8 +93,13 @@
   }
 
   // Returns all ACKEd datagram ids in the order ACKs were received.
-  const std::vector<int64_t>& acked_datagram_ids() const {
-    return acked_datagram_ids_;
+  const std::map<int64_t, QuicTime>& acked_datagram_id_to_receive_timestamp()
+      const {
+    return acked_datagram_id_to_receive_timestamp_;
+  }
+
+  const std::vector<int64_t>& lost_datagram_ids() const {
+    return lost_datagram_ids_;
   }
 
   bool connected() const { return connected_; }
@@ -106,7 +116,8 @@
   QuartcStream* last_incoming_stream_;
   std::vector<std::string> incoming_messages_;
   std::vector<int64_t> sent_datagram_ids_;
-  std::vector<int64_t> acked_datagram_ids_;
+  std::map<int64_t, QuicTime> acked_datagram_id_to_receive_timestamp_;
+  std::vector<int64_t> lost_datagram_ids_;
   bool connected_ = true;
   QuartcStream::Delegate* stream_delegate_;
   QuicTime writable_time_ = QuicTime::Zero();
diff --git a/quic/quartc/quartc_session.cc b/quic/quartc/quartc_session.cc
index 95c7e86..84396ad 100644
--- a/quic/quartc/quartc_session.cc
+++ b/quic/quartc/quartc_session.cc
@@ -251,18 +251,28 @@
   auto element = message_to_datagram_id_.find(message_id);
 
   if (element == message_to_datagram_id_.end()) {
-    QUIC_DLOG(DFATAL) << "ACKed message_id was not found, message_id="
-                      << message_id;
     return;
   }
 
-  // TODO(mellem): Pass receive_timestamp to |delegate_|.
-  session_delegate_->OnMessageAcked(/*datagram_id=*/element->second);
+  session_delegate_->OnMessageAcked(/*datagram_id=*/element->second,
+                                    receive_timestamp);
 
   // Free up space -- we should never see message_id again.
   message_to_datagram_id_.erase(element);
 }
 
+void QuartcSession::OnMessageLost(QuicMessageId message_id) {
+  auto it = message_to_datagram_id_.find(message_id);
+  if (it == message_to_datagram_id_.end()) {
+    return;
+  }
+
+  session_delegate_->OnMessageLost(/*datagram_id=*/it->second);
+
+  // Free up space.
+  message_to_datagram_id_.erase(it);
+}
+
 QuicStream* QuartcSession::CreateIncomingStream(QuicStreamId id) {
   return ActivateDataStream(CreateDataStream(id, QuicStream::kDefaultPriority));
 }
diff --git a/quic/quartc/quartc_session.h b/quic/quartc/quartc_session.h
index 3dd2916..6cd1a20 100644
--- a/quic/quartc/quartc_session.h
+++ b/quic/quartc/quartc_session.h
@@ -157,8 +157,14 @@
     // plumb that signal up to RTP's congestion control.
     virtual void OnMessageSent(int64_t datagram_id) = 0;
 
-    // Called when message with |datagram_id| gets acked.
-    virtual void OnMessageAcked(int64_t datagram_id) = 0;
+    // Called when message with |datagram_id| gets acked.  |receive_timestamp|
+    // indicates when the peer received this message, according to its own
+    // clock.
+    virtual void OnMessageAcked(int64_t datagram_id,
+                                QuicTime receive_timestamp) = 0;
+
+    // Called when message with |datagram_id| is lost.
+    virtual void OnMessageLost(int64_t datagram_id) = 0;
 
     // TODO(zhihuang): Add proof verification.
   };
@@ -179,6 +185,8 @@
   void OnMessageAcked(QuicMessageId message_id,
                       QuicTime receive_timestamp) override;
 
+  void OnMessageLost(QuicMessageId message_id) override;
+
   // Returns number of queued (not sent) messages submitted by
   // SendOrQueueMessage. Messages are queued if connection is congestion
   // controlled.
diff --git a/quic/quartc/quartc_session_test.cc b/quic/quartc/quartc_session_test.cc
index 316a7a7..1bd0828 100644
--- a/quic/quartc/quartc_session_test.cc
+++ b/quic/quartc/quartc_session_test.cc
@@ -26,6 +26,11 @@
 
 namespace {
 
+using ::testing::ElementsAre;
+using ::testing::ElementsAreArray;
+using ::testing::Gt;
+using ::testing::Pair;
+
 constexpr QuicTime::Delta kPropagationDelay =
     QuicTime::Delta::FromMilliseconds(10);
 // Propagation delay and a bit, but no more than full RTT.
@@ -180,8 +185,9 @@
     EXPECT_THAT(server_session_delegate_->sent_datagram_ids(),
                 testing::ElementsAre(server_datagram_id));
 
-    EXPECT_THAT(server_session_delegate_->acked_datagram_ids(),
-                testing::ElementsAre(server_datagram_id));
+    EXPECT_THAT(
+        server_session_delegate_->acked_datagram_id_to_receive_timestamp(),
+        ElementsAre(Pair(server_datagram_id, Gt(QuicTime::Zero()))));
 
     // Send message from peer 2 to peer 1.
     message = CreateMemSliceVector("Message from client");
@@ -200,8 +206,9 @@
     EXPECT_THAT(client_session_delegate_->sent_datagram_ids(),
                 testing::ElementsAre(client_datagram_id));
 
-    EXPECT_THAT(client_session_delegate_->acked_datagram_ids(),
-                testing::ElementsAre(client_datagram_id));
+    EXPECT_THAT(
+        client_session_delegate_->acked_datagram_id_to_receive_timestamp(),
+        ElementsAre(Pair(client_datagram_id, Gt(QuicTime::Zero()))));
   }
 
   // Test for sending multiple messages that also result in queueing.
@@ -246,9 +253,14 @@
     // Wait for peer 2 to receive all messages.
     RunTasks();
 
+    std::vector<testing::Matcher<std::pair<int64_t, QuicTime>>> ack_matchers;
+    for (int64_t id : sent_datagram_ids) {
+      ack_matchers.push_back(Pair(id, Gt(QuicTime::Zero())));
+    }
     EXPECT_EQ(delegate_receiving->incoming_messages(), sent_messages);
     EXPECT_EQ(delegate_sending->sent_datagram_ids(), sent_datagram_ids);
-    EXPECT_EQ(delegate_sending->acked_datagram_ids(), sent_datagram_ids);
+    EXPECT_THAT(delegate_sending->acked_datagram_id_to_receive_timestamp(),
+                ElementsAreArray(ack_matchers));
   }
 
   // Test sending long messages:
@@ -320,12 +332,18 @@
 }
 
 TEST_F(QuartcSessionTest, SendReceiveMessages) {
+  // TODO(b/134175506): Remove when IETF QUIC supports receive timestamps.
+  SetQuicReloadableFlag(quic_enable_version_99, false);
+
   CreateClientAndServerSessions(QuartcSessionConfig());
   AwaitHandshake();
   TestSendReceiveMessage();
 }
 
 TEST_F(QuartcSessionTest, SendReceiveQueuedMessages) {
+  // TODO(b/134175506): Remove when IETF QUIC supports receive timestamps.
+  SetQuicReloadableFlag(quic_enable_version_99, false);
+
   CreateClientAndServerSessions(QuartcSessionConfig());
   AwaitHandshake();
   TestSendReceiveQueuedMessages(/*direction_from_server=*/true);
@@ -362,6 +380,9 @@
 }
 
 TEST_F(QuartcSessionTest, PreSharedKeyHandshake) {
+  // TODO(b/134175506): Remove when IETF QUIC supports receive timestamps.
+  SetQuicReloadableFlag(quic_enable_version_99, false);
+
   QuartcSessionConfig config;
   config.pre_shared_key = "foo";
   CreateClientAndServerSessions(config);
@@ -504,6 +525,54 @@
             QUIC_STREAM_CANCELLED);
 }
 
+TEST_F(QuartcSessionTest, LostDatagramNotifications) {
+  // TODO(b/134175506): Remove when IETF QUIC supports receive timestamps.
+  SetQuicReloadableFlag(quic_enable_version_99, false);
+
+  // Disable tail loss probe, otherwise test maybe flaky because dropped
+  // message will be retransmitted to detect tail loss.
+  QuartcSessionConfig session_config;
+  session_config.enable_tail_loss_probe = false;
+  CreateClientAndServerSessions(session_config);
+
+  // Disable probing retransmissions, otherwise test maybe flaky because dropped
+  // message will be retransmitted to to probe for more bandwidth.
+  client_peer_->connection()->set_fill_up_link_during_probing(false);
+  server_peer_->connection()->set_fill_up_link_during_probing(false);
+
+  AwaitHandshake();
+  ASSERT_TRUE(client_peer_->IsCryptoHandshakeConfirmed());
+  ASSERT_TRUE(server_peer_->IsCryptoHandshakeConfirmed());
+
+  // The client sends an ACK for the crypto handshake next.  This must be
+  // flushed before we set the filter to drop the next packet, in order to
+  // ensure that the filter drops a data-bearing packet instead of just an ack.
+  RunTasks();
+
+  // Drop the next packet.
+  client_filter_->set_packets_to_drop(1);
+
+  test::QuicTestMemSliceVector message =
+      CreateMemSliceVector("This message will be lost");
+  ASSERT_TRUE(client_peer_->SendOrQueueMessage(message.span(), 1));
+
+  RunTasks();
+
+  // Send another packet to elicit an ack and trigger loss detection.
+  message = CreateMemSliceVector("This message will arrive");
+  ASSERT_TRUE(client_peer_->SendOrQueueMessage(message.span(), 2));
+
+  RunTasks();
+
+  EXPECT_THAT(server_session_delegate_->incoming_messages(),
+              ElementsAre("This message will arrive"));
+  EXPECT_THAT(client_session_delegate_->sent_datagram_ids(), ElementsAre(1, 2));
+  EXPECT_THAT(
+      client_session_delegate_->acked_datagram_id_to_receive_timestamp(),
+      ElementsAre(Pair(2, Gt(QuicTime::Zero()))));
+  EXPECT_THAT(client_session_delegate_->lost_datagram_ids(), ElementsAre(1));
+}
+
 TEST_F(QuartcSessionTest, ServerRegistersAsWriteBlocked) {
   // Initialize client and server session, but with the server write-blocked.
   Init();
diff --git a/quic/quartc/test/quartc_peer.h b/quic/quartc/test/quartc_peer.h
index fc5e6b1..39dd148 100644
--- a/quic/quartc/test/quartc_peer.h
+++ b/quic/quartc/test/quartc_peer.h
@@ -85,7 +85,9 @@
                           ConnectionCloseSource source) override;
   void OnMessageReceived(QuicStringPiece message) override;
   void OnMessageSent(int64_t datagram_id) override {}
-  void OnMessageAcked(int64_t datagram_id) override {}
+  void OnMessageAcked(int64_t datagram_id,
+                      QuicTime receive_timestamp) override {}
+  void OnMessageLost(int64_t datagram_id) override {}
 
   // QuartcDataSource::Delegate overrides.
   void OnDataProduced(const char* data, size_t length) override;
diff --git a/quic/quartc/test/quic_trace_interceptor.cc b/quic/quartc/test/quic_trace_interceptor.cc
index 5ce9be5..dc10c38 100644
--- a/quic/quartc/test/quic_trace_interceptor.cc
+++ b/quic/quartc/test/quic_trace_interceptor.cc
@@ -67,8 +67,13 @@
   delegate_->OnMessageSent(datagram_id);
 }
 
-void QuicTraceInterceptor::OnMessageAcked(int64_t datagram_id) {
-  delegate_->OnMessageAcked(datagram_id);
+void QuicTraceInterceptor::OnMessageAcked(int64_t datagram_id,
+                                          QuicTime receive_timestamp) {
+  delegate_->OnMessageAcked(datagram_id, receive_timestamp);
+}
+
+void QuicTraceInterceptor::OnMessageLost(int64_t datagram_id) {
+  delegate_->OnMessageLost(datagram_id);
 }
 
 void QuicTraceInterceptor::SetDelegate(QuartcEndpoint::Delegate* delegate) {
diff --git a/quic/quartc/test/quic_trace_interceptor.h b/quic/quartc/test/quic_trace_interceptor.h
index e68d880..263b596 100644
--- a/quic/quartc/test/quic_trace_interceptor.h
+++ b/quic/quartc/test/quic_trace_interceptor.h
@@ -39,7 +39,8 @@
                           ConnectionCloseSource source) override;
   void OnMessageReceived(QuicStringPiece message) override;
   void OnMessageSent(int64_t datagram_id) override;
-  void OnMessageAcked(int64_t datagram_id) override;
+  void OnMessageAcked(int64_t datagram_id, QuicTime receive_timestamp) override;
+  void OnMessageLost(int64_t datagram_id) override;
   void SetDelegate(QuartcEndpoint::Delegate* delegate) override;
 
  private: