Introduce Object ACKs (OACKs) to MoQT.

OACKs let the application receiver acknowledge that it has received (and potentially processed) the object.  In addition to that, we also communicate how far off from the deadline the object was received.

The sender want to know this information if it is in a position to adjust the send rate:
- when it's a 1:1 connection and it controls the video bitrate,
- when it's doing sender-side ABR.

The reason we want to do MoQ-level ACKs is that the alternatives are not as good:
- using QUIC ACKs directly breaks layering and is infeasible with web APIs
- using a building up queue as a signal is possible, but the queue takes a time to build up, thus making it a delayed signal

PiperOrigin-RevId: 668549384
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc
index 1ed0b6c..dabdea4 100644
--- a/quiche/quic/moqt/moqt_framer.cc
+++ b/quiche/quic/moqt/moqt_framer.cc
@@ -140,6 +140,13 @@
   return WireUint8(0xff);
 }
 
+uint64_t SignedVarintSerializedForm(int64_t value) {
+  if (value < 0) {
+    return ((-value) << 1) | 0x01;
+  }
+  return value << 1;
+}
+
 }  // namespace
 
 quiche::QuicheBuffer MoqtFramer::SerializeObjectHeader(
@@ -256,6 +263,10 @@
     int_parameters.push_back(
         IntParameter(MoqtSetupParameter::kRole, *message.role));
   }
+  if (message.supports_object_ack) {
+    int_parameters.push_back(
+        IntParameter(MoqtSetupParameter::kSupportObjectAcks, 1u));
+  }
   if (!using_webtrans_ && message.path.has_value()) {
     string_parameters.push_back(
         StringParameter(MoqtSetupParameter::kPath, *message.path));
@@ -276,6 +287,10 @@
     int_parameters.push_back(
         IntParameter(MoqtSetupParameter::kRole, *message.role));
   }
+  if (message.supports_object_ack) {
+    int_parameters.push_back(
+        IntParameter(MoqtSetupParameter::kSupportObjectAcks, 1u));
+  }
   return Serialize(WireVarInt62(MoqtMessageType::kServerSetup),
                    WireVarInt62(message.selected_version),
                    WireVarInt62(int_parameters.size()),
@@ -295,6 +310,14 @@
         StringParameter(MoqtTrackRequestParameter::kAuthorizationInfo,
                         *message.parameters.authorization_info));
   }
+  absl::InlinedVector<IntParameter, 1> int_params;
+  if (message.parameters.object_ack_window.has_value()) {
+    QUICHE_DCHECK(message.parameters.object_ack_window->ToMicroseconds() >= 0);
+    int_params.push_back(IntParameter(
+        MoqtTrackRequestParameter::kOackWindowSize,
+        static_cast<uint64_t>(
+            message.parameters.object_ack_window->ToMicroseconds())));
+  }
   switch (filter_type) {
     case MoqtFilterType::kLatestGroup:
     case MoqtFilterType::kLatestObject:
@@ -305,8 +328,9 @@
           WireStringWithVarInt62Length(message.track_name),
           WireUint8(message.subscriber_priority),
           WireDeliveryOrder(message.group_order), WireVarInt62(filter_type),
-          WireVarInt62(string_params.size()),
-          WireSpan<WireStringParameter>(string_params));
+          WireVarInt62(string_params.size() + int_params.size()),
+          WireSpan<WireStringParameter>(string_params),
+          WireSpan<WireIntParameter>(int_params));
     case MoqtFilterType::kAbsoluteStart:
       return Serialize(
           WireVarInt62(MoqtMessageType::kSubscribe),
@@ -317,8 +341,9 @@
           WireDeliveryOrder(message.group_order), WireVarInt62(filter_type),
           WireVarInt62(*message.start_group),
           WireVarInt62(*message.start_object),
-          WireVarInt62(string_params.size()),
-          WireSpan<WireStringParameter>(string_params));
+          WireVarInt62(string_params.size() + int_params.size()),
+          WireSpan<WireStringParameter>(string_params),
+          WireSpan<WireIntParameter>(int_params));
     case MoqtFilterType::kAbsoluteRange:
       return Serialize(
           WireVarInt62(MoqtMessageType::kSubscribe),
@@ -331,8 +356,9 @@
           WireVarInt62(*message.start_object), WireVarInt62(*message.end_group),
           WireVarInt62(message.end_object.has_value() ? *message.end_object + 1
                                                       : 0),
-          WireVarInt62(string_params.size()),
-          WireSpan<WireStringParameter>(string_params));
+          WireVarInt62(string_params.size() + int_params.size()),
+          WireSpan<WireStringParameter>(string_params),
+          WireSpan<WireIntParameter>(int_params));
     default:
       QUICHE_BUG(MoqtFramer_end_group_missing) << "Subscribe framing error.";
       return quiche::QuicheBuffer();
@@ -473,4 +499,14 @@
                    WireStringWithVarInt62Length(message.new_session_uri));
 }
 
+quiche::QuicheBuffer MoqtFramer::SerializeObjectAck(
+    const MoqtObjectAck& message) {
+  return Serialize(WireVarInt62(MoqtMessageType::kObjectAck),
+                   WireVarInt62(message.subscribe_id),
+                   WireVarInt62(message.group_id),
+                   WireVarInt62(message.object_id),
+                   WireVarInt62(SignedVarintSerializedForm(
+                       message.delta_from_deadline.ToMicroseconds())));
+}
+
 }  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_framer.h b/quiche/quic/moqt/moqt_framer.h
index 6aef8b6..188715a 100644
--- a/quiche/quic/moqt/moqt_framer.h
+++ b/quiche/quic/moqt/moqt_framer.h
@@ -55,6 +55,7 @@
   quiche::QuicheBuffer SerializeUnannounce(const MoqtUnannounce& message);
   quiche::QuicheBuffer SerializeTrackStatus(const MoqtTrackStatus& message);
   quiche::QuicheBuffer SerializeGoAway(const MoqtGoAway& message);
+  quiche::QuicheBuffer SerializeObjectAck(const MoqtObjectAck& message);
 
  private:
   quiche::QuicheBufferAllocator* allocator_;
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc
index a216af2..0afeb2a 100644
--- a/quiche/quic/moqt/moqt_framer_test.cc
+++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -19,7 +19,9 @@
 #include "quiche/quic/platform/api/quic_expect_bug.h"
 #include "quiche/quic/platform/api/quic_test.h"
 #include "quiche/common/quiche_buffer_allocator.h"
+#include "quiche/common/quiche_text_utils.h"
 #include "quiche/common/simple_buffer_allocator.h"
