Implement MoqtOutgoingQueue.

This lets us have some "backlog buffer" in-memory, primarily for use in ingestion client.

PiperOrigin-RevId: 627368811
diff --git a/build/source_list.bzl b/build/source_list.bzl
index 2e6cd92..465be42 100644
--- a/build/source_list.bzl
+++ b/build/source_list.bzl
@@ -1494,6 +1494,7 @@
 moqt_hdrs = [
     "quic/moqt/moqt_framer.h",
     "quic/moqt/moqt_messages.h",
+    "quic/moqt/moqt_outgoing_queue.h",
     "quic/moqt/moqt_parser.h",
     "quic/moqt/moqt_session.h",
     "quic/moqt/moqt_subscribe_windows.h",
@@ -1508,6 +1509,8 @@
     "quic/moqt/moqt_framer_test.cc",
     "quic/moqt/moqt_integration_test.cc",
     "quic/moqt/moqt_messages.cc",
+    "quic/moqt/moqt_outgoing_queue.cc",
+    "quic/moqt/moqt_outgoing_queue_test.cc",
     "quic/moqt/moqt_parser.cc",
     "quic/moqt/moqt_parser_test.cc",
     "quic/moqt/moqt_session.cc",
diff --git a/build/source_list.gni b/build/source_list.gni
index 5105182..1bb50dd 100644
--- a/build/source_list.gni
+++ b/build/source_list.gni
@@ -1498,6 +1498,7 @@
 moqt_hdrs = [
     "src/quiche/quic/moqt/moqt_framer.h",
     "src/quiche/quic/moqt/moqt_messages.h",
+    "src/quiche/quic/moqt/moqt_outgoing_queue.h",
     "src/quiche/quic/moqt/moqt_parser.h",
     "src/quiche/quic/moqt/moqt_session.h",
     "src/quiche/quic/moqt/moqt_subscribe_windows.h",
@@ -1512,6 +1513,8 @@
     "src/quiche/quic/moqt/moqt_framer_test.cc",
     "src/quiche/quic/moqt/moqt_integration_test.cc",
     "src/quiche/quic/moqt/moqt_messages.cc",
+    "src/quiche/quic/moqt/moqt_outgoing_queue.cc",
+    "src/quiche/quic/moqt/moqt_outgoing_queue_test.cc",
     "src/quiche/quic/moqt/moqt_parser.cc",
     "src/quiche/quic/moqt/moqt_parser_test.cc",
     "src/quiche/quic/moqt/moqt_session.cc",
diff --git a/build/source_list.json b/build/source_list.json
index 7b749af..fb45e82 100644
--- a/build/source_list.json
+++ b/build/source_list.json
@@ -1497,6 +1497,7 @@
   "moqt_hdrs": [
     "quiche/quic/moqt/moqt_framer.h",
     "quiche/quic/moqt/moqt_messages.h",
+    "quiche/quic/moqt/moqt_outgoing_queue.h",
     "quiche/quic/moqt/moqt_parser.h",
     "quiche/quic/moqt/moqt_session.h",
     "quiche/quic/moqt/moqt_subscribe_windows.h",
@@ -1511,6 +1512,8 @@
     "quiche/quic/moqt/moqt_framer_test.cc",
     "quiche/quic/moqt/moqt_integration_test.cc",
     "quiche/quic/moqt/moqt_messages.cc",
+    "quiche/quic/moqt/moqt_outgoing_queue.cc",
+    "quiche/quic/moqt/moqt_outgoing_queue_test.cc",
     "quiche/quic/moqt/moqt_parser.cc",
     "quiche/quic/moqt/moqt_parser_test.cc",
     "quiche/quic/moqt/moqt_session.cc",
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index 8691be6..56286fc 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -16,10 +16,12 @@
 #include "quiche/quic/core/quic_generic_session.h"
 #include "quiche/quic/core/quic_types.h"
 #include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_outgoing_queue.h"
 #include "quiche/quic/moqt/moqt_session.h"
 #include "quiche/quic/moqt/moqt_track.h"
 #include "quiche/quic/moqt/tools/moqt_mock_visitor.h"
 #include "quiche/quic/test_tools/crypto_test_utils.h"
+#include "quiche/quic/test_tools/quic_test_utils.h"
 #include "quiche/quic/test_tools/simulator/simulator.h"
 #include "quiche/quic/test_tools/simulator/test_harness.h"
 #include "quiche/common/platform/api/quiche_test.h"
@@ -29,6 +31,7 @@
 namespace {
 
 using ::quic::simulator::Simulator;
+using ::quic::test::MemSliceFromString;
 using ::testing::_;
 using ::testing::Assign;
 using ::testing::Return;
@@ -241,6 +244,48 @@
   EXPECT_TRUE(success);
 }
 
+TEST_F(MoqtIntegrationTest, AnnounceSuccessSendDatainResponse) {
+  EstablishSession();
+
+  // Set up the server to subscribe to "data" track for the namespace announce
+  // it receives.
+  MockRemoteTrackVisitor server_visitor;
+  EXPECT_CALL(server_->callbacks().incoming_announce_callback, Call(_))
+      .WillOnce([&](absl::string_view track_namespace) {
+        server_->session()->SubscribeAbsolute(
+            track_namespace, "data", /*start_group=*/0,
+            /*start_object=*/0, &server_visitor);
+        return std::optional<MoqtAnnounceErrorReason>();
+      });
+
+  MoqtOutgoingQueue queue(client_->session(), FullTrackName{"test", "data"});
+  client_->session()->AddLocalTrack(FullTrackName{"test", "data"},
+                                    MoqtForwardingPreference::kGroup, &queue);
+  queue.AddObject(MemSliceFromString("object data"), /*key=*/true);
+  client_->session()->Announce(
+      "test", [](absl::string_view, std::optional<MoqtAnnounceErrorReason>) {});
+
+  bool received_object = false;
+  EXPECT_CALL(server_visitor, OnObjectFragment(_, _, _, _, _, _, _))
+      .WillOnce([&](const FullTrackName& full_track_name,
+                    uint64_t group_sequence, uint64_t object_sequence,
+                    uint64_t /*object_send_order*/,
+                    MoqtForwardingPreference forwarding_preference,
+                    absl::string_view object, bool end_of_message) {
+        EXPECT_EQ(full_track_name.track_namespace, "test");
+        EXPECT_EQ(full_track_name.track_name, "data");
+        EXPECT_EQ(group_sequence, 0u);
+        EXPECT_EQ(object_sequence, 0u);
+        EXPECT_EQ(forwarding_preference, MoqtForwardingPreference::kGroup);
+        EXPECT_EQ(object, "object data");
+        EXPECT_TRUE(end_of_message);
+        received_object = true;
+      });
+  bool success = test_harness_.RunUntilWithDefaultTimeout(
+      [&]() { return received_object; });
+  EXPECT_TRUE(success);
+}
+
 TEST_F(MoqtIntegrationTest, AnnounceFailure) {
   EstablishSession();
   testing::MockFunction<void(
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.cc b/quiche/quic/moqt/moqt_outgoing_queue.cc
new file mode 100644
index 0000000..84c2a85
--- /dev/null
+++ b/quiche/quic/moqt/moqt_outgoing_queue.cc
@@ -0,0 +1,91 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/moqt/moqt_outgoing_queue.h"
+
+#include <cstddef>
+#include <cstdint>
+#include <optional>
+#include <utility>
+
+#include "absl/status/status.h"
+#include "absl/status/statusor.h"
+#include "absl/strings/string_view.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_subscribe_windows.h"
+#include "quiche/common/platform/api/quiche_bug_tracker.h"
+#include "quiche/common/platform/api/quiche_mem_slice.h"
+
+namespace moqt {
+
+void MoqtOutgoingQueue::AddObject(quiche::QuicheMemSlice payload, bool key) {
+  if (queue_.empty() && !key) {
+    QUICHE_BUG(MoqtOutgoingQueue_AddObject_first_object_not_key)
+        << "The first object ever added to the queue must have the \"key\" "
+           "flag.";
+    return;
+  }
+
+  if (key) {
+    if (!queue_.empty()) {
+      CloseStreamForGroup(current_group_id_);
+    }
+
+    if (queue_.size() == kMaxQueuedGroups) {
+      queue_.erase(queue_.begin());
+    }
+    queue_.emplace_back();
+    ++current_group_id_;
+  }
+
+  absl::string_view payload_view = payload.AsStringView();
+  uint64_t object_id = queue_.back().size();
+  queue_.back().push_back(std::move(payload));
+  PublishObject(current_group_id_, object_id, payload_view,
+                /*close_stream=*/false);
+}
+
+absl::StatusOr<MoqtOutgoingQueue::PublishPastObjectsCallback>
+MoqtOutgoingQueue::OnSubscribeForPast(const SubscribeWindow& window) {
+  QUICHE_BUG_IF(
+      MoqtOutgoingQueue_requires_kGroup,
+      window.forwarding_preference() != MoqtForwardingPreference::kGroup)
+      << "MoqtOutgoingQueue currently only supports kGroup.";
+  if (window.HasEnd()) {
+    // TODO: support this (this would require changing the logic for closing the
+    // stream below).
+    return absl::UnimplementedError("SUBSCRIBEs with an end are not supported");
+  }
+  return [this, &window]() {
+    for (size_t i = 0; i < queue_.size(); ++i) {
+      const uint64_t group_id = first_group_in_queue() + i;
+      const Group& group = queue_[i];
+      const bool is_last_group = (i == queue_.size() - 1);
+      for (size_t j = 0; j < group.size(); ++j) {
+        const FullSequence sequence{group_id, j};
+        if (!window.InWindow(sequence)) {
+          continue;
+        }
+        const bool is_last_object = (j == group.size() - 1);
+        const bool should_close_stream = !is_last_group && is_last_object;
+        PublishObject(group_id, j, group[j].AsStringView(),
+                      should_close_stream);
+      }
+    }
+  };
+}
+
+void MoqtOutgoingQueue::CloseStreamForGroup(uint64_t group_id) {
+  session_->CloseObjectStream(track_, group_id);
+}
+
+void MoqtOutgoingQueue::PublishObject(uint64_t group_id, uint64_t object_id,
+                                      absl::string_view payload,
+                                      bool close_stream) {
+  session_->PublishObject(track_, group_id, object_id,
+                          /*object_send_order=*/group_id, payload,
+                          close_stream);
+}
+
+}  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.h b/quiche/quic/moqt/moqt_outgoing_queue.h
new file mode 100644
index 0000000..0966b57
--- /dev/null
+++ b/quiche/quic/moqt/moqt_outgoing_queue.h
@@ -0,0 +1,79 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_MOQT_TOOLS_MOQT_OUTGOING_QUEUE_H_
+#define QUICHE_QUIC_MOQT_TOOLS_MOQT_OUTGOING_QUEUE_H_
+
+#include <cstddef>
+#include <cstdint>
+#include <optional>
+#include <utility>
+#include <vector>
+
+#include "absl/container/inlined_vector.h"
+#include "absl/status/statusor.h"
+#include "absl/strings/string_view.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_session.h"
+#include "quiche/quic/moqt/moqt_subscribe_windows.h"
+#include "quiche/quic/moqt/moqt_track.h"
+#include "quiche/common/platform/api/quiche_mem_slice.h"
+
+namespace moqt {
+
+// MoqtOutgoingQueue lets the user send objects by providing the contents of the
+// object and a keyframe flag.  The queue will automatically number objects and
+// groups, and maintain a buffer of three most recent groups that will be
+// provided to subscribers automatically.
+//
+// This class is primarily meant to be used by publishers to buffer the frames
+// that they produce. Limitations of this class:
+// - It currently only works with the forwarding preference of kGroup.
+// - It only supports a single session.
+// - Everything is sent in order that it is queued.
+class MoqtOutgoingQueue : public LocalTrack::Visitor {
+ public:
+  explicit MoqtOutgoingQueue(MoqtSession* session, FullTrackName track)
+      : session_(session), track_(std::move(track)) {}
+
+  MoqtOutgoingQueue(const MoqtOutgoingQueue&) = delete;
+  MoqtOutgoingQueue(MoqtOutgoingQueue&&) = default;
+  MoqtOutgoingQueue& operator=(const MoqtOutgoingQueue&) = delete;
+  MoqtOutgoingQueue& operator=(MoqtOutgoingQueue&&) = default;
+
+  // If `key` is true, the object is placed into a new group, and the previous
+  // group is closed. The first object ever sent MUST have `key` set to true.
+  void AddObject(quiche::QuicheMemSlice payload, bool key);
+
+  // LocalTrack::Visitor implementation.
+  absl::StatusOr<PublishPastObjectsCallback> OnSubscribeForPast(
+      const SubscribeWindow& window) override;
+
+ protected:
+  // Interface to MoqtSession; can be mocked out for tests.
+  virtual void CloseStreamForGroup(uint64_t group_id);
+  virtual void PublishObject(uint64_t group_id, uint64_t object_id,
+                             absl::string_view payload, bool close_stream);
+
+ private:
+  // The number of recent groups to keep around for newly joined subscribers.
+  static constexpr size_t kMaxQueuedGroups = 3;
+
+  using Object = quiche::QuicheMemSlice;
+  using Group = std::vector<Object>;
+
+  // The number of the oldest group available.
+  uint64_t first_group_in_queue() {
+    return current_group_id_ - queue_.size() + 1;
+  }
+
+  MoqtSession* session_;  // Not owned.
+  FullTrackName track_;
+  absl::InlinedVector<Group, kMaxQueuedGroups> queue_;
+  uint64_t current_group_id_ = -1;
+};
+
+}  // namespace moqt
+
+#endif  // QUICHE_QUIC_MOQT_TOOLS_MOQT_OUTGOING_QUEUE_H_
diff --git a/quiche/quic/moqt/moqt_outgoing_queue_test.cc b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
new file mode 100644
index 0000000..7e6662e
--- /dev/null
+++ b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
@@ -0,0 +1,220 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/moqt/moqt_outgoing_queue.h"
+
+#include <cstdint>
+#include <utility>
+
+#include "absl/status/statusor.h"
+#include "absl/strings/string_view.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_subscribe_windows.h"
+#include "quiche/quic/test_tools/quic_test_utils.h"
+#include "quiche/common/platform/api/quiche_expect_bug.h"
+#include "quiche/common/platform/api/quiche_logging.h"
+#include "quiche/common/platform/api/quiche_test.h"
+
+namespace moqt {
+namespace {
+
+using ::quic::test::MemSliceFromString;
+
+class TestMoqtOutgoingQueue : public MoqtOutgoingQueue {
+ public:
+  TestMoqtOutgoingQueue()
+      : MoqtOutgoingQueue(nullptr, FullTrackName{"test", "track"}) {}
+
+  void CallSubscribeForPast(const SubscribeWindow& window) {
+    absl::StatusOr<PublishPastObjectsCallback> callback =
+        OnSubscribeForPast(window);
+    QUICHE_CHECK_OK(callback.status());
+    (*std::move(callback))();
+  }
+
+  MOCK_METHOD(void, CloseStreamForGroup, (uint64_t group_id), (override));
+  MOCK_METHOD(void, PublishObject,
+              (uint64_t group_id, uint64_t object_id, absl::string_view payload,
+               bool close_stream),
+              (override));
+};
+
+TEST(MoqtOutgoingQueue, FirstObjectNotKeyframe) {
+  TestMoqtOutgoingQueue queue;
+  EXPECT_QUICHE_BUG(queue.AddObject(MemSliceFromString("a"), false),
+                    "The first object");
+}
+
+TEST(MoqtOutgoingQueue, SingleGroup) {
+  TestMoqtOutgoingQueue queue;
+  {
+    testing::InSequence seq;
+    EXPECT_CALL(queue, PublishObject(0, 0, "a", false));
+    EXPECT_CALL(queue, PublishObject(0, 1, "b", false));
+    EXPECT_CALL(queue, PublishObject(0, 2, "c", false));
+  }
+  queue.AddObject(MemSliceFromString("a"), true);
+  queue.AddObject(MemSliceFromString("b"), false);
+  queue.AddObject(MemSliceFromString("c"), false);
+}
+
+TEST(MoqtOutgoingQueue, SingleGroupPastSubscribeFromZero) {
+  TestMoqtOutgoingQueue queue;
+  {
+    testing::InSequence seq;
+    EXPECT_CALL(queue, PublishObject(0, 0, "a", false));
+    EXPECT_CALL(queue, PublishObject(0, 1, "b", false));
+    EXPECT_CALL(queue, PublishObject(0, 2, "c", false));
+
+    EXPECT_CALL(queue, PublishObject(0, 0, "a", false));
+    EXPECT_CALL(queue, PublishObject(0, 1, "b", false));
+    EXPECT_CALL(queue, PublishObject(0, 2, "c", false));
+  }
+  queue.AddObject(MemSliceFromString("a"), true);
+  queue.AddObject(MemSliceFromString("b"), false);
+  queue.AddObject(MemSliceFromString("c"), false);
+  queue.CallSubscribeForPast(
+      SubscribeWindow(0, MoqtForwardingPreference::kGroup, 0, 0));
+}
+
+TEST(MoqtOutgoingQueue, SingleGroupPastSubscribeFromMidGroup) {
+  TestMoqtOutgoingQueue queue;
+  {
+    testing::InSequence seq;
+    EXPECT_CALL(queue, PublishObject(0, 0, "a", false));
+    EXPECT_CALL(queue, PublishObject(0, 1, "b", false));
+    EXPECT_CALL(queue, PublishObject(0, 2, "c", false));
+
+    EXPECT_CALL(queue, PublishObject(0, 1, "b", false));
+    EXPECT_CALL(queue, PublishObject(0, 2, "c", false));
+  }
+  queue.AddObject(MemSliceFromString("a"), true);
+  queue.AddObject(MemSliceFromString("b"), false);
+  queue.AddObject(MemSliceFromString("c"), false);
+  queue.CallSubscribeForPast(
+      SubscribeWindow(0, MoqtForwardingPreference::kGroup, 0, 1));
+}
+
+TEST(MoqtOutgoingQueue, TwoGroups) {
+  TestMoqtOutgoingQueue queue;
+  {
+    testing::InSequence seq;
+    EXPECT_CALL(queue, PublishObject(0, 0, "a", false));
+    EXPECT_CALL(queue, PublishObject(0, 1, "b", false));
+    EXPECT_CALL(queue, PublishObject(0, 2, "c", false));
+    EXPECT_CALL(queue, CloseStreamForGroup(0));
+    EXPECT_CALL(queue, PublishObject(1, 0, "d", false));
+    EXPECT_CALL(queue, PublishObject(1, 1, "e", false));
+    EXPECT_CALL(queue, PublishObject(1, 2, "f", false));
+  }
+  queue.AddObject(MemSliceFromString("a"), true);
+  queue.AddObject(MemSliceFromString("b"), false);
+  queue.AddObject(MemSliceFromString("c"), false);
+  queue.AddObject(MemSliceFromString("d"), true);
+  queue.AddObject(MemSliceFromString("e"), false);
+  queue.AddObject(MemSliceFromString("f"), false);
+}
+
+TEST(MoqtOutgoingQueue, TwoGroupsPastSubscribe) {
+  TestMoqtOutgoingQueue queue;
+  {
+    testing::InSequence seq;
+    EXPECT_CALL(queue, PublishObject(0, 0, "a", false));
+    EXPECT_CALL(queue, PublishObject(0, 1, "b", false));
+    EXPECT_CALL(queue, PublishObject(0, 2, "c", false));
+    EXPECT_CALL(queue, CloseStreamForGroup(0));
+    EXPECT_CALL(queue, PublishObject(1, 0, "d", false));
+    EXPECT_CALL(queue, PublishObject(1, 1, "e", false));
+    EXPECT_CALL(queue, PublishObject(1, 2, "f", false));
+
+    EXPECT_CALL(queue, PublishObject(0, 1, "b", false));
+    EXPECT_CALL(queue, PublishObject(0, 2, "c", true));
+    EXPECT_CALL(queue, PublishObject(1, 0, "d", false));
+    EXPECT_CALL(queue, PublishObject(1, 1, "e", false));
+    EXPECT_CALL(queue, PublishObject(1, 2, "f", false));
+  }
+  queue.AddObject(MemSliceFromString("a"), true);
+  queue.AddObject(MemSliceFromString("b"), false);
+  queue.AddObject(MemSliceFromString("c"), false);
+  queue.AddObject(MemSliceFromString("d"), true);
+  queue.AddObject(MemSliceFromString("e"), false);
+  queue.AddObject(MemSliceFromString("f"), false);
+  queue.CallSubscribeForPast(
+      SubscribeWindow(0, MoqtForwardingPreference::kGroup, 0, 1));
+}
+
+TEST(MoqtOutgoingQueue, FiveGroups) {
+  TestMoqtOutgoingQueue queue;
+  {
+    testing::InSequence seq;
+    EXPECT_CALL(queue, PublishObject(0, 0, "a", false));
+    EXPECT_CALL(queue, PublishObject(0, 1, "b", false));
+    EXPECT_CALL(queue, CloseStreamForGroup(0));
+    EXPECT_CALL(queue, PublishObject(1, 0, "c", false));
+    EXPECT_CALL(queue, PublishObject(1, 1, "d", false));
+    EXPECT_CALL(queue, CloseStreamForGroup(1));
+    EXPECT_CALL(queue, PublishObject(2, 0, "e", false));
+    EXPECT_CALL(queue, PublishObject(2, 1, "f", false));
+    EXPECT_CALL(queue, CloseStreamForGroup(2));
+    EXPECT_CALL(queue, PublishObject(3, 0, "g", false));
+    EXPECT_CALL(queue, PublishObject(3, 1, "h", false));
+    EXPECT_CALL(queue, CloseStreamForGroup(3));
+    EXPECT_CALL(queue, PublishObject(4, 0, "i", false));
+    EXPECT_CALL(queue, PublishObject(4, 1, "j", false));
+  }
+  queue.AddObject(MemSliceFromString("a"), true);
+  queue.AddObject(MemSliceFromString("b"), false);
+  queue.AddObject(MemSliceFromString("c"), true);
+  queue.AddObject(MemSliceFromString("d"), false);
+  queue.AddObject(MemSliceFromString("e"), true);
+  queue.AddObject(MemSliceFromString("f"), false);
+  queue.AddObject(MemSliceFromString("g"), true);
+  queue.AddObject(MemSliceFromString("h"), false);
+  queue.AddObject(MemSliceFromString("i"), true);
+  queue.AddObject(MemSliceFromString("j"), false);
+}
+
+TEST(MoqtOutgoingQueue, FiveGroupsPastSubscribe) {
+  TestMoqtOutgoingQueue queue;
+  {
+    testing::InSequence seq;
+    EXPECT_CALL(queue, PublishObject(0, 0, "a", false));
+    EXPECT_CALL(queue, PublishObject(0, 1, "b", false));
+    EXPECT_CALL(queue, CloseStreamForGroup(0));
+    EXPECT_CALL(queue, PublishObject(1, 0, "c", false));
+    EXPECT_CALL(queue, PublishObject(1, 1, "d", false));
+    EXPECT_CALL(queue, CloseStreamForGroup(1));
+    EXPECT_CALL(queue, PublishObject(2, 0, "e", false));
+    EXPECT_CALL(queue, PublishObject(2, 1, "f", false));
+    EXPECT_CALL(queue, CloseStreamForGroup(2));
+    EXPECT_CALL(queue, PublishObject(3, 0, "g", false));
+    EXPECT_CALL(queue, PublishObject(3, 1, "h", false));
+    EXPECT_CALL(queue, CloseStreamForGroup(3));
+    EXPECT_CALL(queue, PublishObject(4, 0, "i", false));
+    EXPECT_CALL(queue, PublishObject(4, 1, "j", false));
+
+    // Past SUBSCRIBE would only get the three most recent groups.
+    EXPECT_CALL(queue, PublishObject(2, 0, "e", false));
+    EXPECT_CALL(queue, PublishObject(2, 1, "f", true));
+    EXPECT_CALL(queue, PublishObject(3, 0, "g", false));
+    EXPECT_CALL(queue, PublishObject(3, 1, "h", true));
+    EXPECT_CALL(queue, PublishObject(4, 0, "i", false));
+    EXPECT_CALL(queue, PublishObject(4, 1, "j", false));
+  }
+  queue.AddObject(MemSliceFromString("a"), true);
+  queue.AddObject(MemSliceFromString("b"), false);
+  queue.AddObject(MemSliceFromString("c"), true);
+  queue.AddObject(MemSliceFromString("d"), false);
+  queue.AddObject(MemSliceFromString("e"), true);
+  queue.AddObject(MemSliceFromString("f"), false);
+  queue.AddObject(MemSliceFromString("g"), true);
+  queue.AddObject(MemSliceFromString("h"), false);
+  queue.AddObject(MemSliceFromString("i"), true);
+  queue.AddObject(MemSliceFromString("j"), false);
+  queue.CallSubscribeForPast(
+      SubscribeWindow(0, MoqtForwardingPreference::kGroup, 0, 0));
+}
+
+}  // namespace
+}  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index dcbbf61..8781b26 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -15,6 +15,7 @@
 
 #include "absl/algorithm/container.h"
 #include "absl/status/status.h"
+#include "absl/status/statusor.h"
 #include "absl/strings/str_cat.h"
 #include "absl/strings/string_view.h"
 #include "absl/types/span.h"
@@ -24,6 +25,7 @@
 #include "quiche/quic/moqt/moqt_subscribe_windows.h"
 #include "quiche/quic/moqt/moqt_track.h"
 #include "quiche/quic/platform/api/quic_bug_tracker.h"
+#include "quiche/common/platform/api/quiche_bug_tracker.h"
 #include "quiche/common/platform/api/quiche_logging.h"
 #include "quiche/common/quiche_buffer_allocator.h"
 #include "quiche/common/quiche_stream.h"
@@ -439,6 +441,44 @@
   return (failures == 0);
 }
 
+void MoqtSession::CloseObjectStream(const FullTrackName& full_track_name,
+                                    uint64_t group_id) {
+  auto track_it = local_tracks_.find(full_track_name);
+  if (track_it == local_tracks_.end()) {
+    QUICHE_DLOG(ERROR) << ENDPOINT << "Sending OBJECT for nonexistent track";
+    return;
+  }
+  LocalTrack& track = track_it->second;
+
+  MoqtForwardingPreference forwarding_preference =
+      track.forwarding_preference();
+  if (forwarding_preference == MoqtForwardingPreference::kObject ||
+      forwarding_preference == MoqtForwardingPreference::kDatagram) {
+    QUIC_BUG(MoqtSession_CloseStreamObject_wrong_type)
+        << "Forwarding preferences of Object or Datagram require stream to be "
+           "immediately closed, and thus are not valid CloseObjectStream() "
+           "targets";
+    return;
+  }
+
+  std::vector<SubscribeWindow*> subscriptions =
+      track.ShouldSend({group_id, /*object=*/0});
+  for (SubscribeWindow* subscription : subscriptions) {
+    std::optional<webtransport::StreamId> stream_id =
+        subscription->GetStreamForSequence(
+            FullSequence(group_id, /*object=*/0));
+    if (!stream_id.has_value()) {
+      continue;
+    }
+    webtransport::Stream* stream = session_->GetStreamById(*stream_id);
+    if (stream == nullptr) {
+      continue;
+    }
+    bool success = stream->SendFin();
+    QUICHE_BUG_IF(MoqtSession_CloseObjectStream_fin_failed, !success);
+  }
+}
+
 void MoqtSession::Stream::OnCanRead() {
   bool fin =
       quiche::ProcessAllReadableRegions(*stream_, [&](absl::string_view chunk) {
@@ -627,37 +667,39 @@
   QUICHE_DCHECK(start.has_value());  // Parser enforces this.
   std::optional<FullSequence> end = session_->LocationToAbsoluteNumber(
       track, message.end_group, message.end_object);
+  LocalTrack::Visitor::PublishPastObjectsCallback publish_past_objects;
+  SubscribeWindow window =
+      end.has_value()
+          ? SubscribeWindow(message.subscribe_id, track.forwarding_preference(),
+                            start->group, start->object, end->group,
+                            end->object)
+          : SubscribeWindow(message.subscribe_id, track.forwarding_preference(),
+                            start->group, start->object);
   if (start < track.next_sequence() && track.visitor() != nullptr) {
-    // TODO: Rework this. It's not good that the session notifies the
-    // application -- presumably triggering the send of a bunch of objects --
-    // and only then sends the Subscribe OK.
-    SubscribeWindow window =
-        end.has_value()
-            ? SubscribeWindow(message.subscribe_id,
-                              track.forwarding_preference(), start->group,
-                              start->object, end->group, end->object)
-            : SubscribeWindow(message.subscribe_id,
-                              track.forwarding_preference(), start->group,
-                              start->object);
-    std::optional<absl::string_view> past_objects_available =
-        track.visitor()->OnSubscribeForPast(window);
-    if (past_objects_available.has_value()) {
+    absl::StatusOr<LocalTrack::Visitor::PublishPastObjectsCallback>
+        past_objects_available = track.visitor()->OnSubscribeForPast(window);
+    if (!past_objects_available.ok()) {
       SendSubscribeError(message, SubscribeErrorCode::kInternalError,
-                         "Object does not exist", message.track_alias);
+                         past_objects_available.status().message(),
+                         message.track_alias);
       return;
     }
+    publish_past_objects = *std::move(past_objects_available);
   }
   MoqtSubscribeOk subscribe_ok;
   subscribe_ok.subscribe_id = message.subscribe_id;
   SendOrBufferMessage(session_->framer_.SerializeSubscribeOk(subscribe_ok));
   QUIC_DLOG(INFO) << ENDPOINT << "Created subscription for "
                   << message.track_namespace << ":" << message.track_name;
-  if (!end.has_value()) {
+  if (end.has_value()) {
+    track.AddWindow(message.subscribe_id, start->group, start->object,
+                    end->group, end->object);
+  } else {
     track.AddWindow(message.subscribe_id, start->group, start->object);
-    return;
   }
-  track.AddWindow(message.subscribe_id, start->group, start->object, end->group,
-                  end->object);
+  if (publish_past_objects) {
+    std::move(publish_past_objects)();
+  }
 }
 
 void MoqtSession::Stream::OnSubscribeOkMessage(const MoqtSubscribeOk& message) {
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index 08eb658..fab05df 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -130,7 +130,8 @@
   bool PublishObject(const FullTrackName& full_track_name, uint64_t group_id,
                      uint64_t object_id, uint64_t object_send_order,
                      absl::string_view payload, bool end_of_stream);
-  // TODO: Add an API to FIN the stream for a particular track/group/object.
+  void CloseObjectStream(const FullTrackName& full_track_name,
+                         uint64_t group_id);
   // TODO: Add an API to send partial objects.
 
   MoqtSessionCallbacks& callbacks() { return callbacks_; }
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index 5cac2fe..9f9370a 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -456,8 +456,8 @@
   std::unique_ptr<MoqtParserVisitor> stream_input =
       MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
   bool correct_message = true;
-  EXPECT_CALL(local_track_visitor, OnSubscribeForPast(_))
-      .WillOnce(Return(std::nullopt));
+  EXPECT_CALL(local_track_visitor, OnSubscribeForPast(_)).WillOnce(Return([] {
+  }));
   EXPECT_CALL(mock_stream, Writev(_, _))
       .WillOnce([&](absl::Span<const absl::string_view> data,
                     const quiche::StreamWriteOptions& options) {
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.h b/quiche/quic/moqt/moqt_subscribe_windows.h
index 53652ed..75a204f 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows.h
+++ b/quiche/quic/moqt/moqt_subscribe_windows.h
@@ -54,6 +54,11 @@
 
   void RemoveStream(uint64_t group_id, uint64_t object_id);
 
+  bool HasEnd() const { return end_.has_value(); }
+  MoqtForwardingPreference forwarding_preference() const {
+    return forwarding_preference_;
+  }
+
  private:
   // Converts an object sequence number into one that matches the way that
   // stream IDs are being mapped. (See the comment for send_streams_ below.)
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h
index 6193ccd..5f2957a 100644
--- a/quiche/quic/moqt/moqt_track.h
+++ b/quiche/quic/moqt/moqt_track.h
@@ -9,9 +9,11 @@
 #include <optional>
 #include <vector>
 
+#include "absl/status/statusor.h"
 #include "absl/strings/string_view.h"
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/moqt_subscribe_windows.h"
+#include "quiche/common/quiche_callbacks.h"
 
 namespace moqt {
 
@@ -22,6 +24,8 @@
    public:
     virtual ~Visitor() = default;
 
+    using PublishPastObjectsCallback = quiche::SingleUseCallback<void()>;
+
     // Requests that application re-publish objects from {start_group,
     // start_object} to the latest object. If the return value is nullopt, the
     // subscribe is valid and the application will deliver the object and
@@ -29,7 +33,7 @@
     // is the error message (the session will send SUBSCRIBE_ERROR). Via this
     // API, the application decides if a partially fulfillable
     // SUBSCRIBE results in an error or not.
-    virtual std::optional<absl::string_view> OnSubscribeForPast(
+    virtual absl::StatusOr<PublishPastObjectsCallback> OnSubscribeForPast(
         const SubscribeWindow& window) = 0;
   };
   // |visitor| must not be nullptr.
diff --git a/quiche/quic/moqt/tools/moqt_mock_visitor.h b/quiche/quic/moqt/tools/moqt_mock_visitor.h
index 7d174a1..aaf301c 100644
--- a/quiche/quic/moqt/tools/moqt_mock_visitor.h
+++ b/quiche/quic/moqt/tools/moqt_mock_visitor.h
@@ -8,6 +8,7 @@
 #include <cstdint>
 #include <optional>
 
+#include "absl/status/statusor.h"
 #include "absl/strings/string_view.h"
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/moqt_session.h"
@@ -40,7 +41,7 @@
 
 class MockLocalTrackVisitor : public LocalTrack::Visitor {
  public:
-  MOCK_METHOD(std::optional<absl::string_view>, OnSubscribeForPast,
+  MOCK_METHOD(absl::StatusOr<PublishPastObjectsCallback>, OnSubscribeForPast,
               (const SubscribeWindow& window), (override));
 };