Propagate Datagram ACKs from QuartcSession

Introduce maps of message_id to datagram ids, so we could translate message ACKs received from QUIC to datagram ACKs that are propagated up the stack. We have to maintain map, because QUIC assigns message_id only when message is sent (it can be queued if connection was congestion controlled).

WebRTC will use transport sequence numbers as datagram_id so that ACKs can be mapped to sent packets.

gfe-relnote: n/a (Quartc only)
PiperOrigin-RevId: 250589798
Change-Id: Ib7df728873a839bd3343adb182fb68c3bf055caf
diff --git a/quic/quartc/quartc_endpoint.cc b/quic/quartc/quartc_endpoint.cc
index 7676c59..29f0d67 100644
--- a/quic/quartc/quartc_endpoint.cc
+++ b/quic/quartc/quartc_endpoint.cc
@@ -133,6 +133,10 @@
   delegate_->OnMessageSent(datagram_id);
 }
 
+void QuartcClientEndpoint::OnMessageAcked(int64_t datagram_id) {
+  delegate_->OnMessageAcked(datagram_id);
+}
+
 QuartcServerEndpoint::QuartcServerEndpoint(
     QuicAlarmFactory* alarm_factory,
     const QuicClock* clock,
diff --git a/quic/quartc/quartc_endpoint.h b/quic/quartc/quartc_endpoint.h
index 6c7644c..da480fe 100644
--- a/quic/quartc/quartc_endpoint.h
+++ b/quic/quartc/quartc_endpoint.h
@@ -88,6 +88,7 @@
                           ConnectionCloseSource source) override;
   void OnMessageReceived(QuicStringPiece message) override;
   void OnMessageSent(int64_t datagram_id) override;
+  void OnMessageAcked(int64_t datagram_id) override;
 
  private:
   friend class CreateSessionDelegate;
diff --git a/quic/quartc/quartc_fakes.h b/quic/quartc/quartc_fakes.h
index 13b65b1..937142c 100644
--- a/quic/quartc/quartc_fakes.h
+++ b/quic/quartc/quartc_fakes.h
@@ -63,6 +63,10 @@
     sent_datagram_ids_.push_back(datagram_id);
   }
 
+  void OnMessageAcked(int64_t datagram_id) {
+    acked_datagram_ids_.push_back(datagram_id);
+  }
+
   void OnCongestionControlChange(QuicBandwidth bandwidth_estimate,
                                  QuicBandwidth pacing_rate,
                                  QuicTime::Delta latest_rtt) override {}
@@ -83,6 +87,11 @@
     return sent_datagram_ids_;
   }
 
+  // Returns all ACKEd datagram ids in the order ACKs were received.
+  const std::vector<int64_t>& acked_datagram_ids() const {
+    return acked_datagram_ids_;
+  }
+
   bool connected() const { return connected_; }
   QuicTime writable_time() const { return writable_time_; }
   QuicTime crypto_handshake_time() const { return crypto_handshake_time_; }
@@ -97,6 +106,7 @@
   QuartcStream* last_incoming_stream_;
   std::vector<std::string> incoming_messages_;
   std::vector<int64_t> sent_datagram_ids_;
+  std::vector<int64_t> acked_datagram_ids_;
   bool connected_ = true;
   QuartcStream::Delegate* stream_delegate_;
   QuicTime writable_time_ = QuicTime::Zero();
diff --git a/quic/quartc/quartc_session.cc b/quic/quartc/quartc_session.cc
index 8b285d0..2e6729c 100644
--- a/quic/quartc/quartc_session.cc
+++ b/quic/quartc/quartc_session.cc
@@ -77,14 +77,21 @@
 
     // Handle errors.
     switch (result.status) {
-      case MESSAGE_STATUS_SUCCESS:
+      case MESSAGE_STATUS_SUCCESS: {
         QUIC_VLOG(1) << "Quartc message sent, message_id=" << result.message_id
                      << ", message_size=" << message_size;
 
+        auto element = message_to_datagram_id_.find(result.message_id);
+
+        DCHECK(element == message_to_datagram_id_.end())
+            << "Mapped message_id already exists, message_id="
+            << result.message_id << ", datagram_id=" << element->second;
+
+        message_to_datagram_id_[result.message_id] = it.datagram_id;
+
         // Notify that datagram was sent.
         session_delegate_->OnMessageSent(it.datagram_id);
-
-        break;
+      } break;
 
       // If connection is congestion controlled or not writable yet, stop
       // send loop and we'll retry again when we get OnCanWrite notification.
@@ -239,6 +246,21 @@
   session_delegate_->OnMessageReceived(message);
 }
 
+void QuartcSession::OnMessageAcked(QuicMessageId message_id) {
+  auto element = message_to_datagram_id_.find(message_id);
+
+  if (element == message_to_datagram_id_.end()) {
+    QUIC_DLOG(DFATAL) << "ACKed message_id was not found, message_id="
+                      << message_id;
+    return;
+  }
+
+  session_delegate_->OnMessageAcked(/*datagram_id=*/element->second);
+
+  // Free up space -- we should never see message_id again.
+  message_to_datagram_id_.erase(element);
+}
+
 QuicStream* QuartcSession::CreateIncomingStream(QuicStreamId id) {
   return ActivateDataStream(CreateDataStream(id, QuicStream::kDefaultPriority));
 }