+#include "quiche/common/test_tools/quiche_test_utils.h"
 
 namespace moqt::test {
 
@@ -40,8 +42,9 @@
       MoqtMessageType::kTrackStatus,       MoqtMessageType::kAnnounce,
       MoqtMessageType::kAnnounceOk,        MoqtMessageType::kAnnounceError,
       MoqtMessageType::kUnannounce,        MoqtMessageType::kGoAway,
-      MoqtMessageType::kClientSetup,       MoqtMessageType::kServerSetup,
-      MoqtMessageType::kStreamHeaderTrack, MoqtMessageType::kStreamHeaderGroup,
+      MoqtMessageType::kObjectAck,         MoqtMessageType::kClientSetup,
+      MoqtMessageType::kServerSetup,       MoqtMessageType::kStreamHeaderTrack,
+      MoqtMessageType::kStreamHeaderGroup,
   };
   std::vector<bool> uses_web_transport_bool = {
       false,
@@ -158,6 +161,10 @@
         auto data = std::get<MoqtGoAway>(structured_data);
         return framer_.SerializeGoAway(data);
       }
+      case moqt::MoqtMessageType::kObjectAck: {
+        auto data = std::get<MoqtObjectAck>(structured_data);
+        return framer_.SerializeObjectAck(data);
+      }
       case MoqtMessageType::kClientSetup: {
         auto data = std::get<MoqtClientSetup>(structured_data);
         return framer_.SerializeClientSetup(data);
@@ -187,7 +194,9 @@
   auto structured_data = message->structured_data();
   auto buffer = SerializeMessage(structured_data);
   EXPECT_EQ(buffer.size(), message->total_message_size());
-  EXPECT_EQ(buffer.AsStringView(), message->PacketSample());
+  quiche::test::CompareCharArraysWithHexError(
+      "frame encoding", buffer.data(), buffer.size(),
+      message->PacketSample().data(), message->PacketSample().size());
 }
 
 class MoqtFramerSimpleTest : public quic::test::QuicTest {
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index 0eb9331..a8a67e7 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -6,15 +6,18 @@
 #include <memory>
 #include <optional>
 #include <string>
+#include <utility>
 
 #include "absl/strings/str_cat.h"
 #include "absl/strings/string_view.h"
 #include "quiche/quic/core/quic_generic_session.h"
+#include "quiche/quic/core/quic_time.h"
 #include "quiche/quic/moqt/moqt_known_track_publisher.h"
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/moqt_outgoing_queue.h"
 #include "quiche/quic/moqt/moqt_priority.h"
 #include "quiche/quic/moqt/moqt_session.h"
+#include "quiche/quic/moqt/moqt_track.h"
 #include "quiche/quic/moqt/test_tools/moqt_simulator_harness.h"
 #include "quiche/quic/moqt/tools/moqt_mock_visitor.h"
 #include "quiche/quic/test_tools/quic_test_utils.h"
@@ -48,11 +51,7 @@
   }
 
   void WireUpEndpoints() { test_harness_.WireUpEndpoints(); }
-
-  void EstablishSession() {
-    CreateDefaultEndpoints();
-    WireUpEndpoints();
-
+  void ConnectEndpoints() {
     client_->quic_session()->CryptoConnect();
     bool client_established = false;
     bool server_established = false;
@@ -65,6 +64,12 @@
     QUICHE_CHECK(success);
   }
 
+  void EstablishSession() {
+    CreateDefaultEndpoints();
+    WireUpEndpoints();
+    ConnectEndpoints();
+  }
+
  protected:
   quic::simulator::TestHarness test_harness_;
 
@@ -430,6 +435,54 @@
   EXPECT_TRUE(success);
 }
 
+TEST_F(MoqtIntegrationTest, ObjectAcks) {
+  CreateDefaultEndpoints();
+  WireUpEndpoints();
+  client_->session()->set_support_object_acks(true);
+  server_->session()->set_support_object_acks(true);
+  ConnectEndpoints();
+
+  FullTrackName full_track_name("foo", "bar");
+  MockRemoteTrackVisitor client_visitor;
+
+  MoqtKnownTrackPublisher publisher;
+  server_->session()->set_publisher(&publisher);
+  auto track_publisher = std::make_shared<MockTrackPublisher>(full_track_name);
+  publisher.Add(track_publisher);
+
+  MockPublishingMonitorInterface monitoring;
+  server_->session()->SetMonitoringInterfaceForTrack(full_track_name,
+                                                     &monitoring);
+
+  MoqtObjectAckFunction ack_function = nullptr;
+  EXPECT_CALL(client_visitor, OnCanAckObjects(_))
+      .WillOnce([&](MoqtObjectAckFunction new_ack_function) {
+        ack_function = std::move(new_ack_function);
+      });
+  EXPECT_CALL(client_visitor, OnReply(_, _))
+      .WillOnce([&](const FullTrackName&, std::optional<absl::string_view>) {
+        ack_function(10, 20, quic::QuicTimeDelta::FromMicroseconds(-123));
+        ack_function(100, 200, quic::QuicTimeDelta::FromMicroseconds(456));
+      });
+
+  MoqtSubscribeParameters parameters;
+  parameters.object_ack_window = quic::QuicTimeDelta::FromMilliseconds(100);
+  client_->session()->SubscribeCurrentObject(full_track_name.track_namespace,
+                                             full_track_name.track_name,
+                                             &client_visitor, parameters);
+  EXPECT_CALL(monitoring, OnObjectAckSupportKnown(true));
+  EXPECT_CALL(
+      monitoring,
+      OnObjectAckReceived(10, 20, quic::QuicTimeDelta::FromMicroseconds(-123)));
+  bool done = false;
+  EXPECT_CALL(
+      monitoring,
+      OnObjectAckReceived(100, 200, quic::QuicTimeDelta::FromMicroseconds(456)))
+      .WillOnce([&] { done = true; });
+  bool success = test_harness_.RunUntilWithDefaultTimeout([&] { return done; });
+  EXPECT_TRUE(success);
+}
+
 }  // namespace
 
 }  // namespace moqt::test
diff --git a/quiche/quic/moqt/moqt_messages.cc b/quiche/quic/moqt/moqt_messages.cc
index 277687f..7e3e616 100644
--- a/quiche/quic/moqt/moqt_messages.cc
+++ b/quiche/quic/moqt/moqt_messages.cc
@@ -95,6 +95,8 @@
       return "STREAM_HEADER_TRACK";
     case MoqtMessageType::kStreamHeaderGroup:
       return "STREAM_HEADER_GROUP";
+    case MoqtMessageType::kObjectAck:
+      return "OBJECT_ACK";
   }
   return "Unknown message " + std::to_string(static_cast<int>(message_type));
 }
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index 2ec9b05..c25231b 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -33,14 +33,25 @@
   kUnrecognizedVersionForTests = 0xfe0000ff,
 };
 
+inline constexpr MoqtVersion kDefaultMoqtVersion = MoqtVersion::kDraft05;
+
 struct QUICHE_EXPORT MoqtSessionParameters {
   // TODO: support multiple versions.
   // TODO: support roles other than PubSub.
-  MoqtVersion version;
+
+  explicit MoqtSessionParameters(quic::Perspective perspective)
+      : perspective(perspective), using_webtrans(true) {}
+  MoqtSessionParameters(quic::Perspective perspective, std::string path)
+      : perspective(perspective),
+        using_webtrans(false),
+        path(std::move(path)) {}
+
+  MoqtVersion version = kDefaultMoqtVersion;
   quic::Perspective perspective;
   bool using_webtrans;
   std::string path;
-  bool deliver_partial_objects;
+  bool deliver_partial_objects = false;
+  bool support_object_acks = false;
 };
 
 // The maximum length of a message, excluding any OBJECT payload. This prevents
@@ -69,6 +80,12 @@
   kServerSetup = 0x41,
   kStreamHeaderTrack = 0x50,
   kStreamHeaderGroup = 0x51,
+
+  // QUICHE-specific extensions.
+
+  // kObjectAck (OACK for short) is a frame used by the receiver indicating that
+  // it has received and processed the specified object.
+  kObjectAck = 0x3184,
 };
 
 enum class QUICHE_EXPORT MoqtError : uint64_t {
@@ -98,10 +115,17 @@
 enum class QUICHE_EXPORT MoqtSetupParameter : uint64_t {
   kRole = 0x0,
   kPath = 0x1,
+
+  // QUICHE-specific extensions.
+  // Indicates support for OACK messages.
+  kSupportObjectAcks = 0xbbf1439,
 };
 
 enum class QUICHE_EXPORT MoqtTrackRequestParameter : uint64_t {
   kAuthorizationInfo = 0x2,
+
+  // QUICHE-specific extensions.
+  kOackWindowSize = 0xbbf1439,
 };
 
 // TODO: those are non-standard; add standard error codes once those exist, see
@@ -190,11 +214,13 @@
   std::vector<MoqtVersion> supported_versions;
   std::optional<MoqtRole> role;
   std::optional<std::string> path;
+  bool supports_object_ack = false;
 };
 
 struct QUICHE_EXPORT MoqtServerSetup {
   MoqtVersion selected_version;
   std::optional<MoqtRole> role;
+  bool supports_object_ack = false;
 };
 
 // These codes do not appear on the wire.
@@ -240,6 +266,12 @@
 
 struct QUICHE_EXPORT MoqtSubscribeParameters {
   std::optional<std::string> authorization_info;
+
+  // If present, indicates that OBJECT_ACK messages will be sent in response to
+  // the objects on the stream. The actual value is informational, and it
+  // communicates how many frames the subscriber is willing to buffer, in
+  // microseconds.
+  std::optional<quic::QuicTimeDelta> object_ack_window;
 };
 
 struct QUICHE_EXPORT MoqtSubscribe {
@@ -385,6 +417,17 @@
   std::string new_session_uri;
 };
 
+// All of the four values in this message are encoded as varints.
+// `delta_from_deadline` is encoded as an absolute value, with the lowest bit
+// indicating the sign (0 if positive).
+struct QUICHE_EXPORT MoqtObjectAck {
+  uint64_t subscribe_id;
+  uint64_t group_id;
+  uint64_t object_id;
+  // Positive if the object has been received before the deadline.
+  quic::QuicTimeDelta delta_from_deadline = quic::QuicTimeDelta::Zero();
+};
+
 std::string MoqtMessageTypeToString(MoqtMessageType message_type);
 
 std::string MoqtForwardingPreferenceToString(
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc
index 3e95abd..3e6b0e5 100644
--- a/quiche/quic/moqt/moqt_parser.cc
+++ b/quiche/quic/moqt/moqt_parser.cc
@@ -40,6 +40,13 @@
   }
 }
 
+uint64_t SignedVarintUnserializedForm(uint64_t value) {
+  if (value & 0x01) {
+    return -(value >> 1);
+  }
+  return value >> 1;
+}
+
 }  // namespace
 
 // The buffering philosophy is complicated, to minimize copying. Here is an
@@ -208,6 +215,8 @@
       return ProcessTrackStatus(reader);
     case MoqtMessageType::kGoAway:
       return ProcessGoAway(reader);
+    case moqt::MoqtMessageType::kObjectAck:
+      return ProcessObjectAck(reader);
     default:
       ParseError("Unknown message type");
       return 0;
@@ -352,6 +361,14 @@
         }
         setup.path = value;
         break;
+      case MoqtSetupParameter::kSupportObjectAcks:
+        uint64_t flag;
+        if (!StringViewToVarInt(value, flag) || flag > 1) {
+          ParseError("Invalid kSupportObjectAcks value");
+          return 0;
+        }
+        setup.supports_object_ack = static_cast<bool>(flag);
+        break;
       default:
         // Skip over the parameter.
         break;
@@ -407,6 +424,14 @@
       case MoqtSetupParameter::kPath:
         ParseError("PATH parameter in SERVER_SETUP");
         return 0;
+      case MoqtSetupParameter::kSupportObjectAcks:
+        uint64_t flag;
+        if (!StringViewToVarInt(value, flag) || flag > 1) {
+          ParseError("Invalid kSupportObjectAcks value");
+          return 0;
+        }
+        setup.supports_object_ack = static_cast<bool>(flag);
+        break;
       default:
         // Skip over the parameter.
         break;
@@ -497,6 +522,20 @@
         }
         subscribe_request.parameters.authorization_info = value;
         break;
+      case MoqtTrackRequestParameter::kOackWindowSize: {
+        if (subscribe_request.parameters.object_ack_window.has_value()) {
+          ParseError("OACK_WINDOW_SIZE parameter appears twice in SUBSCRIBE");
+          return 0;
+        }
+        uint64_t raw_value;
+        if (!StringViewToVarInt(value, raw_value)) {
+          ParseError("OACK_WINDOW_SIZE parameter is not a valid varint");
+          return 0;
+        }
+        subscribe_request.parameters.object_ack_window =
+            quic::QuicTimeDelta::FromMicroseconds(raw_value);
+        break;
+      }
       default:
         // Skip over the parameter.
         break;
@@ -760,6 +799,21 @@
   return reader.PreviouslyReadPayload().length();
 }
 
+size_t MoqtParser::ProcessObjectAck(quic::QuicDataReader& reader) {
+  MoqtObjectAck object_ack;
+  uint64_t raw_delta;
+  if (!reader.ReadVarInt62(&object_ack.subscribe_id) ||
+      !reader.ReadVarInt62(&object_ack.group_id) ||
+      !reader.ReadVarInt62(&object_ack.object_id) ||
+      !reader.ReadVarInt62(&raw_delta)) {
+    return 0;
+  }
+  object_ack.delta_from_deadline = quic::QuicTimeDelta::FromMicroseconds(
+      SignedVarintUnserializedForm(raw_delta));
+  visitor_.OnObjectAckMessage(object_ack);
+  return reader.PreviouslyReadPayload().length();
+}
+
 // static
 size_t MoqtParser::ParseObjectHeader(quic::QuicDataReader& reader,
                                      MoqtObject& object, MoqtMessageType type) {
diff --git a/quiche/quic/moqt/moqt_parser.h b/quiche/quic/moqt/moqt_parser.h
index bada6cd..b10c22f 100644
--- a/quiche/quic/moqt/moqt_parser.h
+++ b/quiche/quic/moqt/moqt_parser.h
@@ -49,6 +49,7 @@
   virtual void OnUnannounceMessage(const MoqtUnannounce& message) = 0;
   virtual void OnTrackStatusMessage(const MoqtTrackStatus& message) = 0;
   virtual void OnGoAwayMessage(const MoqtGoAway& message) = 0;
+  virtual void OnObjectAckMessage(const MoqtObjectAck& message) = 0;
 
   virtual void OnParsingError(MoqtError code, absl::string_view reason) = 0;
 };
@@ -104,6 +105,7 @@
   size_t ProcessUnannounce(quic::QuicDataReader& reader);
   size_t ProcessTrackStatus(quic::QuicDataReader& reader);
   size_t ProcessGoAway(quic::QuicDataReader& reader);
+  size_t ProcessObjectAck(quic::QuicDataReader& reader);
 
   static size_t ParseObjectHeader(quic::QuicDataReader& reader,
                                   MoqtObject& object, MoqtMessageType type);
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc
index cf6b2e3..11f6f29 100644
--- a/quiche/quic/moqt/moqt_parser_test.cc
+++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -15,6 +15,7 @@
 
 #include "absl/strings/string_view.h"
 #include "quiche/quic/core/quic_data_writer.h"
+#include "quiche/quic/core/quic_time.h"
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/test_tools/moqt_test_message.h"
 #include "quiche/quic/platform/api/quic_test.h"
@@ -59,6 +60,7 @@
     MoqtMessageType::kStreamHeaderTrack,
     MoqtMessageType::kStreamHeaderGroup,
     MoqtMessageType::kGoAway,
+    MoqtMessageType::kObjectAck,
 };
 
 }  // namespace
@@ -166,6 +168,9 @@
   void OnGoAwayMessage(const MoqtGoAway& message) override {
     OnControlMessage(message);
   }
+  void OnObjectAckMessage(const MoqtObjectAck& message) override {
+    OnControlMessage(message);
+  }
   void OnParsingError(MoqtError code, absl::string_view reason) override {
     QUIC_LOG(INFO) << "Parsing error: " << reason;
     parsing_error_ = reason;
@@ -1019,6 +1024,25 @@
             "SUBSCRIBE_UPDATE has end_object but no end_group");
 }
 
+TEST_F(MoqtMessageSpecificTest, ObjectAckNegativeDelta) {
+  MoqtParser parser(kRawQuic, visitor_);
+  char object_ack[] = {
+      0x71, 0x84,        // type
+      0x01, 0x10, 0x20,  // subscribe ID, group, object
+      0x40, 0x81,        // -0x40 time delta
+  };
+  parser.ProcessData(absl::string_view(object_ack, sizeof(object_ack)), false);
+  EXPECT_EQ(visitor_.parsing_error_, std::nullopt);
+  ASSERT_EQ(visitor_.messages_received_, 1);
+  MoqtObjectAck message =
+      std::get<MoqtObjectAck>(visitor_.last_message_.value());
+  EXPECT_EQ(message.subscribe_id, 0x01);
+  EXPECT_EQ(message.group_id, 0x10);
+  EXPECT_EQ(message.object_id, 0x20);
+  EXPECT_EQ(message.delta_from_deadline,
+            quic::QuicTimeDelta::FromMicroseconds(-0x40));
+}
+
 TEST_F(MoqtMessageSpecificTest, AllMessagesTogether) {
   char buffer[5000];
   MoqtParser parser(kRawQuic, visitor_);
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 356a3da..2d9d073 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -18,6 +18,7 @@
 #include "absl/container/flat_hash_map.h"
 #include "absl/container/flat_hash_set.h"
 #include "absl/container/node_hash_map.h"
+#include "absl/functional/bind_front.h"
 #include "absl/status/status.h"
 #include "absl/status/statusor.h"
 #include "absl/strings/str_cat.h"
@@ -144,6 +145,7 @@
   MoqtClientSetup setup = MoqtClientSetup{
       .supported_versions = std::vector<MoqtVersion>{parameters_.version},
       .role = MoqtRole::kPubSub,
+      .supports_object_ack = parameters_.support_object_acks,
   };
   if (!parameters_.using_webtrans) {
     setup.path = parameters_.path;
@@ -398,6 +400,17 @@
   } else {
     message.track_alias = next_remote_track_alias_++;
   }
+  if (SupportsObjectAck() && visitor != nullptr) {
+    // Since we do not expose subscribe IDs directly in the API, instead wrap
+    // the session and subscribe ID in a callback.
+    visitor->OnCanAckObjects(absl::bind_front(&MoqtSession::SendObjectAck, this,
+                                              message.subscribe_id));
+  } else {
+    QUICHE_DLOG_IF(WARNING, message.parameters.object_ack_window.has_value())
+        << "Attempting to set object_ack_window on a connection that does not "
+           "support it.";
+    message.parameters.object_ack_window = std::nullopt;
+  }
   SendControlMessage(framer_.SerializeSubscribe(message));
   QUIC_DLOG(INFO) << ENDPOINT << "Sent SUBSCRIBE message for "
                   << message.track_namespace << ":" << message.track_name;
@@ -556,11 +569,13 @@
                                  absl::Hex(session_->parameters_.version)));
     return;
   }
+  session_->peer_supports_object_ack_ = message.supports_object_ack;
   QUICHE_DLOG(INFO) << ENDPOINT << "Received the SETUP message";
   if (session_->parameters_.perspective == Perspective::IS_SERVER) {
     MoqtServerSetup response;
     response.selected_version = session_->parameters_.version;
     response.role = MoqtRole::kPubSub;
+    response.supports_object_ack = session_->parameters_.support_object_acks;
     SendOrBufferMessage(session_->framer_.SerializeServerSetup(response));
     QUIC_DLOG(INFO) << ENDPOINT << "Sent the SETUP message";
   }
@@ -583,6 +598,7 @@
                                  absl::Hex(session_->parameters_.version)));
     return;
   }
+  session_->peer_supports_object_ack_ = message.supports_object_ack;
   QUIC_DLOG(INFO) << ENDPOINT << "Received the SETUP message";
   // TODO: handle role and path.
   std::move(session_->callbacks_.session_established_callback)();
@@ -630,8 +646,17 @@
   }
   MoqtDeliveryOrder delivery_order = (*track_publisher)->GetDeliveryOrder();
 
+  MoqtPublishingMonitorInterface* monitoring = nullptr;
+  auto monitoring_it =
+      session_->monitoring_interfaces_for_published_tracks_.find(track_name);
+  if (monitoring_it !=
+      session_->monitoring_interfaces_for_published_tracks_.end()) {
+    monitoring = monitoring_it->second;
+    session_->monitoring_interfaces_for_published_tracks_.erase(monitoring_it);
+  }
+
   auto subscription = std::make_unique<MoqtSession::PublishedSubscription>(
-      session_, *std::move(track_publisher), message);
+      session_, *std::move(track_publisher), message, monitoring);
   auto [it, success] = session_->published_subscriptions_.emplace(
       message.subscribe_id, std::move(subscription));
   if (!success) {
@@ -862,15 +887,21 @@
 
 MoqtSession::PublishedSubscription::PublishedSubscription(
     MoqtSession* session, std::shared_ptr<MoqtTrackPublisher> track_publisher,
-    const MoqtSubscribe& subscribe)
+    const MoqtSubscribe& subscribe,
+    MoqtPublishingMonitorInterface* monitoring_interface)
     : subscription_id_(subscribe.subscribe_id),
       session_(session),
       track_publisher_(track_publisher),
       track_alias_(subscribe.track_alias),
       window_(SubscribeMessageToWindow(subscribe, *track_publisher)),
       subscriber_priority_(subscribe.subscriber_priority),
-      subscriber_delivery_order_(subscribe.group_order) {
+      subscriber_delivery_order_(subscribe.group_order),
+      monitoring_interface_(monitoring_interface) {
   track_publisher->AddObjectListener(this);
+  if (monitoring_interface_ != nullptr) {
+    monitoring_interface_->OnObjectAckSupportKnown(
+        subscribe.parameters.object_ack_window.has_value());
+  }
   QUIC_DLOG(INFO) << ENDPOINT << "Created subscription for "
                   << subscribe.track_namespace << ":" << subscribe.track_name;
 }
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index c2f2aaa..8ccbdd6 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -15,6 +15,7 @@
 #include "absl/container/flat_hash_map.h"
 #include "absl/container/flat_hash_set.h"
 #include "absl/strings/string_view.h"
+#include "quiche/quic/core/quic_time.h"
 #include "quiche/quic/core/quic_types.h"
 #include "quiche/quic/moqt/moqt_framer.h"
 #include "quiche/quic/moqt/moqt_messages.h"
@@ -65,6 +66,17 @@
       DefaultIncomingAnnounceCallback;
 };
 
+// MoqtPublishingMonitorInterface allows a publisher monitor the delivery
+// progress for a single individual subscriber.
+class MoqtPublishingMonitorInterface {
+ public:
+  virtual ~MoqtPublishingMonitorInterface() = default;
+
+  virtual void OnObjectAckSupportKnown(bool supported) = 0;
+  virtual void OnObjectAckReceived(uint64_t group_id, uint64_t object_id,
+                                   quic::QuicTimeDelta delta_from_deadline) = 0;
+};
+
 class QUICHE_EXPORT MoqtSession : public webtransport::SessionVisitor {
  public:
   MoqtSession(webtransport::Session* session, MoqtSessionParameters parameters,
@@ -124,6 +136,21 @@
   MoqtSessionCallbacks& callbacks() { return callbacks_; }
   MoqtPublisher* publisher() { return publisher_; }
   void set_publisher(MoqtPublisher* publisher) { publisher_ = publisher; }
+  bool support_object_acks() const { return parameters_.support_object_acks; }
+  void set_support_object_acks(bool value) {
+    QUICHE_DCHECK(!control_stream_.has_value())
+        << "support_object_acks needs to be set before handshake";
+    parameters_.support_object_acks = value;
+  }
+
+  // Assigns a monitoring interface for a specific track subscription that is
+  // expected to happen in the future.  `interface` will be only used for a
+  // single subscription, and it must outlive the session.
+  void SetMonitoringInterfaceForTrack(
+      FullTrackName track, MoqtPublishingMonitorInterface* interface) {
+    monitoring_interfaces_for_published_tracks_.emplace(std::move(track),
+                                                        interface);
+  }
 
   void Close() { session_->CloseSession(0, "Application closed"); }
 
@@ -165,6 +192,14 @@
     void OnUnannounceMessage(const MoqtUnannounce& /*message*/) override {}
     void OnTrackStatusMessage(const MoqtTrackStatus& message) override {}
     void OnGoAwayMessage(const MoqtGoAway& /*message*/) override {}
+    void OnObjectAckMessage(const MoqtObjectAck& message) override {
+      auto subscription_it =
+          session_->published_subscriptions_.find(message.subscribe_id);
+      if (subscription_it == session_->published_subscriptions_.end()) {
+        return;
+      }
+      subscription_it->second->ProcessObjectAck(message);
+    }
     void OnParsingError(MoqtError error_code,
                         absl::string_view reason) override;
 
@@ -257,6 +292,9 @@
     void OnGoAwayMessage(const MoqtGoAway&) override {
       OnControlMessageReceived();
     }
+    void OnObjectAckMessage(const MoqtObjectAck&) override {
+      OnControlMessageReceived();
+    }
     void OnParsingError(MoqtError error_code,
                         absl::string_view reason) override;
 
@@ -282,7 +320,8 @@
     explicit PublishedSubscription(
         MoqtSession* session,
         std::shared_ptr<MoqtTrackPublisher> track_publisher,
-        const MoqtSubscribe& subscribe);
+        const MoqtSubscribe& subscribe,
+        MoqtPublishingMonitorInterface* monitoring_interface);
     ~PublishedSubscription();
 
     PublishedSubscription(const PublishedSubscription&) = delete;
@@ -299,6 +338,13 @@
     }
 
     void OnNewObjectAvailable(FullSequence sequence) override;
+    void ProcessObjectAck(const MoqtObjectAck& message) {
+      if (monitoring_interface_ == nullptr) {
+        return;
+      }
+      monitoring_interface_->OnObjectAckReceived(
+          message.group_id, message.object_id, message.delta_from_deadline);
+    }
 
     // Creates streams for all objects that are currently in the track's object
     // cache and match the subscription window.  This is in some sense similar
@@ -336,6 +382,7 @@
     SubscribeWindow window_;
     MoqtPriority subscriber_priority_;
     std::optional<MoqtDeliveryOrder> subscriber_delivery_order_;
+    MoqtPublishingMonitorInterface* monitoring_interface_;
     // Largest sequence number ever sent via this subscription.
     std::optional<FullSequence> largest_sent_;
     // Should be almost always accessed via `stream_map()`.
@@ -423,12 +470,33 @@
   std::pair<FullTrackName, RemoteTrack::Visitor*> TrackPropertiesFromAlias(
       const MoqtObject& message);
 
+  // Sends an OBJECT_ACK message for a specific subscribe ID.
+  void SendObjectAck(uint64_t subscribe_id, uint64_t group_id,
+                     uint64_t object_id,
+                     quic::QuicTimeDelta delta_from_deadline) {
+    if (!SupportsObjectAck()) {
+      return;
+    }
+    MoqtObjectAck ack;
+    ack.subscribe_id = subscribe_id;
+    ack.group_id = group_id;
+    ack.object_id = object_id;
+    ack.delta_from_deadline = delta_from_deadline;
+    SendControlMessage(framer_.SerializeObjectAck(ack));
+  }
+
+  // Indicates if OBJECT_ACK is supported by both sides.
+  bool SupportsObjectAck() const {
+    return parameters_.support_object_acks && peer_supports_object_ack_;
+  }
+
   webtransport::Session* session_;
   MoqtSessionParameters parameters_;
   MoqtSessionCallbacks callbacks_;
   MoqtFramer framer_;
 
   std::optional<webtransport::StreamId> control_stream_;
+  bool peer_supports_object_ack_ = false;
   std::string error_;
 
   // All the tracks the session is subscribed to, indexed by track_alias.
@@ -468,6 +536,10 @@
   absl::flat_hash_map<uint64_t, ActiveSubscribe> active_subscribes_;
   uint64_t next_subscribe_id_ = 0;
 
+  // Monitoring interfaces for expected incoming subscriptions.
+  absl::flat_hash_map<FullTrackName, MoqtPublishingMonitorInterface*>
+      monitoring_interfaces_for_published_tracks_;
+
   // Indexed by track namespace.
   absl::flat_hash_map<std::string, MoqtOutgoingAnnounceCallback>
       pending_outgoing_announces_;
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index c3122cf..9547cf4 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -47,14 +47,6 @@
 constexpr webtransport::StreamId kIncomingUniStreamId = 15;
 constexpr webtransport::StreamId kOutgoingUniStreamId = 14;
 
-constexpr MoqtSessionParameters default_parameters = {
-    /*version=*/MoqtVersion::kDraft05,
-    /*perspective=*/quic::Perspective::IS_CLIENT,
-    /*using_webtrans=*/true,
-    /*path=*/std::string(),
-    /*deliver_partial_objects=*/false,
-};
-
 // Returns nullopt if there is not enough in |message| to extract a type
 static std::optional<MoqtMessageType> ExtractMessageType(
     const absl::string_view message) {
@@ -141,7 +133,8 @@
     subscribe.subscriber_priority = 0x80;
     session->published_subscriptions_.emplace(
         subscribe_id, std::make_unique<MoqtSession::PublishedSubscription>(
-                          session, std::move(publisher), subscribe));
+                          session, std::move(publisher), subscribe,
+                          /*monitoring_interface=*/nullptr));
     return session->published_subscriptions_[subscribe_id].get();
   }
 
@@ -157,7 +150,8 @@
 class MoqtSessionTest : public quic::test::QuicTest {
  public:
   MoqtSessionTest()
-      : session_(&mock_session_, default_parameters,
+      : session_(&mock_session_,
+                 MoqtSessionParameters(quic::Perspective::IS_CLIENT),
                  session_callbacks_.AsSessionCallbacks()) {
     session_.set_publisher(&publisher_);
   }
@@ -215,15 +209,9 @@
 }
 
 TEST_F(MoqtSessionTest, OnClientSetup) {
-  MoqtSessionParameters server_parameters = {
-      /*version=*/MoqtVersion::kDraft05,
-      /*perspective=*/quic::Perspective::IS_SERVER,
-      /*using_webtrans=*/true,
-      /*path=*/"",
-      /*deliver_partial_objects=*/false,
-  };
-  MoqtSession server_session(&mock_session_, server_parameters,
-                             session_callbacks_.AsSessionCallbacks());
+  MoqtSession server_session(
+      &mock_session_, MoqtSessionParameters(quic::Perspective::IS_SERVER),
+      session_callbacks_.AsSessionCallbacks());
   webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtParserVisitor> stream_input =
       MoqtSessionPeer::CreateControlStream(&server_session, &mock_stream);
@@ -603,13 +591,8 @@
 }
 
 TEST_F(MoqtSessionTest, IncomingPartialObjectNoBuffer) {
-  MoqtSessionParameters parameters = {
-      /*version=*/MoqtVersion::kDraft05,
-      /*perspective=*/quic::Perspective::IS_CLIENT,
-      /*using_webtrans=*/true,
-      /*path=*/"",
-      /*deliver_partial_objects=*/true,
-  };
+  MoqtSessionParameters parameters(quic::Perspective::IS_CLIENT);
+  parameters.deliver_partial_objects = true;
   MoqtSession session(&mock_session_, parameters,
                       session_callbacks_.AsSessionCallbacks());
   MockRemoteTrackVisitor visitor_;
@@ -1058,15 +1041,9 @@
 }
 
 TEST_F(MoqtSessionTest, OneBidirectionalStreamServer) {
-  MoqtSessionParameters server_parameters = {
-      /*version=*/MoqtVersion::kDraft05,
-      /*perspective=*/quic::Perspective::IS_SERVER,
-      /*using_webtrans=*/true,
-      /*path=*/"",
-      /*deliver_partial_objects=*/false,
-  };
-  MoqtSession server_session(&mock_session_, server_parameters,
-                             session_callbacks_.AsSessionCallbacks());
+  MoqtSession server_session(
+      &mock_session_, MoqtSessionParameters(quic::Perspective::IS_SERVER),
+      session_callbacks_.AsSessionCallbacks());
   webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtParserVisitor> stream_input =
       MoqtSessionPeer::CreateControlStream(&server_session, &mock_stream);
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h
index 37636b7..5f3a75c 100644
--- a/quiche/quic/moqt/moqt_track.h
+++ b/quiche/quic/moqt/moqt_track.h
@@ -9,11 +9,17 @@
 #include <optional>
 
 #include "absl/strings/string_view.h"
+#include "quiche/quic/core/quic_time.h"
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/moqt_priority.h"
+#include "quiche/common/quiche_callbacks.h"
 
 namespace moqt {
 
+using MoqtObjectAckFunction =
+    quiche::MultiUseCallback<void(uint64_t group_id, uint64_t object_id,
+                                  quic::QuicTimeDelta delta_from_deadline)>;
+
 // A track on the peer to which the session has subscribed.
 class RemoteTrack {
  public:
@@ -26,6 +32,11 @@
     virtual void OnReply(
         const FullTrackName& full_track_name,
         std::optional<absl::string_view> error_reason_phrase) = 0;
+    // Called when the subscription process is far enough that it is possible to
+    // send OBJECT_ACK messages; provides a callback to do so. The callback is
+    // valid for as long as the session is valid.
+    virtual void OnCanAckObjects(MoqtObjectAckFunction ack_function) = 0;
+    // Called when an object fragment (or an entire object) is received.
     virtual void OnObjectFragment(
         const FullTrackName& full_track_name, uint64_t group_sequence,
         uint64_t object_sequence, MoqtPriority publisher_priority,
diff --git a/quiche/quic/moqt/test_tools/moqt_simulator_harness.cc b/quiche/quic/moqt/test_tools/moqt_simulator_harness.cc
index 20b42ee..d181d0b 100644
--- a/quiche/quic/moqt/test_tools/moqt_simulator_harness.cc
+++ b/quiche/quic/moqt/test_tools/moqt_simulator_harness.cc
@@ -5,7 +5,6 @@
 #include "quiche/quic/moqt/test_tools/moqt_simulator_harness.h"
 
 #include <string>
-#include <utility>
 
 #include "quiche/quic/core/crypto/quic_compressed_certs_cache.h"
 #include "quiche/quic/core/crypto/quic_crypto_server_config.h"
@@ -20,6 +19,15 @@
 
 namespace moqt::test {
 
+namespace {
+MoqtSessionParameters CreateParameters(quic::Perspective perspective,
+                                       MoqtVersion version) {
+  MoqtSessionParameters parameters(perspective, "");
+  parameters.version = version;
+  return parameters;
+}
+}  // namespace
+
 MoqtClientEndpoint::MoqtClientEndpoint(quic::simulator::Simulator* simulator,
                                        const std::string& name,
                                        const std::string& peer_name,
@@ -31,14 +39,9 @@
       quic_session_(connection_.get(), false, nullptr, quic::QuicConfig(),
                     "test.example.com", 443, "moqt", &session_,
                     /*visitor_owned=*/false, nullptr, &crypto_config_),
-      session_(
-          &quic_session_,
-          MoqtSessionParameters{.version = version,
-                                .perspective = quic::Perspective::IS_CLIENT,
-                                .using_webtrans = false,
-                                .path = "",
-                                .deliver_partial_objects = false},
-          MoqtSessionCallbacks()) {
+      session_(&quic_session_,
+               CreateParameters(quic::Perspective::IS_CLIENT, version),
+               MoqtSessionCallbacks()) {
   quic_session_.Initialize();
 }
 
@@ -59,14 +62,9 @@
                     "moqt", &session_,
                     /*visitor_owned=*/false, nullptr, &crypto_config_,
                     &compressed_certs_cache_),
-      session_(
-          &quic_session_,
-          MoqtSessionParameters{.version = version,
-                                .perspective = quic::Perspective::IS_SERVER,
-                                .using_webtrans = false,
-                                .path = "",
-                                .deliver_partial_objects = false},
-          MoqtSessionCallbacks()) {
+      session_(&quic_session_,
+               CreateParameters(quic::Perspective::IS_SERVER, version),
+               MoqtSessionCallbacks()) {
   quic_session_.Initialize();
 }
 
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h
index 748252c..c6feb68 100644
--- a/quiche/quic/moqt/test_tools/moqt_test_message.h
+++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -34,12 +34,12 @@
   virtual ~TestMessageBase() = default;
   MoqtMessageType message_type() const { return message_type_; }
 
-  typedef absl::variant<MoqtClientSetup, MoqtServerSetup, MoqtObject,
-                        MoqtSubscribe, MoqtSubscribeOk, MoqtSubscribeError,
-                        MoqtUnsubscribe, MoqtSubscribeDone, MoqtSubscribeUpdate,
-                        MoqtAnnounce, MoqtAnnounceOk, MoqtAnnounceError,
-                        MoqtAnnounceCancel, MoqtTrackStatusRequest,
-                        MoqtUnannounce, MoqtTrackStatus, MoqtGoAway>
+  typedef absl::variant<
+      MoqtClientSetup, MoqtServerSetup, MoqtObject, MoqtSubscribe,
+      MoqtSubscribeOk, MoqtSubscribeError, MoqtUnsubscribe, MoqtSubscribeDone,
+      MoqtSubscribeUpdate, MoqtAnnounce, MoqtAnnounceOk, MoqtAnnounceError,
+      MoqtAnnounceCancel, MoqtTrackStatusRequest, MoqtUnannounce,
+      MoqtTrackStatus, MoqtGoAway, MoqtObjectAck>
       MessageStructuredData;
 
   // The total actual size of the message.
@@ -492,7 +492,7 @@
       /*start_object=*/1,
       /*end_group=*/std::nullopt,
       /*end_object=*/std::nullopt,
-      /*authorization_info=*/MoqtSubscribeParameters{"bar"},
+      /*authorization_info=*/MoqtSubscribeParameters{"bar", std::nullopt},
   };
 };
 
@@ -1048,6 +1048,54 @@
   };
 };
 
+class QUICHE_NO_EXPORT ObjectAckMessage : public TestMessageBase {
+ public:
+  ObjectAckMessage() : TestMessageBase(MoqtMessageType::kObjectAck) {
+    SetWireImage(raw_packet_, sizeof(raw_packet_));
+  }
+
+  bool EqualFieldValues(MessageStructuredData& values) const override {
+    auto cast = std::get<MoqtObjectAck>(values);
+    if (cast.subscribe_id != object_ack_.subscribe_id) {
+      QUIC_LOG(INFO) << "OBJECT_ACK subscribe ID mismatch";
+      return false;
+    }
+    if (cast.group_id != object_ack_.group_id) {
+      QUIC_LOG(INFO) << "OBJECT_ACK group ID mismatch";
+      return false;
+    }
+    if (cast.object_id != object_ack_.object_id) {
+      QUIC_LOG(INFO) << "OBJECT_ACK object ID mismatch";
+      return false;
+    }
+    if (cast.delta_from_deadline != object_ack_.delta_from_deadline) {
+      QUIC_LOG(INFO) << "OBJECT_ACK delta from deadline mismatch";
+      return false;
+    }
+    return true;
+  }
+
+  void ExpandVarints() override { ExpandVarintsImpl("vvvvv"); }
+
+  MessageStructuredData structured_data() const override {
+    return TestMessageBase::MessageStructuredData(object_ack_);
+  }
+
+ private:
+  uint8_t raw_packet_[6] = {
+      0x71, 0x84,        // type
+      0x01, 0x10, 0x20,  // subscribe ID, group, object
+      0x20,              // 0x10 time delta
+  };
+
+  MoqtObjectAck object_ack_ = {
+      /*subscribe_id=*/0x01,
+      /*group_id=*/0x10,
+      /*object_id=*/0x20,
+      /*delta_from_deadline=*/quic::QuicTimeDelta::FromMicroseconds(0x10),
+  };
+};
+
 // Factory function for test messages.
 static inline std::unique_ptr<TestMessageBase> CreateTestMessage(
     MoqtMessageType message_type, bool is_webtrans) {
@@ -1084,6 +1132,8 @@
       return std::make_unique<TrackStatusMessage>();
     case MoqtMessageType::kGoAway:
       return std::make_unique<GoAwayMessage>();
+    case MoqtMessageType::kObjectAck:
+      return std::make_unique<ObjectAckMessage>();
     case MoqtMessageType::kClientSetup:
       return std::make_unique<ClientSetupMessage>(is_webtrans);
     case MoqtMessageType::kServerSetup:
diff --git a/quiche/quic/moqt/tools/chat_client.cc b/quiche/quic/moqt/tools/chat_client.cc
index 8d33ca1..f5bc8df 100644
--- a/quiche/quic/moqt/tools/chat_client.cc
+++ b/quiche/quic/moqt/tools/chat_client.cc
@@ -189,7 +189,8 @@
   FullTrackName catalog_name = chat_strings_->GetCatalogName();
   if (!session_->SubscribeCurrentGroup(
           catalog_name.track_namespace, catalog_name.track_name,
-          remote_track_visitor_.get(), MoqtSubscribeParameters{username_})) {
+          remote_track_visitor_.get(),
+          MoqtSubscribeParameters{username_, std::nullopt})) {
     std::cout << "Failed to get catalog\n";
     return false;
   }
diff --git a/quiche/quic/moqt/tools/chat_client.h b/quiche/quic/moqt/tools/chat_client.h
index dad1533..0b3ffc7 100644
--- a/quiche/quic/moqt/tools/chat_client.h
+++ b/quiche/quic/moqt/tools/chat_client.h
@@ -95,6 +95,8 @@
     void OnReply(const moqt::FullTrackName& full_track_name,
                  std::optional<absl::string_view> reason_phrase) override;
 
+    void OnCanAckObjects(MoqtObjectAckFunction) override {}
+
     void OnObjectFragment(const moqt::FullTrackName& full_track_name,
                           uint64_t group_sequence, uint64_t object_sequence,
                           moqt::MoqtPriority publisher_priority,
diff --git a/quiche/quic/moqt/tools/chat_server.h b/quiche/quic/moqt/tools/chat_server.h
index 60a1557..a399af9 100644
--- a/quiche/quic/moqt/tools/chat_server.h
+++ b/quiche/quic/moqt/tools/chat_server.h
@@ -40,7 +40,7 @@
     explicit RemoteTrackVisitor(ChatServer* server);
     void OnReply(const moqt::FullTrackName& full_track_name,
                  std::optional<absl::string_view> reason_phrase) override;
-
+    void OnCanAckObjects(MoqtObjectAckFunction) override {}
     void OnObjectFragment(
         const moqt::FullTrackName& full_track_name, uint64_t group_sequence,
         uint64_t object_sequence, moqt::MoqtPriority /*publisher_priority*/,
diff --git a/quiche/quic/moqt/tools/moqt_client.cc b/quiche/quic/moqt/tools/moqt_client.cc
index 2464c73..8457c99 100644
--- a/quiche/quic/moqt/tools/moqt_client.cc
+++ b/quiche/quic/moqt/tools/moqt_client.cc
@@ -88,12 +88,7 @@
     return absl::InternalError("Failed to initialize WebTransport session");
   }
 
-  MoqtSessionParameters parameters;
-  parameters.version = MoqtVersion::kDraft05;
-  parameters.perspective = quic::Perspective::IS_CLIENT,
-  parameters.using_webtrans = true;
-  parameters.path = "";
-  parameters.deliver_partial_objects = false;
+  MoqtSessionParameters parameters(quic::Perspective::IS_CLIENT);
 
   // Ensure that we never have a dangling pointer to the session.
   MoqtSessionDeletedCallback deleted_callback =
diff --git a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
index 01d010d..a66eb8d 100644
--- a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
+++ b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
@@ -165,6 +165,8 @@
       }
     }
 
+    void OnCanAckObjects(MoqtObjectAckFunction) override {}
+
     void OnObjectFragment(const FullTrackName& full_track_name,
                           uint64_t group_sequence, uint64_t object_sequence,
                           MoqtPriority /*publisher_priority*/,
diff --git a/quiche/quic/moqt/tools/moqt_mock_visitor.h b/quiche/quic/moqt/tools/moqt_mock_visitor.h
index a5475a3..1e4aebf 100644
--- a/quiche/quic/moqt/tools/moqt_mock_visitor.h
+++ b/quiche/quic/moqt/tools/moqt_mock_visitor.h
@@ -12,6 +12,7 @@
 
 #include "absl/status/statusor.h"
 #include "absl/strings/string_view.h"
+#include "quiche/quic/core/quic_time.h"
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/moqt_priority.h"
 #include "quiche/quic/moqt/moqt_publisher.h"
@@ -77,6 +78,8 @@
               (const FullTrackName& full_track_name,
                std::optional<absl::string_view> error_reason_phrase),
               (override));
+  MOCK_METHOD(void, OnCanAckObjects, (MoqtObjectAckFunction ack_function),
+              (override));
   MOCK_METHOD(void, OnObjectFragment,
               (const FullTrackName& full_track_name, uint64_t group_sequence,
                uint64_t object_sequence, MoqtPriority publisher_priority,
@@ -86,6 +89,15 @@
               (override));
 };
 
+class MockPublishingMonitorInterface : public MoqtPublishingMonitorInterface {
+ public:
+  MOCK_METHOD(void, OnObjectAckSupportKnown, (bool supported), (override));
+  MOCK_METHOD(void, OnObjectAckReceived,
+              (uint64_t group_id, uint64_t object_id,
+               quic::QuicTimeDelta delta_from_deadline),
+              (override));
+};
+
 }  // namespace moqt::test
 
 #endif  // QUICHE_QUIC_MOQT_TOOLS_MOQT_MOCK_VISITOR_H_
diff --git a/quiche/quic/moqt/tools/moqt_server.cc b/quiche/quic/moqt/tools/moqt_server.cc
index ac1748a..32b70ed 100644
--- a/quiche/quic/moqt/tools/moqt_server.cc
+++ b/quiche/quic/moqt/tools/moqt_server.cc
@@ -29,12 +29,7 @@
     if (!configurator.ok()) {
       return configurator.status();
     }
-    MoqtSessionParameters parameters;
-    parameters.perspective = quic::Perspective::IS_SERVER;
-    parameters.path = path;
-    parameters.using_webtrans = true;
-    parameters.version = MoqtVersion::kDraft05;
-    parameters.deliver_partial_objects = false;
+    MoqtSessionParameters parameters(quic::Perspective::IS_SERVER);
     auto moqt_session = std::make_unique<MoqtSession>(session, parameters);
     std::move (*configurator)(moqt_session.get());
     return moqt_session;
diff --git a/quiche/quic/moqt/tools/moqt_simulator_bin.cc b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
index 70c64b5..b3369cb 100644
--- a/quiche/quic/moqt/tools/moqt_simulator_bin.cc
+++ b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
@@ -171,6 +171,8 @@
     QUICHE_CHECK(!error_reason_phrase.has_value()) << *error_reason_phrase;
   }
 
+  void OnCanAckObjects(MoqtObjectAckFunction) override {}
+
   void OnObjectFragment(const FullTrackName& full_track_name,
                         uint64_t group_sequence, uint64_t object_sequence,
                         MoqtPriority /*publisher_priority*/,