diff --git a/quic/quartc/quartc_session.h b/quic/quartc/quartc_session.h
index fd1743b..042827b 100644
--- a/quic/quartc/quartc_session.h
+++ b/quic/quartc/quartc_session.h
@@ -14,6 +14,7 @@
 #include "net/third_party/quiche/src/quic/core/quic_error_codes.h"
 #include "net/third_party/quiche/src/quic/core/quic_session.h"
 #include "net/third_party/quiche/src/quic/core/quic_types.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_containers.h"
 #include "net/third_party/quiche/src/quic/quartc/quartc_packet_writer.h"
 #include "net/third_party/quiche/src/quic/quartc/quartc_stream.h"
 
@@ -156,6 +157,9 @@
     // plumb that signal up to RTP's congestion control.
     virtual void OnMessageSent(int64_t datagram_id) = 0;
 
+    // Called when message with |datagram_id| gets acked.
+    virtual void OnMessageAcked(int64_t datagram_id) = 0;
+
     // TODO(zhihuang): Add proof verification.
   };
 
@@ -171,6 +175,9 @@
 
   void OnMessageReceived(QuicStringPiece message) override;
 
+  // Called when message with |message_id| gets acked.
+  void OnMessageAcked(QuicMessageId message_id) override;
+
   // Returns number of queued (not sent) messages submitted by
   // SendOrQueueMessage. Messages are queued if connection is congestion
   // controlled.
@@ -226,6 +233,10 @@
   // yet or blocked by congestion control. Messages are queued in the order
   // of sent by SendOrQueueMessage().
   QuicDeque<QueuedMessage> send_message_queue_;
+
+  // Maps message ids to datagram ids, so we could translate message ACKs
+  // received from QUIC to datagram ACKs that are propagated up the stack.
+  QuicUnorderedMap<QuicMessageId, int64_t> message_to_datagram_id_;
 };
 
 class QuartcClientSession : public QuartcSession,
diff --git a/quic/quartc/quartc_session_test.cc b/quic/quartc/quartc_session_test.cc
index 87174dc..316a7a7 100644
--- a/quic/quartc/quartc_session_test.cc
+++ b/quic/quartc/quartc_session_test.cc
@@ -180,6 +180,9 @@
     EXPECT_THAT(server_session_delegate_->sent_datagram_ids(),
                 testing::ElementsAre(server_datagram_id));
 
+    EXPECT_THAT(server_session_delegate_->acked_datagram_ids(),
+                testing::ElementsAre(server_datagram_id));
+
     // Send message from peer 2 to peer 1.
     message = CreateMemSliceVector("Message from client");
     ASSERT_TRUE(
@@ -196,6 +199,9 @@
 
     EXPECT_THAT(client_session_delegate_->sent_datagram_ids(),
                 testing::ElementsAre(client_datagram_id));
+
+    EXPECT_THAT(client_session_delegate_->acked_datagram_ids(),
+                testing::ElementsAre(client_datagram_id));
   }
 
   // Test for sending multiple messages that also result in queueing.
@@ -242,6 +248,7 @@
 
     EXPECT_EQ(delegate_receiving->incoming_messages(), sent_messages);
     EXPECT_EQ(delegate_sending->sent_datagram_ids(), sent_datagram_ids);
+    EXPECT_EQ(delegate_sending->acked_datagram_ids(), sent_datagram_ids);
   }
 
   // Test sending long messages:
diff --git a/quic/quartc/test/quartc_peer.h b/quic/quartc/test/quartc_peer.h
index b96f488..fc5e6b1 100644
--- a/quic/quartc/test/quartc_peer.h
+++ b/quic/quartc/test/quartc_peer.h
@@ -85,6 +85,7 @@
                           ConnectionCloseSource source) override;
   void OnMessageReceived(QuicStringPiece message) override;
   void OnMessageSent(int64_t datagram_id) override {}
+  void OnMessageAcked(int64_t datagram_id) override {}
 
   // QuartcDataSource::Delegate overrides.
   void OnDataProduced(const char* data, size_t length) override;
diff --git a/quic/quartc/test/quic_trace_interceptor.cc b/quic/quartc/test/quic_trace_interceptor.cc
index 741c542..5ce9be5 100644
--- a/quic/quartc/test/quic_trace_interceptor.cc
+++ b/quic/quartc/test/quic_trace_interceptor.cc
@@ -67,6 +67,10 @@
   delegate_->OnMessageSent(datagram_id);
 }
 
+void QuicTraceInterceptor::OnMessageAcked(int64_t datagram_id) {
+  delegate_->OnMessageAcked(datagram_id);
+}
+
 void QuicTraceInterceptor::SetDelegate(QuartcEndpoint::Delegate* delegate) {
   DCHECK(delegate != nullptr);
   delegate_ = delegate;
diff --git a/quic/quartc/test/quic_trace_interceptor.h b/quic/quartc/test/quic_trace_interceptor.h
index ee79d1e..e68d880 100644
--- a/quic/quartc/test/quic_trace_interceptor.h
+++ b/quic/quartc/test/quic_trace_interceptor.h
@@ -39,6 +39,7 @@
                           ConnectionCloseSource source) override;
   void OnMessageReceived(QuicStringPiece message) override;
   void OnMessageSent(int64_t datagram_id) override;
+  void OnMessageAcked(int64_t datagram_id) override;
   void SetDelegate(QuartcEndpoint::Delegate* delegate) override;
 
  private: