Allow fragmented MOQT object payloads.

MoqtOutgoingQueue does not create objects in fragments, but MoqtLiveRelayQueue should be prepared to accept them.

PiperOrigin-RevId: 914368728
diff --git a/build/source_list.bzl b/build/source_list.bzl
index b2b27f6..d741557 100644
--- a/build/source_list.bzl
+++ b/build/source_list.bzl
@@ -1656,6 +1656,7 @@
     "quic/moqt/moqt_messages_test.cc",
     "quic/moqt/moqt_names_test.cc",
     "quic/moqt/moqt_namespace_stream_test.cc",
+    "quic/moqt/moqt_object_test.cc",
     "quic/moqt/moqt_outgoing_queue_test.cc",
     "quic/moqt/moqt_outstanding_objects_test.cc",
     "quic/moqt/moqt_parser_fuzz_test.cc",
diff --git a/build/source_list.gni b/build/source_list.gni
index c68b800..f07ff9b 100644
--- a/build/source_list.gni
+++ b/build/source_list.gni
@@ -1661,6 +1661,7 @@
     "src/quiche/quic/moqt/moqt_messages_test.cc",
     "src/quiche/quic/moqt/moqt_names_test.cc",
     "src/quiche/quic/moqt/moqt_namespace_stream_test.cc",
+    "src/quiche/quic/moqt/moqt_object_test.cc",
     "src/quiche/quic/moqt/moqt_outgoing_queue_test.cc",
     "src/quiche/quic/moqt/moqt_outstanding_objects_test.cc",
     "src/quiche/quic/moqt/moqt_parser_fuzz_test.cc",
diff --git a/build/source_list.json b/build/source_list.json
index 215e90a..dc40e1a 100644
--- a/build/source_list.json
+++ b/build/source_list.json
@@ -1660,6 +1660,7 @@
     "quiche/quic/moqt/moqt_messages_test.cc",
     "quiche/quic/moqt/moqt_names_test.cc",
     "quiche/quic/moqt/moqt_namespace_stream_test.cc",
+    "quiche/quic/moqt/moqt_object_test.cc",
     "quiche/quic/moqt/moqt_outgoing_queue_test.cc",
     "quiche/quic/moqt/moqt_outstanding_objects_test.cc",
     "quiche/quic/moqt/moqt_parser_fuzz_test.cc",
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index 6202316..ffa3e3d 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -331,13 +331,13 @@
   EXPECT_CALL(subscribe_visitor_, OnObjectFragment)
       .WillOnce([&](const FullTrackName& full_track_name,
                     const PublishedObjectMetadata& metadata,
-                    absl::string_view object, bool end_of_message) {
+                    absl::string_view object, uint64_t offset) {
         EXPECT_EQ(full_track_name, FullTrackName("test", "data"));
         EXPECT_EQ(metadata.location.group, 0u);
         EXPECT_EQ(metadata.location.object, 0u);
         EXPECT_EQ(metadata.status, MoqtObjectStatus::kNormal);
         EXPECT_EQ(object, "object data");
-        EXPECT_TRUE(end_of_message);
+        EXPECT_EQ(offset, 0u);
         received_object = true;
       });
   success = test_harness_.RunUntilWithDefaultTimeout(
@@ -391,20 +391,20 @@
         OnObjectFragment(_,
                          MetadataLocationAndStatus(
                              Location{0, 3}, MoqtObjectStatus::kEndOfGroup),
-                         "", true))
+                         "", /*offset=*/0))
         .WillOnce([&] { ++received; });
     EXPECT_CALL(subscribe_visitor_,
                 OnObjectFragment(_,
                                  MetadataLocationAndStatus(
                                      Location{1, 0}, MoqtObjectStatus::kNormal),
-                                 "object 4", true))
+                                 "object 4", /*offset=*/0))
         .WillOnce([&] { ++received; });
     queue->AddObject(MemSliceFromString("object 4"), /*key=*/true);
     EXPECT_CALL(subscribe_visitor_,
                 OnObjectFragment(_,
                                  MetadataLocationAndStatus(
                                      Location{1, 1}, MoqtObjectStatus::kNormal),
-                                 "object 5", true))
+                                 "object 5", /*offset=*/0))
         .WillOnce([&] { ++received; });
     queue->AddObject(MemSliceFromString("object 5"), /*key=*/false);
 
@@ -416,7 +416,7 @@
                 OnObjectFragment(_,
                                  MetadataLocationAndStatus(
                                      Location{1, 2}, MoqtObjectStatus::kNormal),
-                                 "object 6", true))
+                                 "object 6", /*offset=*/0))
         .WillOnce([&] { ++received; });
     queue->AddObject(MemSliceFromString("object 6"), /*key=*/false);
     EXPECT_CALL(
@@ -424,20 +424,20 @@
         OnObjectFragment(_,
                          MetadataLocationAndStatus(
                              Location{1, 3}, MoqtObjectStatus::kEndOfGroup),
-                         "", true))
+                         "", /*offset=*/0))
         .WillOnce([&] { ++received; });
     EXPECT_CALL(subscribe_visitor_,
                 OnObjectFragment(_,
                                  MetadataLocationAndStatus(
                                      Location{2, 0}, MoqtObjectStatus::kNormal),
-                                 "object 7", true))
+                                 "object 7", /*offset=*/0))
         .WillOnce([&] { ++received; });
     queue->AddObject(MemSliceFromString("object 7"), /*key=*/true);
     EXPECT_CALL(subscribe_visitor_,
                 OnObjectFragment(_,
                                  MetadataLocationAndStatus(
                                      Location{2, 1}, MoqtObjectStatus::kNormal),
-                                 "object 8", true))
+                                 "object 8", /*offset=*/0))
         .WillOnce([&] { ++received; });
     queue->AddObject(MemSliceFromString("object 8"), /*key=*/false);
 
@@ -450,14 +450,14 @@
         OnObjectFragment(_,
                          MetadataLocationAndStatus(
                              Location{2, 2}, MoqtObjectStatus::kEndOfGroup),
-                         "", true))
+                         "", /*offset=*/0))
         .WillOnce([&] { ++received; });
     EXPECT_CALL(
         subscribe_visitor_,
         OnObjectFragment(_,
                          MetadataLocationAndStatus(
                              Location{3, 0}, MoqtObjectStatus::kEndOfTrack),
-                         "", true))
+                         "", /*offset=*/0))
         .WillOnce([&] { ++received; });
     queue->Close();
     success = test_harness_.RunUntilWithDefaultTimeout(
@@ -499,7 +499,7 @@
     EXPECT_EQ(result, MoqtFetchTask::GetNextObjectResult::kSuccess);
     EXPECT_EQ(object.metadata.location, expected);
     EXPECT_EQ(object.metadata.status, MoqtObjectStatus::kNormal);
-    EXPECT_EQ(object.payload.AsStringView(), "object");
+    EXPECT_EQ(object.payload[0].AsStringView(), "object");
     ++expected.group;
   } while (result == MoqtFetchTask::GetNextObjectResult::kSuccess);
   EXPECT_EQ(result, MoqtFetchTask::GetNextObjectResult::kEof);
@@ -779,7 +779,7 @@
       .WillRepeatedly(
           [&](const FullTrackName&, const PublishedObjectMetadata& metadata,
               absl::string_view object,
-              bool end_of_message) { bytes_received += object.size(); });
+              uint64_t offset) { bytes_received += object.size(); });
   queue->AddObject(Location{0, 0}, 0, data, false);
   queue->AddObject(Location{0, 1}, 0, data, false);
   queue->AddObject(Location{0, 2}, 0, data, false);
@@ -830,7 +830,7 @@
       .WillRepeatedly(
           [&](const FullTrackName&, const PublishedObjectMetadata& metadata,
               absl::string_view object,
-              bool end_of_message) { bytes_received += object.size(); });
+              uint64_t offset) { bytes_received += object.size(); });
   queue->AddObject(Location{0, 0}, 0, data, false);
   queue->AddObject(Location{1, 0}, 0, data, false);
   success = test_harness_.RunUntilWithDefaultTimeout([&]() {
@@ -904,7 +904,7 @@
               OnObjectFragment(_,
                                MetadataLocationAndStatus(
                                    Location{0, 0}, MoqtObjectStatus::kNormal),
-                               kObjectPayload, true))
+                               kObjectPayload, /*offset=*/0))
       .WillOnce([&] { ++received; });
 
   success =
diff --git a/quiche/quic/moqt/moqt_object.cc b/quiche/quic/moqt/moqt_object.cc
index 3ee1a72..b894d7a 100644
--- a/quiche/quic/moqt/moqt_object.cc
+++ b/quiche/quic/moqt/moqt_object.cc
@@ -4,22 +4,58 @@
 
 #include "quiche/quic/moqt/moqt_object.h"
 
+#include <cstdint>
+
 #include "absl/strings/string_view.h"
-#include "quiche/common/quiche_mem_slice.h"
+#include "absl/synchronization/mutex.h"
+#include "quiche/common/platform/api/quiche_bug_tracker.h"
+#include "quiche/common/quiche_cord_utils.h"
 
 namespace moqt {
 
-moqt::PublishedObject CachedObjectToPublishedObject(
-    const CachedObject& object) {
-  PublishedObject result;
-  result.metadata = object.metadata;
-  if (object.payload != nullptr && !object.payload->empty()) {
-    result.payload = quiche::QuicheMemSlice(
-        object.payload->data(), object.payload->length(),
-        [retained_pointer = object.payload](absl::string_view) {});
+bool CachedObject::Append(uint64_t offset, absl::string_view payload) {
+  absl::MutexLock lock(mutex_);
+  uint64_t total_length = payload_received_locked();
+  if (offset > total_length) {
+    QUICHE_BUG(cached_object_gap) << "Gap in bytes in CachedObject::Append";
+    return false;
   }
-  result.fin_after_this = object.fin_after_this;
+  if (offset + payload.length() > metadata_.payload_length) {
+    // This object is larger than the declared size.
+    QUICHE_BUG(cached_object_too_large)
+        << "Object is larger than the declared size";
+    return false;
+  }
+  if (offset + payload.length() <= total_length) {
+    return false;  // No new data.
+  }
+  payload_.Append(payload.substr(total_length - offset));
+  return true;
+}
+
+PublishedObject CachedObject::ToPublishedObject(uint64_t offset) const {
+  PublishedObject result;
+  result.metadata = metadata();
+  absl::MutexLock lock(mutex_);
+  uint64_t total_length = payload_received_locked();
+  quiche::CordToMemSlicesTo(payload_.Subcord(offset, total_length - offset),
+                            result.payload);
+  result.fin_after_this = fin_after_this_;
   return result;
 }
 
+bool CachedObject::OverlapIsEqual(uint64_t offset,
+                                  absl::string_view payload) const {
+  absl::MutexLock lock(mutex_);
+  uint64_t total_length = payload_received_locked();
+  if (offset >= total_length) {
+    return true;
+  }
+  if (offset + payload.length() <= total_length) {
+    return payload_.Subcord(offset, payload.length()) == payload;
+  }
+  return payload_.Subcord(offset, total_length - offset) ==
+         payload.substr(0, total_length - offset);
+}
+
 }  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_object.h b/quiche/quic/moqt/moqt_object.h
index 87469d9..a465036 100644
--- a/quiche/quic/moqt/moqt_object.h
+++ b/quiche/quic/moqt/moqt_object.h
@@ -6,13 +6,19 @@
 #define QUICHE_QUIC_MOQT_MOQT_OBJECT_H_
 
 #include <cstdint>
-#include <memory>
 #include <optional>
 #include <string>
+#include <utility>
+#include <vector>
 
+#include "absl/base/thread_annotations.h"
+#include "absl/strings/cord.h"
+#include "absl/strings/string_view.h"
+#include "absl/synchronization/mutex.h"
 #include "quiche/quic/core/quic_time.h"
 #include "quiche/quic/moqt/moqt_priority.h"
 #include "quiche/quic/moqt/moqt_types.h"
+#include "quiche/common/quiche_cord_utils.h"
 #include "quiche/common/quiche_mem_slice.h"
 
 namespace moqt {
@@ -23,6 +29,9 @@
   std::string extensions;
   MoqtObjectStatus status;
   MoqtPriority publisher_priority;
+  // The length of the entire payload, which might include data that is not
+  // present in an encompassing PublishedObject or CachedObject.
+  uint64_t payload_length;
   quic::QuicTime arrival_time = quic::QuicTime::Zero();
   bool IsMalformed(const PublishedObjectMetadata& other) const {
     // It's OK for arrival_time to be different when checking immutables.
@@ -30,26 +39,70 @@
             status != other.status ||
             publisher_priority != other.publisher_priority);
   }
+  bool operator==(const PublishedObjectMetadata& other) const = default;
 };
 
 // PublishedObject is a description of an object that is sufficient to publish
 // it on a given track.
 struct PublishedObject {
   PublishedObjectMetadata metadata;
-  quiche::QuicheMemSlice payload;
+  // This could be a partial object, containing the data between the requested
+  // offset and the end of the data the publisher has on hand.
+  std::vector<quiche::QuicheMemSlice> payload;
   bool fin_after_this = false;
 };
 
 // CachedObject is a version of PublishedObject with a reference counted
-// payload.
-struct CachedObject {
-  PublishedObjectMetadata metadata;
-  std::shared_ptr<quiche::QuicheMemSlice> payload;
-  bool fin_after_this;  // This is the last object before FIN.
-};
+// payload. This is thread-safe.
+// TODO(martinduke): Allow for the deletion of the front of the payload. The
+// number of bytes deleted will have to be subtracted from any offset that the
+// caller provides (as well as added in payload_received).
+class CachedObject {
+ public:
+  CachedObject(const PublishedObjectMetadata& metadata,
+               quiche::QuicheMemSlice payload, bool fin_after_this)
+      : metadata_(metadata),
+        payload_(quiche::MemSliceToCord(std::move(payload))),
+        fin_after_this_(fin_after_this) {}
 
-// Transforms a CachedObject into a PublishedObject.
-PublishedObject CachedObjectToPublishedObject(const CachedObject& object);
+  // Add |payload| at |offset|. Checks for overlaps in data. Returns false if
+  // the payload is too large, or there is no new data.
+  bool Append(uint64_t offset, absl::string_view payload);
+  // Returns a PublishedObject with only the portion of payload starting at
+  // |offset|.
+  PublishedObject ToPublishedObject(uint64_t offset = 0) const;
+  const PublishedObjectMetadata& metadata() const { return metadata_; }
+  bool fin_after_this() const ABSL_LOCKS_EXCLUDED(mutex_) {
+    absl::MutexLock lock(mutex_);
+    return fin_after_this_;
+  }
+  void set_fin_after_this(bool fin) {
+    absl::MutexLock lock(mutex_);
+    fin_after_this_ = fin;
+  }
+  // This function wraps payload_.size(), both for Mutex purposes, and because
+  // eventually it will account for memory blocks that have been freed from the
+  // front of the payload.
+  uint64_t payload_received() const {
+    absl::MutexLock lock(mutex_);
+    return payload_received_locked();
+  }
+  // Returns true if data in payload_ starting at |offset| is equal to
+  // |payload|, checking only until the offset where one of the two strings
+  // ends. Returns true if there is no overlap in the offsets.
+  bool OverlapIsEqual(uint64_t offset, absl::string_view payload) const;
+
+ private:
+  // TODO(martinduke): Account for memory blocks that have been freed from the
+  // front of the payload.
+  uint64_t payload_received_locked() const { return payload_.size(); }
+
+  mutable absl::Mutex mutex_;
+  const PublishedObjectMetadata metadata_;
+  absl::Cord payload_;
+  // If true, this is the last object before FIN.
+  bool ABSL_GUARDED_BY(mutex_) fin_after_this_;
+};
 
 }  // namespace moqt
 
diff --git a/quiche/quic/moqt/moqt_object_test.cc b/quiche/quic/moqt/moqt_object_test.cc
new file mode 100644
index 0000000..df9090b
--- /dev/null
+++ b/quiche/quic/moqt/moqt_object_test.cc
@@ -0,0 +1,271 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/moqt/moqt_object.h"
+
+#include <cstring>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "absl/strings/string_view.h"
+#include "quiche/quic/core/quic_time.h"
+#include "quiche/quic/moqt/moqt_types.h"
+#include "quiche/common/platform/api/quiche_expect_bug.h"
+#include "quiche/common/platform/api/quiche_test.h"
+#include "quiche/common/quiche_mem_slice.h"
+
+namespace moqt {
+namespace test {
+namespace {
+
+class CachedObjectTest : public quiche::test::QuicheTest {
+ public:
+  PublishedObjectMetadata DefaultMetadata() {
+    PublishedObjectMetadata metadata;
+    metadata.location = Location(1, 2);
+    metadata.subgroup = 3;
+    metadata.status = MoqtObjectStatus::kNormal;
+    metadata.publisher_priority = 4;
+    metadata.payload_length = 10;
+    return metadata;
+  }
+
+  std::string Reassemble(const PublishedObject& published) {
+    std::string result;
+    for (const auto& slice : published.payload) {
+      result += std::string(slice.AsStringView());
+    }
+    return result;
+  }
+};
+
+TEST_F(CachedObjectTest, Constructor) {
+  CachedObject object(DefaultMetadata(), quiche::QuicheMemSlice::Copy("abc"),
+                      false);
+  PublishedObject published = object.ToPublishedObject();
+  EXPECT_EQ(published.metadata.location, Location(1, 2));
+  EXPECT_EQ(published.payload.size(), 1);
+  EXPECT_EQ(published.payload[0].AsStringView(), "abc");
+  EXPECT_FALSE(published.fin_after_this);
+}
+
+TEST(PublishedObjectMetadataTest, IsMalformed) {
+  PublishedObjectMetadata metadata;
+  metadata.location = Location(1, 2);
+  metadata.subgroup = 3;
+  metadata.status = MoqtObjectStatus::kNormal;
+  metadata.publisher_priority = 4;
+  metadata.payload_length = 10;
+
+  PublishedObjectMetadata other = metadata;
+  EXPECT_FALSE(metadata.IsMalformed(other));
+
+  other.location = Location(1, 3);
+  EXPECT_TRUE(metadata.IsMalformed(other));
+  other = metadata;
+
+  other.subgroup = 4;
+  EXPECT_TRUE(metadata.IsMalformed(other));
+  other = metadata;
+
+  other.status = MoqtObjectStatus::kObjectDoesNotExist;
+  EXPECT_TRUE(metadata.IsMalformed(other));
+  other = metadata;
+
+  other.publisher_priority = 5;
+  EXPECT_TRUE(metadata.IsMalformed(other));
+
+  // arrival_time, payload_length, and extensions being different should NOT
+  // make it malformed.
+  other = metadata;
+  other.arrival_time =
+      quic::QuicTime::Zero() + quic::QuicTimeDelta::FromSeconds(1);
+  EXPECT_FALSE(metadata.IsMalformed(other));
+  other.payload_length = 20;
+  EXPECT_FALSE(metadata.IsMalformed(other));
+  other.extensions = "ext";
+  EXPECT_FALSE(metadata.IsMalformed(other));
+}
+
+TEST(PublishedObjectMetadataTest, Equality) {
+  PublishedObjectMetadata metadata;
+  metadata.location = Location(1, 2);
+  metadata.subgroup = 3;
+  metadata.status = MoqtObjectStatus::kNormal;
+  metadata.publisher_priority = 4;
+  metadata.payload_length = 10;
+  metadata.extensions = "ext";
+  metadata.arrival_time =
+      quic::QuicTime::Zero() + quic::QuicTimeDelta::FromSeconds(1);
+
+  PublishedObjectMetadata other = metadata;
+  EXPECT_EQ(metadata, other);
+
+  other.location = Location(1, 3);
+  EXPECT_NE(metadata, other);
+  other = metadata;
+
+  other.subgroup = 4;
+  EXPECT_NE(metadata, other);
+  other = metadata;
+
+  other.status = MoqtObjectStatus::kObjectDoesNotExist;
+  EXPECT_NE(metadata, other);
+  other = metadata;
+
+  other.publisher_priority = 5;
+  EXPECT_NE(metadata, other);
+  other = metadata;
+
+  other.payload_length = 20;
+  EXPECT_NE(metadata, other);
+  other = metadata;
+
+  other.extensions = "something else";
+  EXPECT_NE(metadata, other);
+  other = metadata;
+
+  other.arrival_time =
+      quic::QuicTime::Zero() + quic::QuicTimeDelta::FromSeconds(2);
+  EXPECT_NE(metadata, other);
+}
+
+TEST_F(CachedObjectTest, SetFinAfterThis) {
+  CachedObject object(DefaultMetadata(), quiche::QuicheMemSlice::Copy("abc"),
+                      false);
+  EXPECT_FALSE(object.fin_after_this());
+  object.set_fin_after_this(true);
+  EXPECT_TRUE(object.fin_after_this());
+}
+
+TEST_F(CachedObjectTest, Append) {
+  PublishedObjectMetadata metadata = DefaultMetadata();
+  metadata.payload_length = 10;
+  CachedObject object(metadata, quiche::QuicheMemSlice::Copy("abc"), false);
+
+  // Success: append at the end.
+  EXPECT_TRUE(object.Append(3, "def"));
+
+  // Success: partial overlap, should append remaining.
+  EXPECT_TRUE(object.Append(5, "fghi"));  // length 4. 5+4 = 9.
+
+  // Failure: gap.
+  EXPECT_QUICHE_BUG(object.Append(10, "k"),
+                    "Gap in bytes in CachedObject::Append");
+
+  // Failure: beyond payload_length.
+  EXPECT_QUICHE_BUG(
+      object.Append(9, "abc"),
+      "Object is larger than the declared size");  // 9+3 = 12 > 10.
+
+  // Failure: already received.
+  EXPECT_FALSE(object.Append(0, "abc"));
+
+  PublishedObject published = object.ToPublishedObject();
+  std::string full_payload;
+  for (const auto& slice : published.payload) {
+    full_payload += std::string(slice.AsStringView());
+  }
+  EXPECT_EQ(full_payload, "abcdefghi");
+}
+
+TEST_F(CachedObjectTest, GetPayload) {
+  PublishedObjectMetadata metadata = DefaultMetadata();
+  // We have to use large object fragments to avoid absl::Cord optimizing them
+  // into a single slice.
+  const size_t kBlockSize = 1000;
+  metadata.payload_length = kBlockSize * 3;
+  std::string object_data(metadata.payload_length, 'a');
+  absl::string_view payload = absl::string_view(object_data);
+  CachedObject object(
+      metadata, quiche::QuicheMemSlice::Copy(payload.substr(0, kBlockSize)),
+      false);
+  object.Append(kBlockSize, payload.substr(kBlockSize, kBlockSize));
+  object.Append(2 * kBlockSize, payload.substr(2 * kBlockSize, kBlockSize));
+
+  // Offset 0: get full payload.
+  std::string received = Reassemble(object.ToPublishedObject(0));
+  EXPECT_EQ(received, payload);
+  // Offset at slice boundary.
+  received = Reassemble(object.ToPublishedObject(kBlockSize));
+  EXPECT_EQ(received, payload.substr(kBlockSize));
+  // Offset in the middle of a slice.
+  received = Reassemble(object.ToPublishedObject(kBlockSize + 1));
+  EXPECT_EQ(received, payload.substr(kBlockSize + 1));
+  // Offset beyond the last slice but within payload_length.
+  received = Reassemble(object.ToPublishedObject(3 * kBlockSize));
+  EXPECT_EQ(received, "");
+  // Offset way beyond.
+  received = Reassemble(object.ToPublishedObject(4 * kBlockSize));
+  EXPECT_EQ(object.payload_received(), 3 * kBlockSize);
+}
+
+TEST_F(CachedObjectTest, ToPublishedObjectReferenceCounting) {
+  CachedObject object(DefaultMetadata(), quiche::QuicheMemSlice::Copy("abc"),
+                      false);
+  PublishedObject published = object.ToPublishedObject();
+  EXPECT_EQ(published.payload[0].AsStringView(), "abc");
+
+  // Even if we append more, the old published object's slices should remain
+  // valid.
+  object.Append(3, "def");
+  EXPECT_EQ(published.payload[0].AsStringView(), "abc");
+  EXPECT_EQ(published.payload.size(), 1);
+}
+
+TEST_F(CachedObjectTest, OverlapIsEqual) {
+  PublishedObjectMetadata metadata = DefaultMetadata();
+  metadata.payload_length = 20;
+  CachedObject object(metadata, quiche::QuicheMemSlice::Copy("abcdefghij"),
+                      false);
+  // payload_ covers [0, 10).
+
+  // No overlap.
+  EXPECT_TRUE(object.OverlapIsEqual(10, "klm"));
+  EXPECT_TRUE(object.OverlapIsEqual(15, "xyz"));
+
+  // Exact match.
+  EXPECT_TRUE(object.OverlapIsEqual(0, "abcdefghij"));
+  EXPECT_TRUE(object.OverlapIsEqual(5, "fghij"));
+
+  // Partial overlap, matches.
+  EXPECT_TRUE(object.OverlapIsEqual(0, "abcde"));
+  EXPECT_TRUE(object.OverlapIsEqual(8, "ijmnop"));  // Overlap is "ij", matches.
+
+  // Partial overlap, mismatch.
+  EXPECT_FALSE(object.OverlapIsEqual(0, "axcde"));
+  EXPECT_FALSE(
+      object.OverlapIsEqual(8, "ixmnop"));  // Overlap is "ij" vs "ix", mismatch
+
+  // Overlap beyond end of payload_, matches.
+  EXPECT_TRUE(
+      object.OverlapIsEqual(5, "fghijXXXXX"));  // Overlap is "fghij", matches.
+
+  // Empty payload argument.
+  EXPECT_TRUE(object.OverlapIsEqual(5, ""));
+}
+
+TEST_F(CachedObjectTest, OverlapIsEqualMultiSlice) {
+  PublishedObjectMetadata metadata = DefaultMetadata();
+  metadata.payload_length = 20;
+  CachedObject object(metadata, quiche::QuicheMemSlice::Copy("abc"), false);
+  object.Append(3, "def");
+  object.Append(6, "ghi");
+
+  // Overlap across two slices: "bcde" (indices 1 to 5)
+  EXPECT_TRUE(object.OverlapIsEqual(1, "bcde"));
+  EXPECT_FALSE(object.OverlapIsEqual(1, "bcxe"));
+
+  // Overlap across three slices: "bcdefgh"
+  EXPECT_TRUE(object.OverlapIsEqual(1, "bcdefgh"));
+  EXPECT_FALSE(object.OverlapIsEqual(1, "bcdefgx"));
+
+  // Overlap starting exactly at boundary
+  EXPECT_TRUE(object.OverlapIsEqual(3, "defg"));
+}
+
+}  // namespace
+}  // namespace test
+}  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.cc b/quiche/quic/moqt/moqt_outgoing_queue.cc
index 3abeaaa..71b3e39 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue.cc
+++ b/quiche/quic/moqt/moqt_outgoing_queue.cc
@@ -17,7 +17,6 @@
 #include "quiche/quic/moqt/moqt_object.h"
 #include "quiche/quic/moqt/moqt_priority.h"
 #include "quiche/quic/moqt/moqt_publisher.h"
-#include "quiche/quic/moqt/moqt_stream_map.h"
 #include "quiche/quic/moqt/moqt_types.h"
 #include "quiche/common/platform/api/quiche_bug_tracker.h"
 #include "quiche/common/quiche_mem_slice.h"
@@ -75,11 +74,15 @@
                                      quiche::QuicheMemSlice payload) {
   Location sequence{current_group_id_, queue_.back().size()};
   bool fin = status == MoqtObjectStatus::kEndOfGroup;
-  queue_.back().push_back(CachedObject{
-      PublishedObjectMetadata{sequence, 0, "", status,
-                              default_publisher_priority(),
-                              clock_->ApproximateNow()},
-      std::make_shared<quiche::QuicheMemSlice>(std::move(payload)), fin});
+  PublishedObjectMetadata metadata{sequence,
+                                   0,
+                                   "",
+                                   status,
+                                   default_publisher_priority(),
+                                   payload.length(),
+                                   clock_->ApproximateNow()};
+  queue_.back().push_back(
+      std::make_unique<CachedObject>(metadata, std::move(payload), fin));
   for (MoqtObjectListener* listener : listeners_) {
     listener->OnNewObjectAvailable(sequence, /*subgroup=*/0,
                                    default_publisher_priority());
@@ -87,7 +90,8 @@
 }
 
 std::optional<PublishedObject> MoqtOutgoingQueue::GetCachedObject(
-    uint64_t group, std::optional<uint64_t> subgroup, uint64_t object) const {
+    uint64_t group, std::optional<uint64_t> subgroup, uint64_t object,
+    uint64_t offset) const {
   QUICHE_DCHECK(subgroup.has_value() && subgroup == 0u);
   if (group < first_group_in_queue()) {
     return std::nullopt;
@@ -95,24 +99,24 @@
   if (group > current_group_id_) {
     return std::nullopt;
   }
-  const std::vector<CachedObject>& group_objects =
+  const std::vector<std::unique_ptr<CachedObject>>& group_objects =
       queue_[group - first_group_in_queue()];
   if (object >= group_objects.size()) {
     return std::nullopt;
   }
   QUICHE_DCHECK(Location(group, object) ==
-                group_objects[object].metadata.location);
-  return CachedObjectToPublishedObject(group_objects[object]);
+                group_objects[object]->metadata().location);
+  return group_objects[object]->ToPublishedObject(offset);
 }
 
 std::vector<Location> MoqtOutgoingQueue::GetCachedObjectsInRange(
     Location start, Location end) const {
   std::vector<Location> sequences;
   for (const Group& group : queue_) {
-    for (const CachedObject& object : group) {
-      if (object.metadata.location >= start &&
-          object.metadata.location <= end) {
-        sequences.push_back(object.metadata.location);
+    for (const std::unique_ptr<CachedObject>& object : group) {
+      if (object->metadata().location >= start &&
+          object->metadata().location <= end) {
+        sequences.push_back(object->metadata().location);
       }
     }
   }
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.h b/quiche/quic/moqt/moqt_outgoing_queue.h
index 2cc9309..2d64c0d 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue.h
+++ b/quiche/quic/moqt/moqt_outgoing_queue.h
@@ -58,8 +58,8 @@
   // MoqtTrackPublisher implementation.
   const FullTrackName& GetTrackName() const override { return track_; }
   std::optional<PublishedObject> GetCachedObject(
-      uint64_t group, std::optional<uint64_t> subgroup,
-      uint64_t min_object) const override;
+      uint64_t group, std::optional<uint64_t> subgroup, uint64_t min_object,
+      uint64_t offset = 0) const override;
   void AddObjectListener(MoqtObjectListener* listener) override {
     listeners_.insert(listener);
     listener->OnSubscribeAccepted();
@@ -158,7 +158,9 @@
     absl::Status status_ = absl::OkStatus();
   };
 
-  using Group = std::vector<CachedObject>;
+  // CachedObject is non-movable, so we have to use unique_ptr for pointer
+  // stability.
+  using Group = std::vector<std::unique_ptr<CachedObject>>;
 
   // Appends an object to the end of the current group.
   void AddRawObject(MoqtObjectStatus status, quiche::QuicheMemSlice payload);
diff --git a/quiche/quic/moqt/moqt_outgoing_queue_test.cc b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
index 07b5a5f..b091536 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue_test.cc
+++ b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
@@ -65,7 +65,7 @@
     if (object->metadata.status == MoqtObjectStatus::kNormal) {
       PublishObject(object->metadata.location.group,
                     object->metadata.location.object,
-                    object->payload.AsStringView());
+                    object->payload[0].AsStringView());
     } else {
       CloseStreamForGroup(object->metadata.location.group);
     }
@@ -110,7 +110,7 @@
     switch (result) {
       case MoqtFetchTask::kSuccess:
         if (object.metadata.status == MoqtObjectStatus::kNormal) {
-          objects.emplace_back(object.payload.AsStringView());
+          objects.emplace_back(object.payload[0].AsStringView());
         } else {
           EXPECT_EQ(object.metadata.status, MoqtObjectStatus::kEndOfGroup);
         }
diff --git a/quiche/quic/moqt/moqt_publisher.h b/quiche/quic/moqt/moqt_publisher.h
index 2d13655..f978df2 100644
--- a/quiche/quic/moqt/moqt_publisher.h
+++ b/quiche/quic/moqt/moqt_publisher.h
@@ -36,9 +36,10 @@
   // This could happen synchronously or asynchronously.
   virtual void OnSubscribeRejected(MoqtRequestErrorInfo info) = 0;
 
-  // Notifies that a new object is available on the track.  The object payload
-  // itself may be retrieved via GetCachedObject method of the associated track
-  // publisher. If |subgroup| is nullopt, the object is a datagram.
+  // Notifies that a new object is available on the track, or new payload of
+  // a partially delivered object.  The object payload itself may be retrieved
+  // via GetCachedObject method of the associated track publisher. If |subgroup|
+  // is nullopt, the object is a datagram.
   virtual void OnNewObjectAvailable(Location sequence,
                                     std::optional<uint64_t> subgroup,
                                     MoqtPriority publisher_priority) = 0;
@@ -85,8 +86,8 @@
   // This method returns nullopt if the object is not currently available.
   // If |subgroup| is nullopt, the object is a datagram.
   virtual std::optional<PublishedObject> GetCachedObject(
-      uint64_t group, std::optional<uint64_t> subgroup,
-      uint64_t min_object) const = 0;
+      uint64_t group, std::optional<uint64_t> subgroup, uint64_t min_object,
+      uint64_t offset = 0) const = 0;
 
   // Registers a listener with the track.  The listener will be notified of all
   // newly arriving objects. The pointer to the listener must be valid until
diff --git a/quiche/quic/moqt/moqt_relay_track_publisher.cc b/quiche/quic/moqt/moqt_relay_track_publisher.cc
index 6d76d65..797febe 100644
--- a/quiche/quic/moqt/moqt_relay_track_publisher.cc
+++ b/quiche/quic/moqt/moqt_relay_track_publisher.cc
@@ -5,7 +5,6 @@
 #include "quiche/quic/moqt/moqt_relay_track_publisher.h"
 
 #include <cstdint>
-#include <memory>
 #include <optional>
 #include <utility>
 #include <variant>
@@ -15,7 +14,6 @@
 #include "quiche/quic/core/quic_time.h"
 #include "quiche/quic/moqt/moqt_error.h"
 #include "quiche/quic/moqt/moqt_key_value_pair.h"
-#include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/moqt_names.h"
 #include "quiche/quic/moqt/moqt_object.h"
 #include "quiche/quic/moqt/moqt_publisher.h"
@@ -64,13 +62,13 @@
 void MoqtRelayTrackPublisher::OnObjectFragment(
     const FullTrackName& full_track_name,
     const PublishedObjectMetadata& metadata, absl::string_view object,
-    bool end_of_message) {
+    uint64_t offset) {
   if (is_closing_) {
     return;
   }
-  if (!end_of_message) {
-    QUICHE_BUG(moqt_relay_track_publisher_got_fragment)
-        << "Received a fragment of an object.";
+  if (object.empty() && offset > 0) {
+    QUICHE_BUG(moqt_relay_track_publisher_empty_payload)
+        << "Empty payload for partial object " << metadata.location;
     return;
   }
   bool last_object_in_stream = false;
@@ -127,13 +125,17 @@
   }
   CachedObject* duplicate_object = nullptr;
   if (!metadata.subgroup.has_value()) {  // It's a datagram.
-    std::shared_ptr<quiche::QuicheMemSlice> slice;
+    if (offset != 0 || object.length() != metadata.payload_length) {
+      QUICHE_BUG(moqt_relay_track_publisher_incomplete_datagram)
+          << "Received a partial datagram.";
+      return;
+    }
+    quiche::QuicheMemSlice slice;
     if (!object.empty()) {
-      slice = std::make_shared<quiche::QuicheMemSlice>(
-          quiche::QuicheMemSlice::Copy(object));
+      slice = quiche::QuicheMemSlice::Copy(object);
     }
     auto [it, inserted] = group.datagrams.try_emplace(
-        metadata.location.object, CachedObject{metadata, slice, false});
+        metadata.location.object, metadata, std::move(slice), false);
     if (!inserted) {
       duplicate_object = &it->second;
     }
@@ -142,13 +144,13 @@
     auto& subgroup = subgroup_it.first->second;
     if (!subgroup.empty()) {  // Check if the new object is valid
       CachedObject& last_object = subgroup.rbegin()->second;
-      if (last_object.metadata.publisher_priority !=
+      if (last_object.metadata().publisher_priority !=
           metadata.publisher_priority) {
         QUICHE_DLOG(INFO) << "Publisher priority changing in a subgroup";
         OnMalformedTrack(full_track_name);
         return;
       }
-      if (last_object.fin_after_this) {
+      if (last_object.fin_after_this()) {
         QUICHE_DLOG(INFO) << "Skipping object because it is after the end of "
                           << "the subgroup";
         OnMalformedTrack(full_track_name);
@@ -157,9 +159,9 @@
       // If last_object has stream-ending status, it should have been caught by
       // the fin_after_this check above.
       QUICHE_DCHECK(
-          last_object.metadata.status != MoqtObjectStatus::kEndOfGroup &&
-          last_object.metadata.status != MoqtObjectStatus::kEndOfTrack);
-      if (last_object.metadata.location.object > metadata.location.object) {
+          last_object.metadata().status != MoqtObjectStatus::kEndOfGroup &&
+          last_object.metadata().status != MoqtObjectStatus::kEndOfTrack);
+      if (last_object.metadata().location.object > metadata.location.object) {
         QUICHE_DLOG(INFO) << "Skipping object because it decreases the "
                           << "object ID in the subgroup.";
         return;
@@ -170,29 +172,41 @@
       // Anticipate stream FIN.
       last_object_in_stream = true;
     }
-    std::shared_ptr<quiche::QuicheMemSlice> slice;
+    quiche::QuicheMemSlice slice;
     if (!object.empty()) {
-      slice = std::make_shared<quiche::QuicheMemSlice>(
-          quiche::QuicheMemSlice::Copy(object));
+      slice = quiche::QuicheMemSlice::Copy(object);
     }
-    auto [it, inserted] = subgroup.try_emplace(
-        metadata.location.object,
-        CachedObject{metadata, slice, last_object_in_stream});
+    auto [it, inserted] =
+        subgroup.try_emplace(metadata.location.object, metadata,
+                             std::move(slice), last_object_in_stream);
     if (!inserted) {
       duplicate_object = &it->second;
     }
   }
   if (duplicate_object != nullptr) {
-    if (metadata.IsMalformed(duplicate_object->metadata)) {
+    if (metadata.IsMalformed(duplicate_object->metadata())) {
       // Something besides the arrival time and extension headers changed.
       OnMalformedTrack(full_track_name);
       return;
     }
-    // TODO(b/467718801): Fix this when the class supports partial object
-    // delivery. When objects are complete, we can simply compare payloads.
-    if (duplicate_object->payload->AsStringView() != object) {
+    if (!duplicate_object->OverlapIsEqual(offset, object)) {
       OnMalformedTrack(full_track_name);
+      return;
     }
+    // This could complete an incomplete object.
+    if (duplicate_object->metadata().payload_length >
+        duplicate_object->payload_received()) {
+      if (!duplicate_object->Append(offset, object)) {
+        return;
+      }
+      // Data added to the object. Notify listeners.
+      for (MoqtObjectListener* listener : listeners_) {
+        listener->OnNewObjectAvailable(metadata.location, metadata.subgroup,
+                                       metadata.publisher_priority);
+      }
+      return;
+    }
+
     // No need to update state.
     return;
   }
@@ -267,9 +281,10 @@
     return;
   }
   CachedObject& last_object = subgroup_it->second.rbegin()->second;
-  last_object.fin_after_this = true;
+  last_object.set_fin_after_this(true);
   for (MoqtObjectListener* listener : listeners_) {
-    listener->OnNewFinAvailable(last_object.metadata.location, stream.subgroup);
+    listener->OnNewFinAvailable(last_object.metadata().location,
+                                stream.subgroup);
   }
 }
 
@@ -286,7 +301,7 @@
 
 std::optional<PublishedObject> MoqtRelayTrackPublisher::GetCachedObject(
     uint64_t group_id, std::optional<uint64_t> subgroup_id,
-    uint64_t min_object_id) const {
+    uint64_t min_object_id, uint64_t offset) const {
   auto group_it = queue_.find(group_id);
   if (group_it == queue_.end()) {
     // Group does not exist.
@@ -299,7 +314,7 @@
       // No object after the last one received.
       return std::nullopt;
     }
-    return CachedObjectToPublishedObject(object_it->second);
+    return object_it->second.ToPublishedObject();
   }
   auto subgroup_it = group.subgroups.find(*subgroup_id);
   if (subgroup_it == group.subgroups.end()) {
@@ -316,7 +331,7 @@
     // No object after the last one received.
     return std::nullopt;
   }
-  return CachedObjectToPublishedObject(object_it->second);
+  return object_it->second.ToPublishedObject(offset);
 }
 
 void MoqtRelayTrackPublisher::AddObjectListener(MoqtObjectListener* listener) {
diff --git a/quiche/quic/moqt/moqt_relay_track_publisher.h b/quiche/quic/moqt/moqt_relay_track_publisher.h
index e10e5ab..2016a20 100644
--- a/quiche/quic/moqt/moqt_relay_track_publisher.h
+++ b/quiche/quic/moqt/moqt_relay_track_publisher.h
@@ -7,6 +7,7 @@
 
 #include <cstddef>
 #include <cstdint>
+#include <map>
 #include <memory>
 #include <optional>
 #include <utility>
@@ -75,7 +76,7 @@
   void OnCanAckObjects(MoqtObjectAckFunction /*ack_function*/) override {}
   void OnObjectFragment(const FullTrackName& full_track_name,
                         const PublishedObjectMetadata& metadata,
-                        absl::string_view object, bool end_of_message) override;
+                        absl::string_view object, uint64_t offset) override;
   void OnPublishDone(FullTrackName full_track_name) override;
   void OnMalformedTrack(const FullTrackName& /*full_track_name*/) override {
     DeleteTrack();
@@ -87,7 +88,7 @@
   const FullTrackName& GetTrackName() const override { return track_; }
   std::optional<PublishedObject> GetCachedObject(
       uint64_t group_id, std::optional<uint64_t> subgroup_id,
-      uint64_t min_object) const override;
+      uint64_t min_object, uint64_t offset = 0) const override;
   void AddObjectListener(MoqtObjectListener* listener) override;
   void RemoveObjectListener(MoqtObjectListener* listener) override;
   std::optional<Location> largest_location() const override;
@@ -122,13 +123,13 @@
   void DeleteTrack();
 
   // Ordered by object id.
-  using Subgroup = absl::btree_map<uint64_t, CachedObject>;
+  using Subgroup = std::map<uint64_t, CachedObject>;
 
   struct Group {
     uint64_t next_object = 0;
     bool complete = false;  // If true, kEndOfGroup has been received.
     absl::btree_map<uint64_t, Subgroup> subgroups;  // Ordered by subgroup id.
-    absl::btree_map<uint64_t, CachedObject> datagrams;
+    std::map<uint64_t, CachedObject> datagrams;
   };
 
   bool is_closing_ = false;
diff --git a/quiche/quic/moqt/moqt_relay_track_publisher_test.cc b/quiche/quic/moqt/moqt_relay_track_publisher_test.cc
index 8861e7a..32ba149 100644
--- a/quiche/quic/moqt/moqt_relay_track_publisher_test.cc
+++ b/quiche/quic/moqt/moqt_relay_track_publisher_test.cc
@@ -7,12 +7,12 @@
 #include <cstdint>
 #include <memory>
 #include <optional>
+#include <string>
 
 #include "absl/strings/string_view.h"
 #include "quiche/quic/core/quic_time.h"
 #include "quiche/quic/moqt/moqt_error.h"
 #include "quiche/quic/moqt/moqt_key_value_pair.h"
-#include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/moqt_names.h"
 #include "quiche/quic/moqt/moqt_object.h"
 #include "quiche/quic/moqt/moqt_priority.h"
@@ -20,6 +20,7 @@
 #include "quiche/quic/moqt/moqt_session_interface.h"
 #include "quiche/quic/moqt/moqt_types.h"
 #include "quiche/quic/moqt/test_tools/mock_moqt_session.h"
+#include "quiche/common/platform/api/quiche_expect_bug.h"
 #include "quiche/common/platform/api/quiche_test.h"
 #include "quiche/web_transport/web_transport.h"
 
@@ -81,8 +82,9 @@
     }
     publisher_.OnObjectFragment(
         kTrackName,
-        PublishedObjectMetadata{location, subgroup, "", status, 128}, payload,
-        true);
+        PublishedObjectMetadata{location, subgroup, "", status, 128,
+                                payload.length()},
+        payload, /*offset=*/0);
     std::optional<PublishedObject> object =
         publisher_.GetCachedObject(location.group, subgroup, location.object);
     ASSERT_TRUE(object.has_value());
@@ -91,7 +93,11 @@
       EXPECT_EQ(object->metadata.subgroup, subgroup);
       EXPECT_EQ(object->metadata.status, status);
       EXPECT_EQ(object->metadata.publisher_priority, 128);
-      EXPECT_EQ(object->payload.AsStringView(), payload);
+      std::string full_payload;
+      for (const auto& slice : object->payload) {
+        full_payload += slice.AsStringView();
+      }
+      EXPECT_EQ(full_payload, payload);
       EXPECT_EQ(object->fin_after_this, fin_after_this);
     }
   }
@@ -167,8 +173,8 @@
     publisher_.OnObjectFragment(
         kTrackName,
         PublishedObjectMetadata{Location(group, 0), 0, "",
-                                MoqtObjectStatus::kEndOfGroup, 128},
-        "", true);
+                                MoqtObjectStatus::kEndOfGroup, 128, 0},
+        "", /*offset=*/0);
   }
 }
 
@@ -182,8 +188,9 @@
   EXPECT_CALL(*session_, Unsubscribe(kTrackName));
   publisher_.OnObjectFragment(
       kTrackName,
-      PublishedObjectMetadata{location, 0, "", MoqtObjectStatus::kNormal, 128},
-      "object", true);
+      PublishedObjectMetadata{location, 0, "", MoqtObjectStatus::kNormal, 128,
+                              6},
+      "object", /*offset=*/0);
   EXPECT_TRUE(track_deleted_);
 }
 
@@ -198,8 +205,8 @@
   publisher_.OnObjectFragment(
       kTrackName,
       PublishedObjectMetadata{first_location, 0, "",
-                              MoqtObjectStatus::kEndOfTrack, 128},
-      "", true);
+                              MoqtObjectStatus::kEndOfTrack, 128, 0},
+      "", /*offset=*/0);
   EXPECT_TRUE(track_deleted_);
 }
 
@@ -214,8 +221,8 @@
   publisher_.OnObjectFragment(
       kTrackName,
       PublishedObjectMetadata{location, 1, "", MoqtObjectStatus::kEndOfGroup,
-                              128},
-      "object", true);
+                              128, 6},
+      "object", /*offset=*/0);
   EXPECT_TRUE(track_deleted_);
 }
 
@@ -230,8 +237,8 @@
   publisher_.OnObjectFragment(
       kTrackName,
       PublishedObjectMetadata{first_location, 1, "",
-                              MoqtObjectStatus::kEndOfGroup, 128},
-      "", true);
+                              MoqtObjectStatus::kEndOfGroup, 128, 0},
+      "", /*offset=*/0);
   EXPECT_TRUE(track_deleted_);
 }
 
@@ -245,8 +252,9 @@
   EXPECT_CALL(*session_, Unsubscribe(kTrackName));
   publisher_.OnObjectFragment(
       kTrackName,
-      PublishedObjectMetadata{location, 0, "", MoqtObjectStatus::kNormal, 200},
-      "object", true);
+      PublishedObjectMetadata{location, 0, "", MoqtObjectStatus::kNormal, 200,
+                              6},
+      "object", /*offset=*/0);
   EXPECT_TRUE(track_deleted_);
 }
 
@@ -262,8 +270,9 @@
   EXPECT_CALL(*session_, Unsubscribe(kTrackName));
   publisher_.OnObjectFragment(
       kTrackName,
-      PublishedObjectMetadata{location, 0, "", MoqtObjectStatus::kNormal, 128},
-      "object", true);
+      PublishedObjectMetadata{location, 0, "", MoqtObjectStatus::kNormal, 128,
+                              6},
+      "object", /*offset=*/0);
   EXPECT_TRUE(track_deleted_);
 }
 #endif
@@ -278,8 +287,8 @@
   publisher_.OnObjectFragment(
       kTrackName,
       PublishedObjectMetadata{first_location, 0, "", MoqtObjectStatus::kNormal,
-                              128},
-      "object", true);
+                              128, 6},
+      "object", /*offset=*/0);
   // Object is simply ignored; track is not malformed.
   EXPECT_FALSE(track_deleted_);
 }
@@ -360,8 +369,8 @@
   publisher_.OnObjectFragment(
       kTrackName,
       PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal,
-                              128},
-      "object", /*end_of_message=*/true);
+                              128, 6},
+      "object", /*offset=*/0);
   // Exact duplicate is ignored. It doesn't matter that the arrival time
   // changed.
   EXPECT_CALL(listener_, OnNewObjectAvailable).Times(0);
@@ -370,8 +379,8 @@
   publisher_.OnObjectFragment(
       kTrackName,
       PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal,
-                              128, quic::QuicTime::Infinite()},
-      "object", /*end_of_message=*/true);
+                              128, 6, quic::QuicTime::Infinite()},
+      "object", /*offset=*/0);
 }
 
 TEST_F(MoqtRelayTrackPublisherTest, DuplicateObjectChangedMetadata) {
@@ -383,16 +392,16 @@
   publisher_.OnObjectFragment(
       kTrackName,
       PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal,
-                              128},
-      "object", /*end_of_message=*/true);
+                              128, 6},
+      "object", /*offset=*/0);
   // Priority change; malformed track.
   EXPECT_CALL(listener_, OnNewObjectAvailable).Times(0);
   EXPECT_CALL(listener_, OnTrackPublisherGone);
   publisher_.OnObjectFragment(
       kTrackName,
-      PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal,
-                              64},
-      "object", /*end_of_message=*/true);
+      PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal, 64,
+                              6},
+      "object", /*offset=*/0);
   EXPECT_TRUE(track_deleted_);
 }
 
@@ -405,16 +414,16 @@
   publisher_.OnObjectFragment(
       kTrackName,
       PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal,
-                              128},
-      "payload", /*end_of_message=*/true);
+                              128, 7},
+      "payload", /*offset=*/0);
   // Payload change; malformed track.
   EXPECT_CALL(listener_, OnNewObjectAvailable).Times(0);
   EXPECT_CALL(listener_, OnTrackPublisherGone);
   publisher_.OnObjectFragment(
       kTrackName,
       PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal,
-                              128},
-      "foobar", /*end_of_message=*/true);
+                              128, 6},
+      "foobar", /*offset=*/0);
   EXPECT_TRUE(track_deleted_);
 }
 
@@ -460,13 +469,104 @@
   publisher_.OnObjectFragment(
       kTrackName,
       PublishedObjectMetadata{location, std::nullopt, "",
-                              MoqtObjectStatus::kNormal, 128},
-      "object", /*end_of_message=*/true);
+                              MoqtObjectStatus::kNormal, 128, 6},
+      "object", /*offset=*/0);
   std::optional<PublishedObject> object =
       publisher_.GetCachedObject(location.group, std::nullopt, 0);
   EXPECT_TRUE(object.has_value() && !object->metadata.subgroup.has_value());
 }
 
+TEST_F(MoqtRelayTrackPublisherTest, ObjectArrivalInFragments) {
+  SubscribeAndOk();
+  Location location = kLargestLocation.Next();
+  uint64_t subgroup = 0;
+  // Total size is 15 bytes.
+  PublishedObjectMetadata metadata = {
+      location, subgroup, "", MoqtObjectStatus::kNormal, 128, 15};
+
+  // Fragment 1 arrives.
+  EXPECT_CALL(listener_,
+              OnNewObjectAvailable(location, Optional(subgroup), 128));
+  publisher_.OnObjectFragment(kTrackName, metadata, "frag1", 0);
+
+  // Fragment 2 arrives.
+  EXPECT_CALL(listener_,
+              OnNewObjectAvailable(location, Optional(subgroup), 128));
+  publisher_.OnObjectFragment(kTrackName, metadata, "frag2", 5);
+
+  // Session retrieves the object with two fragments.
+  std::optional<PublishedObject> object =
+      publisher_.GetCachedObject(location.group, subgroup, location.object, 0);
+  ASSERT_TRUE(object.has_value());
+  std::string payload;
+  for (const auto& slice : object->payload) {
+    payload += std::string(slice.AsStringView());
+  }
+  EXPECT_EQ(payload, "frag1frag2");
+
+  // Fragment 3 arrives.
+  EXPECT_CALL(listener_,
+              OnNewObjectAvailable(location, Optional(subgroup), 128));
+  publisher_.OnObjectFragment(kTrackName, metadata, "frag3", 10);
+
+  // Third fragment retrieved separately.
+  object =
+      publisher_.GetCachedObject(location.group, subgroup, location.object, 10);
+  ASSERT_TRUE(object.has_value());
+  payload.clear();
+  for (const auto& slice : object->payload) {
+    payload += std::string(slice.AsStringView());
+  }
+  EXPECT_EQ(payload, "frag3");
+}
+
+TEST_F(MoqtRelayTrackPublisherTest, IncompleteDatagram) {
+  SubscribeAndOk();
+  Location location = kLargestLocation.Next();
+  PublishedObjectMetadata metadata = {
+      location, std::nullopt, "", MoqtObjectStatus::kNormal, 128, 10};
+  // Fragment length mismatch.
+  EXPECT_QUICHE_BUG(
+      publisher_.OnObjectFragment(kTrackName, metadata, "short", 0),
+      "Received a partial datagram.");
+  // Non-zero offset for datagram.
+  EXPECT_QUICHE_BUG(
+      publisher_.OnObjectFragment(kTrackName, metadata, "payload10", 1),
+      "Received a partial datagram.");
+}
+
+TEST_F(MoqtRelayTrackPublisherTest, AlreadyReceivedFragment) {
+  SubscribeAndOk();
+  Location location = kLargestLocation.Next();
+  uint64_t subgroup = 0;
+  // Total size is 15 bytes.
+  PublishedObjectMetadata metadata = {
+      location, subgroup, "", MoqtObjectStatus::kNormal, 128, 15};
+
+  // Fragment 1 arrives (first 10 bytes).
+  EXPECT_CALL(listener_,
+              OnNewObjectAvailable(location, Optional(subgroup), 128));
+  publisher_.OnObjectFragment(kTrackName, metadata, "0123456789", 0);
+
+  // Send a fragment that has already been fully received.
+  // Partial overlap, matches earlier data. Append() will be called with
+  // (0, "01234").
+  // Since payload_received_ (10) > 0 + 5, Append() returns false.
+  // OnObjectFragment should just return without notifying listeners.
+  EXPECT_CALL(listener_, OnNewObjectAvailable).Times(0);
+  publisher_.OnObjectFragment(kTrackName, metadata, "01234", 0);
+
+  std::optional<PublishedObject> object =
+      publisher_.GetCachedObject(location.group, subgroup, location.object, 0);
+  ASSERT_TRUE(object.has_value());
+  // Verify that only the first 10 bytes are cached.
+  std::string payload;
+  for (const auto& slice : object->payload) {
+    payload += std::string(slice.AsStringView());
+  }
+  EXPECT_EQ(payload, "0123456789");
+}
+
 }  // namespace
 
 }  // namespace moqt::test
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 499666d..f48d4c1 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -30,6 +30,7 @@
 #include "quiche/quic/core/quic_alarm_factory.h"
 #include "quiche/quic/core/quic_time.h"
 #include "quiche/quic/core/quic_types.h"
+#include "quiche/quic/core/quic_utils.h"
 #include "quiche/quic/moqt/moqt_bidi_stream.h"
 #include "quiche/quic/moqt/moqt_error.h"
 #include "quiche/quic/moqt/moqt_fetch_task.h"
@@ -223,9 +224,10 @@
     metadata.subgroup = std::nullopt;
     metadata.status = message.object_status;
     metadata.publisher_priority = message.publisher_priority;
+    metadata.payload_length = payload->size();
     metadata.arrival_time = callbacks_.clock->Now();
     visitor->OnObjectFragment(track->full_track_name(), metadata, *payload,
-                              true);
+                              /*offset=*/0);
   }
 }
 
@@ -666,6 +668,23 @@
               << "Got Non-normal object in FETCH";
           continue;
         }
+        if (last_object_.has_value() &&
+            object.metadata.location == last_object_->location) {
+          // This is the continuation of the previous object.
+          webtransport::StreamWriteOptions options;
+          absl::Status write_status =
+              stream_->Writev(absl::MakeSpan(object.payload), options);
+          if (!write_status.ok()) {
+            QUICHE_BUG(MoqtSession_WriteObjectToStream_write_failed)
+                << "Writing into MoQT stream failed despite CanWrite() being "
+                   "true before; status: "
+                << write_status;
+            fetch->session_->Error(MoqtError::kInternalError,
+                                   "Data stream write error");
+            return;
+          }
+          break;
+        }
         if (fetch->session_->WriteObjectToStream(
                 stream_, fetch->request_id(), object.metadata,
                 std::move(object.payload), MoqtDataStreamType::Fetch(),
@@ -1666,6 +1685,9 @@
       payload = absl::string_view(partial_object_);
     }
   }
+  if (payload.empty() && bytes_received_this_object_ > 0 && !end_of_message) {
+    return;  // Nothing arrived.
+  }
   if (!parser_.stream_type().has_value()) {
     QUICHE_BUG(quic_bug_object_with_no_stream_type)
         << "Object delivered without a stream type";
@@ -1728,9 +1750,11 @@
       metadata.extensions = message.extension_headers;
       metadata.status = message.object_status;
       metadata.publisher_priority = message.publisher_priority;
+      metadata.payload_length = message.payload_length;
       metadata.arrival_time = session_->callbacks_.clock->Now();
       subscribe->visitor()->OnObjectFragment(track->full_track_name(), metadata,
-                                             payload, end_of_message);
+                                             payload,
+                                             bytes_received_this_object_);
     }
   } else {  // FETCH
     track->OnObjectOrOk();
@@ -1758,6 +1782,11 @@
       task->AppendPayloadToObject(payload);
     }
   }
+  if (end_of_message) {
+    bytes_received_this_object_ = 0;
+  } else {
+    bytes_received_this_object_ += payload.size();
+  }
   partial_object_.clear();
 }
 
@@ -1808,12 +1837,13 @@
     return;
   }
   if (task->HasObject() && !task->NeedsMorePayload()) {
-    return;
+    return;  // The message is complete. Do not read more.
   }
+  uint64_t start_length = task->payload_length();
   parser_.ReadAtMostOneObject();
   // If it read an object, it called OnObjectMessage and may have altered the
   // task's object state.
-  if (task->HasObject() && !task->NeedsMorePayload()) {
+  if (task->payload_length() > start_length) {
     task->NotifyNewObject();
   }
 }
@@ -2364,11 +2394,16 @@
     PublishedSubscription& subscription) {
   while (stream_->CanWrite()) {
     std::optional<PublishedObject> object =
-        subscription.publisher().GetCachedObject(index_.group, index_.subgroup,
-                                                 next_object_);
+        subscription.publisher().GetCachedObject(
+            index_.group, index_.subgroup, next_object_, already_delivered_);
     if (!object.has_value()) {
       break;
     }
+    if (object->metadata.payload_length > 0 && object->payload.empty()) {
+      QUICHE_BUG(OutgoingDataStream_empty_payload)
+          << "Received non-empty object with no payload";
+      return;
+    }
 
     QUICHE_DCHECK_EQ(object->metadata.location.group, index_.group);
     QUICHE_DCHECK(object->metadata.subgroup == index_.subgroup);
@@ -2390,19 +2425,50 @@
       stream_->ResetWithUserCode(kResetCodeDeliveryTimeout);
       return;
     }
-    if (!session_->WriteObjectToStream(stream_, subscription.track_alias(),
-                                       object->metadata,
-                                       std::move(object->payload), stream_type_,
-                                       last_object_, object->fin_after_this)) {
-      // WriteObjectToStream() closes the connection on error, meaning that
-      // there is no need to process the stream any further.
+    uint64_t start_offset = already_delivered_;
+    already_delivered_ +=
+        quic::MemSliceSpanTotalSize(absl::MakeSpan(object->payload));
+    bool fin_after_this = object->fin_after_this &&
+                          already_delivered_ == object->metadata.payload_length;
+    if (start_offset > 0) {  // Just send payload.
+      if (already_delivered_ == start_offset) {
+        // Partial delivery of an object but the payload is empty. This would
+        // result in an infinite loop.
+        QUICHE_BUG(OutgoingDataStream_empty_payload)
+            << "Empty payload for partial object " << object->metadata.location;
+        return;
+      }
+      webtransport::StreamWriteOptions options;
+      options.set_send_fin(fin_after_this);
+      absl::Status write_status =
+          stream_->Writev(absl::MakeSpan(object->payload), options);
+      if (!write_status.ok()) {
+        QUICHE_BUG(MoqtSession_WriteObjectToStream_write_failed)
+            << "Writing into MoQT stream failed despite CanWrite() being true "
+               "before; status: "
+            << write_status;
+        session_->Error(MoqtError::kInternalError, "Data stream write error");
+        return;
+      }
+    } else {
+      if (!session_->WriteObjectToStream(
+              stream_, subscription.track_alias(), object->metadata,
+              std::move(object->payload), stream_type_, last_object_,
+              fin_after_this)) {
+        // WriteObjectToStream() closes the connection on error, meaning that
+        // there is no need to process the stream any further.
+        return;
+      }
+      last_object_ = object->metadata;
+      subscription.OnObjectSent(object->metadata.location);
+      next_object_ = last_object_->location.object;
+    }
+    if (already_delivered_ == last_object_->payload_length) {
+      ++next_object_;
+      already_delivered_ = 0;
+    } else {
       return;
     }
-    last_object_ = object->metadata;
-
-    next_object_ = last_object_->location.object + 1;
-    subscription.OnObjectSent(object->metadata.location);
-
     if (object->fin_after_this && !delivery_timeout.IsInfinite() &&
         !session_->alternate_delivery_timeout_) {
       CreateAndSetAlarm(object->metadata.arrival_time + delivery_timeout);
@@ -2433,9 +2499,9 @@
 
 bool MoqtSession::WriteObjectToStream(
     webtransport::Stream* stream, uint64_t id,
-    const PublishedObjectMetadata& metadata, quiche::QuicheMemSlice payload,
-    MoqtDataStreamType type, std::optional<PublishedObjectMetadata> last_object,
-    bool fin) {
+    const PublishedObjectMetadata& metadata,
+    std::vector<quiche::QuicheMemSlice> payload, MoqtDataStreamType type,
+    std::optional<PublishedObjectMetadata> last_object, bool fin) {
   QUICHE_DCHECK(stream->CanWrite());
   MoqtObject header;
   header.track_alias = id;
@@ -2445,14 +2511,18 @@
   header.publisher_priority = metadata.publisher_priority;
   header.extension_headers = metadata.extensions;
   header.object_status = metadata.status;
-  header.payload_length = payload.length();
+  header.payload_length = metadata.payload_length;
 
   quiche::QuicheBuffer serialized_header =
       framer_.SerializeObjectHeader(header, type, last_object);
   // TODO(vasilvv): add a version of WebTransport write API that accepts
   // memslices so that we can avoid a copy here.
-  std::array write_vector = {
-      quiche::QuicheMemSlice(std::move(serialized_header)), std::move(payload)};
+  std::vector<quiche::QuicheMemSlice> write_vector;
+  write_vector.reserve(payload.size() + 1);
+  write_vector.push_back(quiche::QuicheMemSlice(std::move(serialized_header)));
+  for (auto& slice : payload) {
+    write_vector.push_back(std::move(slice));
+  }
   webtransport::StreamWriteOptions options;
   options.set_send_fin(fin);
   absl::Status write_status =
@@ -2553,9 +2623,9 @@
   header.extension_headers = object->metadata.extensions;
   header.object_status = object->metadata.status;
   header.subgroup_id = std::nullopt;
-  header.payload_length = object->payload.length();
+  header.payload_length = object->metadata.payload_length;
   quiche::QuicheBuffer datagram = session_->framer_.SerializeObjectDatagram(
-      header, object->payload.AsStringView(),
+      header, object->payload[0].AsStringView(),
       default_publisher_priority_.value_or(kDefaultPublisherPriority));
   session_->session_->SendOrQueueDatagram(datagram.AsStringView());
   OnObjectSent(object->metadata.location);
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index 13df94e..7c8d30e 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -359,6 +359,7 @@
     quiche::QuicheWeakPtr<RemoteTrack> track_;
     MoqtDataParser parser_;
     std::string partial_object_;
+    uint64_t bytes_received_this_object_ = 0;
   };
   // Represents a record for a single subscription to a local track that is
   // being sent to the peer.
@@ -580,6 +581,9 @@
     // exact ID of the next object in the stream because the next object could
     // be in a different subgroup or simply be skipped.
     uint64_t next_object_;
+    // Number of payload bytes from next_object_ that has already been written
+    // to the stream.
+    uint64_t already_delivered_ = 0;
     // Used in subgroup streams to compute the object ID diff. If nullopt, the
     // stream header has not been written yet.
     std::optional<PublishedObjectMetadata> last_object_;
@@ -753,7 +757,7 @@
   // the write was successful.
   bool WriteObjectToStream(webtransport::Stream* stream, uint64_t id,
                            const PublishedObjectMetadata& metadata,
-                           quiche::QuicheMemSlice payload,
+                           std::vector<quiche::QuicheMemSlice> payload,
                            MoqtDataStreamType type,
                            std::optional<PublishedObjectMetadata> last_object,
                            bool fin);
diff --git a/quiche/quic/moqt/moqt_session_interface.h b/quiche/quic/moqt/moqt_session_interface.h
index 1df78b3..af1d6d0 100644
--- a/quiche/quic/moqt/moqt_session_interface.h
+++ b/quiche/quic/moqt/moqt_session_interface.h
@@ -41,6 +41,8 @@
   MoqtSessionParameters() = default;
   explicit MoqtSessionParameters(quic::Perspective perspective)
       : perspective(perspective), using_webtrans(true) {}
+  explicit MoqtSessionParameters(bool deliver_partial_objects)
+      : deliver_partial_objects(deliver_partial_objects) {}
   MoqtSessionParameters(quic::Perspective perspective, std::string path,
                         std::string authority)
       : perspective(perspective),
@@ -101,8 +103,7 @@
   // Called when an object fragment (or an entire object) is received.
   virtual void OnObjectFragment(const FullTrackName& full_track_name,
                                 const PublishedObjectMetadata& metadata,
-                                absl::string_view object,
-                                bool end_of_message) = 0;
+                                absl::string_view object, uint64_t offset) = 0;
   virtual void OnPublishDone(FullTrackName full_track_name) = 0;
   // Called when the track is malformed per Section 2.5 of
   // draft-ietf-moqt-moq-transport-12. If the application is a relay, it MUST
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index ecf0e53..e5336cc 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -13,6 +13,7 @@
 #include <string>
 #include <utility>
 #include <variant>
+#include <vector>
 
 #include "absl/base/casts.h"
 #include "absl/memory/memory.h"
@@ -74,6 +75,12 @@
 constexpr MoqtPriority kDefaultPublisherPriority = 0x80;
 const TrackExtensions kNoExtensions;
 
+std::vector<quiche::QuicheMemSlice> PayloadFromString(absl::string_view s) {
+  std::vector<quiche::QuicheMemSlice> payload;
+  payload.push_back(quiche::QuicheMemSlice::Copy(s));
+  return payload;
+}
+
 FullTrackName kDefaultTrackName() { return FullTrackName("foo", "bar"); }
 
 MoqtSubscribe DefaultSubscribe(uint64_t request_id) {
@@ -1155,16 +1162,16 @@
   EXPECT_CALL(remote_track_visitor_, OnObjectFragment)
       .WillOnce([&](const FullTrackName& track_name,
                     const PublishedObjectMetadata& metadata,
-                    const absl::string_view received_payload,
-                    bool end_of_message) {
+                    const absl::string_view received_payload, uint64_t offset) {
         EXPECT_EQ(track_name, ftn);
         EXPECT_EQ(metadata.location, Location(0, 0));
         EXPECT_EQ(metadata.subgroup, 0);
         EXPECT_EQ(metadata.extensions, "foo");
         EXPECT_EQ(metadata.status, MoqtObjectStatus::kNormal);
         EXPECT_EQ(metadata.publisher_priority, 0);
+        EXPECT_EQ(metadata.payload_length, payload.length());
         EXPECT_EQ(payload, received_payload);
-        EXPECT_TRUE(end_of_message);
+        EXPECT_EQ(offset, 0);
       });
   EXPECT_CALL(mock_stream_, GetStreamId())
       .WillRepeatedly(Return(kIncomingUniStreamId));
@@ -1220,11 +1227,16 @@
   std::unique_ptr<MoqtDataParserVisitor> object_stream =
       MoqtSessionPeer::CreateIncomingDataStream(&session, &mock_stream_,
                                                 kDefaultSubgroupStreamType);
-
-  EXPECT_CALL(remote_track_visitor_, OnObjectFragment).Times(2);
   EXPECT_CALL(mock_stream_, GetStreamId())
       .WillRepeatedly(Return(kIncomingUniStreamId));
+  EXPECT_CALL(remote_track_visitor_, OnObjectFragment(ftn, _, payload, 0));
   object_stream->OnObjectMessage(object, payload, false);
+  EXPECT_CALL(remote_track_visitor_,
+              OnObjectFragment(ftn, _, payload, payload.length()));
+  object_stream->OnObjectMessage(object, payload, true);  // complete the object
+  // New object, check the offset was reset.
+  ++object.object_id;
+  EXPECT_CALL(remote_track_visitor_, OnObjectFragment(ftn, _, payload, 0));
   object_stream->OnObjectMessage(object, payload, true);  // complete the object
 }
 
@@ -1327,14 +1339,15 @@
         fin |= options.send_fin();
         return absl::OkStatus();
       });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([&] {
-    return PublishedObject{
-        PublishedObjectMetadata{Location(5, 0), 0, "extensions",
-                                MoqtObjectStatus::kNormal, 127,
-                                MoqtSessionPeer::Now(&session_)},
-        MemSliceFromString("deadbeef"), false};
-  });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] {
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0))
+      .WillRepeatedly([&] {
+        return PublishedObject{
+            PublishedObjectMetadata{Location(5, 0), 0, "extensions",
+                                    MoqtObjectStatus::kNormal, 127, 8,
+                                    MoqtSessionPeer::Now(&session_)},
+            PayloadFromString("deadbeef"), false};
+      });
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] {
     return std::optional<PublishedObject>();
   });
   subscription->OnNewObjectAvailable(Location(5, 0), 0, 127);
@@ -1381,13 +1394,15 @@
         fin = options.send_fin();
         return absl::OkStatus();
       });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([&] {
-    return PublishedObject{PublishedObjectMetadata{
-                               Location(5, 0), 0, "", MoqtObjectStatus::kNormal,
-                               127, MoqtSessionPeer::Now(&session_)},
-                           MemSliceFromString("deadbeef"), true};
-  });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] {
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0))
+      .WillRepeatedly([&] {
+        return PublishedObject{
+            PublishedObjectMetadata{Location(5, 0), 0, "",
+                                    MoqtObjectStatus::kNormal, 127, 8,
+                                    MoqtSessionPeer::Now(&session_)},
+            PayloadFromString("deadbeef"), true};
+      });
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] {
     return std::optional<PublishedObject>();
   });
   subscription->OnNewObjectAvailable(Location(5, 0), 0,
@@ -1396,6 +1411,79 @@
   EXPECT_TRUE(fin);
 }
 
+TEST_F(MoqtSessionTest, SendFragmentedObject) {
+  FullTrackName ftn("foo", "bar");
+  auto track =
+      SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
+  MoqtObjectListener* subscription =
+      MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
+
+  // 1st OnNewObjectAvailable happens when no outgoing unidirectional stream is
+  // available.
+  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
+      .WillOnce(Return(false));
+  subscription->OnNewObjectAvailable(Location(5, 0), 0, 128);
+
+  // 2nd OnNewObjectAvailable happens when Part 1 ("part1", 5 bytes) and
+  // Part 2 ("part2", 5 bytes) are available.
+  bool fin = false;
+  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
+      .WillOnce(Return(true));
+  EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return !fin; });
+  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
+      .WillOnce(Return(&mock_stream_));
+  std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
+  EXPECT_CALL(mock_stream_, SetVisitor(_))
+      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
+        stream_visitor = std::move(visitor);
+      });
+  EXPECT_CALL(mock_stream_, visitor()).WillRepeatedly([&] {
+    return stream_visitor.get();
+  });
+  EXPECT_CALL(mock_stream_, GetStreamId())
+      .WillRepeatedly(Return(kOutgoingUniStreamId));
+  EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
+      .WillRepeatedly(Return(&mock_stream_));
+  PublishedObjectMetadata metadata = {
+      Location(5, 0),
+      /*subgroup=*/0,
+      /*extensions=*/"",     MoqtObjectStatus::kNormal,
+      /*priority=*/128,
+      /*payload_length=*/15, MoqtSessionPeer::Now(&session_)};
+  std::vector<quiche::QuicheMemSlice> payload;
+  payload.push_back(quiche::QuicheMemSlice::Copy("part1"));
+  payload.push_back(quiche::QuicheMemSlice::Copy("part2"));
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0))
+      .WillOnce(Return(PublishedObject{metadata, std::move(payload), false}));
+  // Expect Writev for the header and first two parts.
+  EXPECT_CALL(mock_stream_, Writev)
+      .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
+                    const webtransport::StreamWriteOptions& options) {
+        EXPECT_EQ(data.size(), 3);  // Header, part1, and part2.
+        EXPECT_EQ(data[1].AsStringView(), "part1");
+        EXPECT_EQ(data[2].AsStringView(), "part2");
+        EXPECT_FALSE(options.send_fin());
+        return absl::OkStatus();
+      });
+  subscription->OnNewObjectAvailable(Location(5, 0), 0, 128);
+
+  // 3rd OnNewObjectAvailable happens when Part 3 ("part3", 5 bytes) is
+  // available.
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 10))
+      .WillOnce(
+          Return(PublishedObject{metadata, PayloadFromString("part3"), true}));
+  EXPECT_CALL(mock_stream_, Writev)
+      .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
+                    const webtransport::StreamWriteOptions& options) {
+        EXPECT_TRUE(options.send_fin());
+        EXPECT_EQ(data.size(), 1);  // No header, just part3.
+        EXPECT_EQ(data[0].AsStringView(), "part3");
+        fin = true;
+        return absl::OkStatus();
+      });
+  subscription->OnNewObjectAvailable(Location(5, 0), 0, 128);
+}
+
 TEST_F(MoqtSessionTest, GroupAbandonedNoDeliveryTimeout) {
   FullTrackName ftn("foo", "bar");
   auto track =
@@ -1433,13 +1521,15 @@
         fin |= options.send_fin();
         return absl::OkStatus();
       });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([&] {
-    return PublishedObject{PublishedObjectMetadata{
-                               Location(5, 0), 0, "", MoqtObjectStatus::kNormal,
-                               127, MoqtSessionPeer::Now(&session_)},
-                           MemSliceFromString("deadbeef"), true};
-  });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] {
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0))
+      .WillRepeatedly([&] {
+        return PublishedObject{
+            PublishedObjectMetadata{Location(5, 0), 0, "",
+                                    MoqtObjectStatus::kNormal, 127, 8,
+                                    MoqtSessionPeer::Now(&session_)},
+            PayloadFromString("deadbeef"), true};
+      });
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] {
     return std::optional<PublishedObject>();
   });
   subscription->OnNewObjectAvailable(Location(5, 0), 0,
@@ -1499,13 +1589,15 @@
         fin |= options.send_fin();
         return absl::OkStatus();
       });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([&] {
-    return PublishedObject{PublishedObjectMetadata{
-                               Location(5, 0), 0, "", MoqtObjectStatus::kNormal,
-                               127, MoqtSessionPeer::Now(&session_)},
-                           MemSliceFromString("deadbeef"), true};
-  });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] {
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0))
+      .WillRepeatedly([&] {
+        return PublishedObject{
+            PublishedObjectMetadata{Location(5, 0), 0, "",
+                                    MoqtObjectStatus::kNormal, 127, 8,
+                                    MoqtSessionPeer::Now(&session_)},
+            PayloadFromString("deadbeef"), true};
+      });
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] {
     return std::optional<PublishedObject>();
   });
   subscription->OnNewObjectAvailable(Location(5, 0), 0,
@@ -1567,13 +1659,15 @@
         fin |= options.send_fin();
         return absl::OkStatus();
       });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([&] {
-    return PublishedObject{PublishedObjectMetadata{
-                               Location(5, 0), 0, "", MoqtObjectStatus::kNormal,
-                               127, MoqtSessionPeer::Now(&session_)},
-                           MemSliceFromString("deadbeef"), true};
-  });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] {
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0))
+      .WillRepeatedly([&] {
+        return PublishedObject{
+            PublishedObjectMetadata{Location(5, 0), 0, "",
+                                    MoqtObjectStatus::kNormal, 127, 8,
+                                    MoqtSessionPeer::Now(&session_)},
+            PayloadFromString("deadbeef"), true};
+      });
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] {
     return std::optional<PublishedObject>();
   });
   subscription->OnNewObjectAvailable(Location(5, 0), 0,
@@ -1621,13 +1715,15 @@
         fin = options.send_fin();
         return absl::OkStatus();
       });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([&] {
-    return PublishedObject{PublishedObjectMetadata{
-                               Location(5, 0), 0, "", MoqtObjectStatus::kNormal,
-                               127, MoqtSessionPeer::Now(&session_)},
-                           MemSliceFromString("deadbeef"), false};
-  });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] {
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0))
+      .WillRepeatedly([&] {
+        return PublishedObject{
+            PublishedObjectMetadata{Location(5, 0), 0, "",
+                                    MoqtObjectStatus::kNormal, 127, 8,
+                                    MoqtSessionPeer::Now(&session_)},
+            PayloadFromString("deadbeef"), false};
+      });
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] {
     return std::optional<PublishedObject>();
   });
   subscription->OnNewObjectAvailable(Location(5, 0), 0,
@@ -1682,13 +1778,15 @@
         fin = options.send_fin();
         return absl::OkStatus();
       });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([&] {
-    return PublishedObject{PublishedObjectMetadata{
-                               Location(5, 0), 0, "", MoqtObjectStatus::kNormal,
-                               127, MoqtSessionPeer::Now(&session_)},
-                           MemSliceFromString("deadbeef"), false};
-  });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] {
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0))
+      .WillRepeatedly([&] {
+        return PublishedObject{
+            PublishedObjectMetadata{Location(5, 0), 0, "",
+                                    MoqtObjectStatus::kNormal, 127, 8,
+                                    MoqtSessionPeer::Now(&session_)},
+            PayloadFromString("deadbeef"), false};
+      });
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] {
     return std::optional<PublishedObject>();
   });
   subscription->OnNewObjectAvailable(Location(5, 0), 0,
@@ -1709,14 +1807,15 @@
   // object id, extensions, payload length, status.
   const std::string kExpectedMessage2 = {0x00, 0x00, 0x00, 0x03};
   EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return true; });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([&] {
-    return PublishedObject{
-        PublishedObjectMetadata{Location(5, 1), 0, "",
-                                MoqtObjectStatus::kEndOfGroup, 127,
-                                MoqtSessionPeer::Now(&session_)},
-        MemSliceFromString(""), true};
-  });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 2)).WillRepeatedly([] {
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0))
+      .WillRepeatedly([&] {
+        return PublishedObject{
+            PublishedObjectMetadata{Location(5, 1), 0, "",
+                                    MoqtObjectStatus::kEndOfGroup, 127, 0,
+                                    MoqtSessionPeer::Now(&session_)},
+            PayloadFromString(""), true};
+      });
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 2, 0)).WillRepeatedly([] {
     return std::optional<PublishedObject>();
   });
   EXPECT_CALL(mock_stream_, Writev(_, _))
@@ -1769,13 +1868,15 @@
         fin = options.send_fin();
         return absl::OkStatus();
       });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([&] {
-    return PublishedObject{PublishedObjectMetadata{
-                               Location(5, 0), 0, "", MoqtObjectStatus::kNormal,
-                               127, MoqtSessionPeer::Now(&session_)},
-                           MemSliceFromString("deadbeef"), false};
-  });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] {
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0))
+      .WillRepeatedly([&] {
+        return PublishedObject{
+            PublishedObjectMetadata{Location(5, 0), 0, "",
+                                    MoqtObjectStatus::kNormal, 127, 8,
+                                    MoqtSessionPeer::Now(&session_)},
+            PayloadFromString("deadbeef"), false};
+      });
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] {
     return std::optional<PublishedObject>();
   });
   subscription->OnNewObjectAvailable(Location(5, 0), 0,
@@ -1821,13 +1922,13 @@
   EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
       .WillRepeatedly(Return(&mock_stream_));
   EXPECT_CALL(mock_stream_, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([] {
-    return PublishedObject{
-        PublishedObjectMetadata{Location(5, 0), 0, "",
-                                MoqtObjectStatus::kNormal, 128},
-        MemSliceFromString("deadbeef")};
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0)).WillRepeatedly([] {
+    return PublishedObject{PublishedObjectMetadata{
+                               Location(5, 0), 0, "", MoqtObjectStatus::kNormal,
+                               128, 8, quic::QuicTime::Zero()},
+                           PayloadFromString("deadbeef")};
   });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] {
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] {
     return std::optional<PublishedObject>();
   });
   session_.OnCanCreateNewOutgoingUnidirectionalStream();
@@ -1871,13 +1972,15 @@
   EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
       .WillRepeatedly(Return(&mock_stream_));
   EXPECT_CALL(mock_stream_, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
-  EXPECT_CALL(*track, GetCachedObject(6, Optional(0), 0)).WillRepeatedly([] {
-    return PublishedObject{
-        PublishedObjectMetadata{Location(6, 0), 0, "",
-                                MoqtObjectStatus::kNormal, 128},
-        MemSliceFromString("deadbeef")};
-  });
-  EXPECT_CALL(*track, GetCachedObject(6, Optional(0), 1)).WillRepeatedly([] {
+  EXPECT_CALL(*track, GetCachedObject(6, Optional(0), 0, 0))
+      .WillRepeatedly([&] {
+        return PublishedObject{
+            PublishedObjectMetadata{Location(6, 0), 0, "",
+                                    MoqtObjectStatus::kNormal, 128, 8,
+                                    MoqtSessionPeer::Now(&session_)},
+            PayloadFromString("deadbeef")};
+      });
+  EXPECT_CALL(*track, GetCachedObject(6, Optional(0), 1, 0)).WillRepeatedly([] {
     return std::optional<PublishedObject>();
   });
   session_.OnCanCreateNewOutgoingUnidirectionalStream();
@@ -1910,13 +2013,13 @@
       .WillRepeatedly(Return(&mock_stream_));
 
   EXPECT_CALL(mock_stream_, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([] {
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0)).WillRepeatedly([] {
     return PublishedObject{
         PublishedObjectMetadata{Location(5, 0), 0, "",
-                                MoqtObjectStatus::kNormal, 128},
-        MemSliceFromString("deadbeef")};
+                                MoqtObjectStatus::kNormal, 128, 8},
+        PayloadFromString("deadbeef")};
   });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillOnce([] {
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillOnce([] {
     return std::optional<PublishedObject>();
   });
   subscription->OnNewObjectAvailable(Location(5, 0), 0,
@@ -1925,7 +2028,7 @@
   // disappear by returning nullptr.
   EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
       .WillRepeatedly(Return(nullptr));
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).Times(0);
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).Times(0);
   subscription->OnNewObjectAvailable(Location(5, 1), 0,
                                      kDefaultPublisherPriority);
 }
@@ -1967,12 +2070,13 @@
             webtransport::DatagramStatusCode::kSuccess, "");
       });
   EXPECT_CALL(*track_publisher,
-              GetCachedObject(5, std::optional<uint64_t>(), 0))
-      .WillRepeatedly([] {
+              GetCachedObject(5, std::optional<uint64_t>(), 0, 0))
+      .WillRepeatedly([&] {
         return PublishedObject{
             PublishedObjectMetadata{Location{5, 0}, std::nullopt, "ext",
-                                    MoqtObjectStatus::kNormal, 32},
-            quiche::QuicheMemSlice::Copy("deadbeef")};
+                                    MoqtObjectStatus::kNormal, 32, 8,
+                                    MoqtSessionPeer::Now(&session_)},
+            PayloadFromString("deadbeef")};
       });
   listener->OnNewObjectAvailable(Location(5, 0), std::nullopt, 32);
   EXPECT_TRUE(correct_message);
@@ -1998,15 +2102,16 @@
   EXPECT_CALL(remote_track_visitor_, OnObjectFragment)
       .WillOnce([&](const FullTrackName& track_name,
                     const PublishedObjectMetadata& metadata,
-                    absl::string_view received_payload, bool fin) {
+                    absl::string_view received_payload, uint64_t offset) {
         EXPECT_EQ(track_name, ftn);
         EXPECT_EQ(metadata.location,
                   Location(object.group_id, object.object_id));
         EXPECT_EQ(metadata.subgroup, object.subgroup_id);
         EXPECT_EQ(metadata.publisher_priority, object.publisher_priority);
         EXPECT_EQ(metadata.status, object.object_status);
+        EXPECT_EQ(metadata.payload_length, payload.length());
         EXPECT_EQ(payload, received_payload);
-        EXPECT_TRUE(fin);
+        EXPECT_EQ(offset, 0);
       });
   session_.OnDatagramReceived(absl::string_view(datagram, sizeof(datagram)));
 }
@@ -2033,8 +2138,10 @@
   EXPECT_CALL(remote_track_visitor_, OnObjectFragment)
       .WillOnce([&](const FullTrackName&,
                     const PublishedObjectMetadata& metadata, absl::string_view,
-                    bool) {
+                    uint64_t offset) {
         EXPECT_EQ(metadata.publisher_priority, kPeerDefaultPriority);
+        EXPECT_EQ(metadata.payload_length, 8u);
+        EXPECT_EQ(offset, 0);
       });
   session_.OnDatagramReceived(absl::string_view(datagram, sizeof(datagram)));
   // Omit priority from a stream.
@@ -2047,8 +2154,10 @@
   EXPECT_CALL(remote_track_visitor_, OnObjectFragment)
       .WillOnce([&](const FullTrackName&,
                     const PublishedObjectMetadata& metadata, absl::string_view,
-                    bool) {
+                    uint64_t offset) {
         EXPECT_EQ(metadata.publisher_priority, kPeerDefaultPriority);
+        EXPECT_EQ(metadata.payload_length, 3u);
+        EXPECT_EQ(offset, 0);
       });
   in_memory_stream.Receive(absl::string_view(stream_data, sizeof(stream_data)),
                            false);
@@ -2085,12 +2194,12 @@
     return stream_visitor.get();
   });
   EXPECT_CALL(mock_stream_, CanWrite).WillRepeatedly(Return(true));
-  EXPECT_CALL(*track, GetCachedObject)
-      .WillOnce(Return(
-          PublishedObject{PublishedObjectMetadata{Location(0, 0), 0, "",
-                                                  MoqtObjectStatus::kNormal,
-                                                  kLocalDefaultPriority},
-                          MemSliceFromString("deadbeef")}))
+  EXPECT_CALL(*track, GetCachedObject(_, _, _, _))
+      .WillOnce(Return(PublishedObject{
+          PublishedObjectMetadata{
+              Location(0, 0), 0, "", MoqtObjectStatus::kNormal,
+              kLocalDefaultPriority, 8, MoqtSessionPeer::Now(&session_)},
+          PayloadFromString("deadbeef")}))
       .WillOnce(Return(std::nullopt));
   EXPECT_CALL(mock_stream_, Writev)
       .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
@@ -2102,12 +2211,12 @@
       });
   listener->OnNewObjectAvailable(Location(0, 0), 0, kLocalDefaultPriority);
   // Send a datagram with the default priority.
-  EXPECT_CALL(*track, GetCachedObject)
-      .WillOnce(Return(
-          PublishedObject{PublishedObjectMetadata{Location(0, 1), std::nullopt,
-                                                  "", MoqtObjectStatus::kNormal,
-                                                  kLocalDefaultPriority},
-                          MemSliceFromString("deadbeef")}));
+  EXPECT_CALL(*track, GetCachedObject(_, _, _, _))
+      .WillOnce(Return(PublishedObject{
+          PublishedObjectMetadata{
+              Location(0, 1), std::nullopt, "", MoqtObjectStatus::kNormal,
+              kLocalDefaultPriority, 8, MoqtSessionPeer::Now(&session_)},
+          PayloadFromString("deadbeef")}));
   EXPECT_CALL(mock_session_, SendOrQueueDatagram)
       .WillOnce([](absl::string_view datagram) {
         EXPECT_TRUE(static_cast<const uint8_t>(datagram[0]) &
@@ -2118,12 +2227,12 @@
   listener->OnNewObjectAvailable(Location(0, 1), std::nullopt,
                                  kLocalDefaultPriority);
   // Non-default priority
-  EXPECT_CALL(*track, GetCachedObject)
-      .WillOnce(Return(
-          PublishedObject{PublishedObjectMetadata{Location(0, 2), std::nullopt,
-                                                  "", MoqtObjectStatus::kNormal,
-                                                  kLocalDefaultPriority + 1},
-                          MemSliceFromString("deadbeef")}));
+  EXPECT_CALL(*track, GetCachedObject(_, _, _, _))
+      .WillOnce(Return(PublishedObject{
+          PublishedObjectMetadata{
+              Location(0, 2), std::nullopt, "", MoqtObjectStatus::kNormal,
+              kLocalDefaultPriority + 1, 8, MoqtSessionPeer::Now(&session_)},
+          PayloadFromString("deadbeef")}));
   EXPECT_CALL(mock_session_, SendOrQueueDatagram)
       .WillOnce([](absl::string_view datagram) {
         EXPECT_FALSE(static_cast<const uint8_t>(datagram[0]) &
@@ -2219,26 +2328,29 @@
   EXPECT_CALL(mock_stream2, visitor()).WillOnce([&]() {
     return stream_visitor[2].get();
   });
-  EXPECT_CALL(*track, GetCachedObject(0, Optional(0), 0))
+  EXPECT_CALL(*track, GetCachedObject(0, Optional(0), 0, 0))
       .WillOnce(Return(PublishedObject{
           PublishedObjectMetadata{Location(0, 0), 0, "",
-                                  MoqtObjectStatus::kNormal, 127},
-          MemSliceFromString("deadbeef")}));
-  EXPECT_CALL(*track, GetCachedObject(0, Optional(0), 1))
+                                  MoqtObjectStatus::kNormal, 128, 6,
+                                  MoqtSessionPeer::Now(&session_)},
+          PayloadFromString("foobar")}));
+  EXPECT_CALL(*track, GetCachedObject(0, Optional(0), 1, 0))
       .WillOnce(Return(std::nullopt));
-  EXPECT_CALL(*track, GetCachedObject(1, Optional(0), 0))
+  EXPECT_CALL(*track, GetCachedObject(1, Optional(0), 0, 0))
       .WillOnce(Return(PublishedObject{
           PublishedObjectMetadata{Location(1, 0), 0, "",
-                                  MoqtObjectStatus::kNormal, 127},
-          MemSliceFromString("deadbeef")}));
-  EXPECT_CALL(*track, GetCachedObject(1, Optional(0), 1))
+                                  MoqtObjectStatus::kNormal, 127, 8,
+                                  MoqtSessionPeer::Now(&session_)},
+          PayloadFromString("deadbeef")}));
+  EXPECT_CALL(*track, GetCachedObject(1, Optional(0), 1, 0))
       .WillOnce(Return(std::nullopt));
-  EXPECT_CALL(*track, GetCachedObject(2, Optional(0), 0))
+  EXPECT_CALL(*track, GetCachedObject(2, Optional(0), 0, 0))
       .WillOnce(Return(PublishedObject{
           PublishedObjectMetadata{Location(2, 0), 0, "",
-                                  MoqtObjectStatus::kNormal, 127},
-          MemSliceFromString("deadbeef")}));
-  EXPECT_CALL(*track, GetCachedObject(2, Optional(0), 1))
+                                  MoqtObjectStatus::kNormal, 127, 8,
+                                  MoqtSessionPeer::Now(&session_)},
+          PayloadFromString("deadbeef")}));
+  EXPECT_CALL(*track, GetCachedObject(2, Optional(0), 1, 0))
       .WillOnce(Return(std::nullopt));
   EXPECT_CALL(mock_stream0, CanWrite()).WillRepeatedly(Return(true));
   EXPECT_CALL(mock_stream1, CanWrite()).WillRepeatedly(Return(true));
@@ -2331,12 +2443,12 @@
   EXPECT_CALL(mock_stream0, visitor()).WillOnce([&]() {
     return stream_visitor0.get();
   });
-  EXPECT_CALL(*track1, GetCachedObject(0, Optional(0), 0))
+  EXPECT_CALL(*track1, GetCachedObject(0, Optional(0), 0, 0))
       .WillOnce(Return(PublishedObject{
           PublishedObjectMetadata{Location(0, 0), 0, "",
-                                  MoqtObjectStatus::kNormal, 127},
-          MemSliceFromString("foobar")}));
-  EXPECT_CALL(*track1, GetCachedObject(0, Optional(0), 1))
+                                  MoqtObjectStatus::kNormal, 127, 6},
+          PayloadFromString("foobar")}));
+  EXPECT_CALL(*track1, GetCachedObject(0, Optional(0), 1, 0))
       .WillOnce(Return(std::nullopt));
   EXPECT_CALL(mock_stream0, CanWrite()).WillRepeatedly(Return(true));
   EXPECT_CALL(mock_stream0, Writev)
@@ -2368,12 +2480,12 @@
   EXPECT_CALL(mock_stream1, visitor()).WillOnce([&]() {
     return stream_visitor1.get();
   });
-  EXPECT_CALL(*track2, GetCachedObject(0, Optional(0), 0))
+  EXPECT_CALL(*track2, GetCachedObject(0, Optional(0), 0, 0))
       .WillOnce(Return(PublishedObject{
           PublishedObjectMetadata{Location(0, 0), 0, "",
-                                  MoqtObjectStatus::kNormal, 127},
-          MemSliceFromString("deadbeef")}));
-  EXPECT_CALL(*track2, GetCachedObject(0, Optional(0), 1))
+                                  MoqtObjectStatus::kNormal, 127, 8},
+          PayloadFromString("deadbeef")}));
+  EXPECT_CALL(*track2, GetCachedObject(0, Optional(0), 1, 0))
       .WillOnce(Return(std::nullopt));
   EXPECT_CALL(mock_stream1, CanWrite()).WillRepeatedly(Return(true));
   EXPECT_CALL(mock_stream1, Writev(_, _))
@@ -2425,7 +2537,8 @@
         output.metadata.subgroup = 0;
         output.metadata.status = status;
         output.metadata.publisher_priority = 128;
-        output.payload = quiche::QuicheMemSlice::Copy(payload);
+        output.metadata.payload_length = payload.length();
+        output.payload = PayloadFromString(payload);
         output.fin_after_this = true;  // should be ignored.
         return MoqtFetchTask::GetNextObjectResult::kSuccess;
       })
@@ -2525,6 +2638,82 @@
   stream_input->OnFetchMessage(fetch);
 }
 
+TEST_F(MoqtSessionTest, SendFragmentedFetchObject) {
+  using ::testing::ByMove;
+  std::unique_ptr<MoqtControlParserVisitor> stream_input =
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
+  MoqtFetch fetch = DefaultFetch();
+  fetch.request_id = 3;  // Use an odd ID for peer request in client session.
+  MockTrackPublisher* track = CreateTrackPublisher();
+
+  // Disable synchronous callback to have more control.
+  auto fetch_task_ptr =
+      std::make_unique<MockFetchTask>(std::nullopt, std::nullopt, false);
+  MockFetchTask* fetch_task = fetch_task_ptr.get();
+  EXPECT_CALL(*track, StandaloneFetch)
+      .WillOnce(Return(ByMove(std::move(fetch_task_ptr))));
+
+  // Receive FETCH, send FETCH_OK.
+  stream_input->OnFetchMessage(fetch);
+  // FETCH_OK responding to the request.
+  MoqtFetchOk expected_ok;
+  expected_ok.request_id = fetch.request_id;
+  expected_ok.end_location = Location(1, 0);
+  EXPECT_CALL(mock_stream_, Writev(SerializedControlMessage(expected_ok), _));
+  fetch_task->CallFetchResponseCallback(expected_ok);
+
+  webtransport::test::MockStream data_stream;
+  std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
+  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream)
+      .WillRepeatedly(Return(true));
+  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
+      .WillOnce(Return(&data_stream));
+  EXPECT_CALL(data_stream, SetVisitor)
+      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
+        stream_visitor = std::move(visitor);
+      });
+  EXPECT_CALL(data_stream, SetPriority);
+  EXPECT_CALL(data_stream, CanWrite).WillRepeatedly(Return(true));
+  // Trigger stream opening (calls SetObjectAvailableCallback with lambda1).
+  // Setting the stream visitor will cause a second call to the callback.
+  PublishedObjectMetadata metadata = {
+      Location(0, 0), 0, "", MoqtObjectStatus::kNormal, 128, 10};
+  EXPECT_CALL(*fetch_task, GetNextObject)
+      .WillOnce([&](PublishedObject& output) {
+        output.metadata = metadata;
+        output.payload = PayloadFromString("part1");
+        return MoqtFetchTask::GetNextObjectResult::kSuccess;
+      })
+      .WillOnce(Return(MoqtFetchTask::GetNextObjectResult::kPending));
+  EXPECT_CALL(data_stream, Writev)
+      .WillOnce([&](absl::Span<const quiche::QuicheMemSlice> data,
+                    const webtransport::StreamWriteOptions& options) {
+        EXPECT_EQ(data.size(), 2);
+        EXPECT_EQ(data[1].AsStringView(), "part1");
+        return absl::OkStatus();
+      });
+  fetch_task->CallObjectsAvailableCallback();
+  // lambda1 ran, data_stream captured, stream_visitor set.
+  ASSERT_NE(stream_visitor, nullptr);
+
+  // The second fragment is available.
+  EXPECT_CALL(*fetch_task, GetNextObject)
+      .WillOnce([&](PublishedObject& output) {
+        output.metadata = metadata;
+        output.payload = PayloadFromString("part2");
+        return MoqtFetchTask::GetNextObjectResult::kSuccess;
+      })
+      .WillRepeatedly(Return(MoqtFetchTask::GetNextObjectResult::kPending));
+  EXPECT_CALL(data_stream, Writev)
+      .WillOnce([&](absl::Span<const quiche::QuicheMemSlice> data,
+                    const webtransport::StreamWriteOptions& options) {
+        EXPECT_EQ(data.size(), 1);  // No header.
+        EXPECT_EQ(data[0].AsStringView(), "part2");
+        return absl::OkStatus();
+      });
+  fetch_task->CallObjectsAvailableCallback();
+}
+
 // The publisher has the first object locally, but has to go upstream to get
 // the rest.
 TEST_F(MoqtSessionTest, FetchReturnsObjectBeforeOk) {
@@ -3190,9 +3379,10 @@
   stream.Receive("foo", false);
   EXPECT_TRUE(task->HasObject());
   EXPECT_TRUE(task->NeedsMorePayload());
-  EXPECT_FALSE(object_ready);
-  stream.Receive("bar", false);
   EXPECT_TRUE(object_ready);
+  object_ready = false;
+  stream.Receive("bar", false);
+  EXPECT_FALSE(object_ready);  // No second call to the callback.
   EXPECT_TRUE(task->HasObject());
   EXPECT_FALSE(task->NeedsMorePayload());
   task->SetObjectAvailableCallback(nullptr);
@@ -3239,17 +3429,18 @@
     return stream_visitor.get();
   });
   EXPECT_CALL(*track_publisher, GetCachedObject)
-      .WillOnce(
-          Return(PublishedObject{PublishedObjectMetadata{
-                                     Location(0, 0),
-                                     0,
-                                     "",
-                                     MoqtObjectStatus::kObjectDoesNotExist,
-                                     0,
-                                     MoqtSessionPeer::Now(&session_) -
-                                         quic::QuicTimeDelta::FromSeconds(1),
-                                 },
-                                 quiche::QuicheMemSlice(), false}));
+      .WillOnce(Return(
+          PublishedObject{PublishedObjectMetadata{
+                              Location(0, 0),
+                              0,
+                              "",
+                              MoqtObjectStatus::kObjectDoesNotExist,
+                              0,
+                              0,
+                              MoqtSessionPeer::Now(&session_) -
+                                  quic::QuicTimeDelta::FromSeconds(1),
+                          },
+                          std::vector<quiche::QuicheMemSlice>(), false}));
   EXPECT_CALL(data_mock, ResetWithUserCode(kResetCodeDeliveryTimeout))
       .WillOnce([&](webtransport::StreamErrorCode /*error*/) {
         stream_visitor.reset();
@@ -3304,9 +3495,9 @@
   EXPECT_CALL(*track_publisher, GetCachedObject)
       .WillOnce(Return(PublishedObject{
           PublishedObjectMetadata{Location(0, 0), 0, "",
-                                  MoqtObjectStatus::kObjectDoesNotExist, 0,
+                                  MoqtObjectStatus::kObjectDoesNotExist, 0, 0,
                                   MoqtSessionPeer::Now(&session_)},
-          quiche::QuicheMemSlice(), true}))
+          PayloadFromString(""), true}))
       .WillOnce(Return(std::nullopt));
   EXPECT_CALL(data_mock, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
   EXPECT_CALL(data_mock, ResetWithUserCode(kResetCodeDeliveryTimeout)).Times(0);
@@ -3354,9 +3545,9 @@
   EXPECT_CALL(*track_publisher, GetCachedObject)
       .WillOnce(Return(PublishedObject{
           PublishedObjectMetadata{Location(0, 0), 0, "",
-                                  MoqtObjectStatus::kObjectDoesNotExist, 0,
+                                  MoqtObjectStatus::kObjectDoesNotExist, 0, 0,
                                   MoqtSessionPeer::Now(&session_)},
-          quiche::QuicheMemSlice(), false}))
+          PayloadFromString(""), false}))
       .WillOnce(Return(std::nullopt));
   EXPECT_CALL(data_mock, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
   ON_CALL(*track_publisher, largest_location())
@@ -3407,9 +3598,9 @@
   EXPECT_CALL(*track_publisher, GetCachedObject)
       .WillOnce(Return(PublishedObject{
           PublishedObjectMetadata{Location(0, 0), 0, "",
-                                  MoqtObjectStatus::kObjectDoesNotExist, 0,
+                                  MoqtObjectStatus::kObjectDoesNotExist, 0, 0,
                                   MoqtSessionPeer::Now(&session_)},
-          quiche::QuicheMemSlice(), false}))
+          PayloadFromString(""), false}))
       .WillOnce(Return(std::nullopt));
   EXPECT_CALL(data_mock1, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
   ON_CALL(*track_publisher, largest_location)
@@ -3436,9 +3627,9 @@
   EXPECT_CALL(*track_publisher, GetCachedObject)
       .WillOnce(Return(PublishedObject{
           PublishedObjectMetadata{Location(1, 0), 0, "",
-                                  MoqtObjectStatus::kObjectDoesNotExist, 0,
+                                  MoqtObjectStatus::kObjectDoesNotExist, 0, 0,
                                   MoqtSessionPeer::Now(&session_)},
-          quiche::QuicheMemSlice(), false}))
+          PayloadFromString(""), false}))
       .WillOnce(Return(std::nullopt));
   EXPECT_CALL(data_mock2, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
   ON_CALL(*track_publisher, largest_location)
@@ -4168,13 +4359,14 @@
                                        /*start_object=*/0);
 
   // Send a datagram in window.
-  EXPECT_CALL(*mock_publisher, GetCachedObject(8, std::optional<uint64_t>(), 0))
+  EXPECT_CALL(*mock_publisher,
+              GetCachedObject(8, std::optional<uint64_t>(), 0, 0))
       .WillOnce([&] {
         return PublishedObject{
             PublishedObjectMetadata{Location(8, 0), std::nullopt, "extensions",
-                                    MoqtObjectStatus::kNormal, 128,
+                                    MoqtObjectStatus::kNormal, 128, 8,
                                     MoqtSessionPeer::Now(&session_)},
-            quiche::QuicheMemSlice::Copy("deadbeef"), false};
+            PayloadFromString("deadbeef"), false};
       });
   EXPECT_CALL(mock_session_, SendOrQueueDatagram)
       .WillOnce(Return(webtransport::DatagramStatus(
@@ -4216,13 +4408,13 @@
     return data_stream_visitor.get();
   });
   EXPECT_CALL(mock_stream_, CanWrite).WillRepeatedly(Return(true));
-  EXPECT_CALL(*track, GetCachedObject(0, Optional(1), 0))
+  EXPECT_CALL(*track, GetCachedObject(0, Optional(1), 0, 0))
       .WillOnce(Return(PublishedObject{
           PublishedObjectMetadata{Location(0, 0), 1, "",
-                                  MoqtObjectStatus::kNormal, 0x80,
+                                  MoqtObjectStatus::kNormal, 0x80, 0,
                                   MoqtSessionPeer::Now(&session_)},
-          quiche::QuicheMemSlice(), false}));
-  EXPECT_CALL(*track, GetCachedObject(0, Optional(1), 1))
+          PayloadFromString(""), false}));
+  EXPECT_CALL(*track, GetCachedObject(0, Optional(1), 1, 0))
       .WillOnce(Return(std::nullopt));
   SetLargestId(track, Location(0, 0));
   EXPECT_CALL(mock_stream_, Writev).WillOnce(Return(absl::OkStatus()));
diff --git a/quiche/quic/moqt/moqt_trace_recorder.cc b/quiche/quic/moqt/moqt_trace_recorder.cc
index 6bcc537..b8f1273 100644
--- a/quiche/quic/moqt/moqt_trace_recorder.cc
+++ b/quiche/quic/moqt/moqt_trace_recorder.cc
@@ -94,7 +94,7 @@
   std::optional<PublishedObject> object_copy =
       publisher.GetCachedObject(location.group, subgroup, location.object);
   if (object_copy.has_value() && object_copy->metadata.location == location) {
-    object->set_payload_size(object_copy->payload.length());
+    object->set_payload_size(object_copy->metadata.payload_length);
   } else {
     QUICHE_DLOG(WARNING) << "Track " << track_alias << " has marked "
                          << location
diff --git a/quiche/quic/moqt/moqt_track.cc b/quiche/quic/moqt/moqt_track.cc
index acecdee..df1188b 100644
--- a/quiche/quic/moqt/moqt_track.cc
+++ b/quiche/quic/moqt/moqt_track.cc
@@ -24,9 +24,7 @@
 #include "quiche/quic/moqt/moqt_priority.h"
 #include "quiche/quic/moqt/moqt_types.h"
 #include "quiche/common/platform/api/quiche_bug_tracker.h"
-#include "quiche/common/quiche_buffer_allocator.h"
 #include "quiche/common/quiche_mem_slice.h"
-#include "quiche/common/simple_buffer_allocator.h"
 #include "quiche/web_transport/web_transport.h"
 
 namespace moqt {
@@ -101,7 +99,6 @@
     std::unique_ptr<MoqtFetchTask> fetch_task) {
   fetch_task_ = std::move(fetch_task);
   fetch_task_->SetObjectAvailableCallback([this]() { FetchObjects(); });
-  FetchObjects();
 }
 
 void SubscribeRemoteTrack::FetchObjects() {
@@ -116,8 +113,27 @@
     PublishedObject object;
     switch (fetch_task_->GetNextObject(object)) {
       case MoqtFetchTask::GetNextObjectResult::kSuccess:
-        visitor_->OnObjectFragment(full_track_name(), object.metadata,
-                                   object.payload.AsStringView(), true);
+        if (object.metadata.payload_length == 0) {
+          QUICHE_DCHECK_EQ(fetch_object_offset_, 0);
+          visitor_->OnObjectFragment(full_track_name(), object.metadata, "", 0);
+          break;
+        }
+        for (size_t i = 0; i < object.payload.size(); ++i) {
+          if (fetch_object_offset_ > 0 && object.payload[i].empty()) {
+            QUICHE_BUG(SubscribeRemoteTrack_empty_payload)
+                << "Empty payload for partial object "
+                << object.metadata.location;
+            continue;
+          }
+          visitor_->OnObjectFragment(full_track_name(), object.metadata,
+                                     object.payload[i].AsStringView(),
+                                     fetch_object_offset_);
+          fetch_object_offset_ += object.payload[i].length();
+          if (fetch_object_offset_ == object.metadata.payload_length) {
+            fetch_object_offset_ = 0;
+            break;
+          }
+        }
         break;
       case MoqtFetchTask::GetNextObjectResult::kError:
       case MoqtFetchTask::GetNextObjectResult::kEof:
@@ -230,29 +246,41 @@
     need_object_available_callback_ = true;
     return kPending;
   }
-  if (!payload_.empty()) {
-    quiche::QuicheMemSlice message_slice(std::move(payload_));
-    output.payload = std::move(message_slice);
+  if (next_object_->payload_length > 0 && payload_.empty()) {
+    return kPending;
+  }
+  while (!payload_.empty()) {
+    payload_offset_ += payload_.front().length();
+    output.payload.push_back(std::move(payload_.front()));
+    payload_.pop_front();
   }
   output.metadata.location =
       Location(next_object_->group_id, next_object_->object_id);
   output.metadata.subgroup = next_object_->subgroup_id;
   output.metadata.status = next_object_->object_status;
   output.metadata.publisher_priority = next_object_->publisher_priority;
+  output.metadata.payload_length = next_object_->payload_length;
   output.fin_after_this = false;
   if (output.metadata.location ==
       largest_location_) {  // This is the last object.
     eof_ = true;
   }
-  next_object_.reset();
+  if (payload_offset_ == next_object_->payload_length) {
+    next_object_.reset();
+    payload_offset_ = 0;
+    payload_length_ = 0;
+  }
   can_read_callback_();
   return kSuccess;
 }
 
 void UpstreamFetch::UpstreamFetchTask::NewObject(const MoqtObject& message) {
   next_object_ = message;
-  payload_ = quiche::QuicheBuffer(quiche::SimpleBufferAllocator::Get(),
-                                  message.payload_length);
+  while (!payload_.empty()) {
+    payload_.pop_front();
+  };
+  payload_offset_ = 0;
+  payload_length_ = 0;
 }
 
 void UpstreamFetch::UpstreamFetchTask::AppendPayloadToObject(
@@ -263,15 +291,11 @@
   QUICHE_BUG_IF(quic_bug_AlreadyGotPayload, next_object_->payload_length == 0)
       << "AppendPayloadToObject called after payload was already full";
   // Copy |payload| to the right spot in the buffer.
-  memcpy(payload_.data() + payload_.size() - next_object_->payload_length,
-         payload.data(), payload.length());
-  next_object_->payload_length -= payload.length();
+  payload_length_ += payload.length();
+  payload_.push_back(quiche::QuicheMemSlice::Copy(payload));
 }
 
 void UpstreamFetch::UpstreamFetchTask::NotifyNewObject() {
-  QUICHE_BUG_IF(quic_bug_NotifyNewObjectCalledEarly,
-                !next_object_.has_value() || next_object_->payload_length > 0)
-      << "NotifyNewObject called without a full object in store";
   if (need_object_available_callback_ && object_available_callback_) {
     need_object_available_callback_ = false;
     object_available_callback_();
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h
index 5075b33..2e6f66d 100644
--- a/quiche/quic/moqt/moqt_track.h
+++ b/quiche/quic/moqt/moqt_track.h
@@ -22,8 +22,9 @@
 #include "quiche/quic/moqt/moqt_priority.h"
 #include "quiche/quic/moqt/moqt_session_interface.h"
 #include "quiche/quic/moqt/moqt_types.h"
-#include "quiche/common/quiche_buffer_allocator.h"
 #include "quiche/common/quiche_callbacks.h"
+#include "quiche/common/quiche_circular_deque.h"
+#include "quiche/common/quiche_mem_slice.h"
 #include "quiche/common/quiche_weak_ptr.h"
 #include "quiche/web_transport/web_transport.h"
 
@@ -165,6 +166,8 @@
   bool dynamic_groups_ = kDefaultDynamicGroups;
   void FetchObjects();
   std::unique_ptr<MoqtFetchTask> fetch_task_;
+  // If nonzero, fetch_task_ is in mid-object.
+  uint64_t fetch_object_offset_ = 0;
 
   std::optional<const uint64_t> track_alias_;
   SubscribeVisitor* visitor_;
@@ -280,7 +283,8 @@
     // MoqtSession calls this for a hint if the object has been read.
     bool HasObject() const { return next_object_.has_value(); }
     bool NeedsMorePayload() const {
-      return next_object_.has_value() && next_object_->payload_length > 0;
+      return next_object_.has_value() &&
+             payload_length_ < next_object_->payload_length;
     }
     // MoqtSession calls NotifyNewObject() after NewObject() because it has to
     // exit the parser loop before the callback possibly causes another read.
@@ -294,6 +298,9 @@
         std::optional<webtransport::StreamErrorCode> error,
         absl::string_view reason_phrase);
 
+    uint64_t payload_offset() const { return payload_offset_; }
+    uint64_t payload_length() const { return payload_length_; }
+
    private:
     Location largest_location_;
     absl::Status status_ = absl::OkStatus();
@@ -303,9 +310,11 @@
     // payload bytes not yet received. The application receives a
     // PublishedObject that is constructed from next_object_ and payload_.
     std::optional<MoqtObject> next_object_;
-    // Store payload separately. Will be converted into QuicheMemSlice only when
-    // complete, since QuicheMemSlice is immutable.
-    quiche::QuicheBuffer payload_;
+    quiche::QuicheCircularDeque<quiche::QuicheMemSlice> payload_;
+    // The starting point of payload_. Data is deleted as it is delivered.
+    uint64_t payload_offset_ = 0;
+    // Total data delivered for this object.
+    uint64_t payload_length_ = 0;
 
     // The task should only call object_available_callback_ when the last result
     // was kPending. Otherwise, there can be recursive loops of
diff --git a/quiche/quic/moqt/moqt_track_test.cc b/quiche/quic/moqt/moqt_track_test.cc
index 134906f..152c95b 100644
--- a/quiche/quic/moqt/moqt_track_test.cc
+++ b/quiche/quic/moqt/moqt_track_test.cc
@@ -20,6 +20,7 @@
 #include "quiche/quic/moqt/test_tools/moqt_mock_visitor.h"
 #include "quiche/quic/platform/api/quic_test.h"
 #include "quiche/common/quiche_mem_slice.h"
+#include "quiche/web_transport/web_transport.h"
 
 namespace moqt {
 
@@ -27,6 +28,8 @@
 
 namespace {
 
+using ::testing::_;
+
 class AlarmDelegate : public quic::QuicAlarm::DelegateWithoutContext {
  public:
   AlarmDelegate(bool* fired) : fired_(fired) {}
@@ -80,6 +83,107 @@
   EXPECT_FALSE(track_.InWindow(Location(1, 25)));
 }
 
+TEST_F(SubscribeRemoteTrackTest, JoiningFetchMultiObject) {
+  auto fetch_task = std::make_unique<MockFetchTask>();
+  MockFetchTask* task_ptr = fetch_task.get();
+  track_.OnJoiningFetchReady(std::move(fetch_task));
+
+  PublishedObject o1, o2;
+  o1.metadata.location = Location(2, 0);
+  o1.metadata.payload_length = 3;
+  o1.payload.push_back(quiche::QuicheMemSlice::Copy("abc"));
+
+  o2.metadata.location = Location(2, 1);
+  o2.metadata.payload_length = 3;
+  o2.payload.push_back(quiche::QuicheMemSlice::Copy("def"));
+
+  EXPECT_CALL(visitor_,
+              OnObjectFragment(track_.full_track_name(), _, "abc", 0));
+  EXPECT_CALL(visitor_,
+              OnObjectFragment(track_.full_track_name(), _, "def", 0));
+  EXPECT_CALL(*task_ptr, GetNextObject)
+      .WillOnce([&](PublishedObject& output) {
+        output = std::move(o1);
+        return MoqtFetchTask::GetNextObjectResult::kSuccess;
+      })
+      .WillOnce([&](PublishedObject& output) {
+        output = std::move(o2);
+        return MoqtFetchTask::GetNextObjectResult::kSuccess;
+      })
+      .WillOnce(testing::Return(MoqtFetchTask::GetNextObjectResult::kPending));
+  task_ptr->CallObjectsAvailableCallback();
+  EXPECT_NE(SubscribeRemoteTrackPeer::GetFetchTask(&track_), nullptr);
+  EXPECT_CALL(*task_ptr, GetNextObject)
+      .WillOnce(testing::Return(MoqtFetchTask::GetNextObjectResult::kEof));
+  task_ptr->CallObjectsAvailableCallback();
+  EXPECT_EQ(SubscribeRemoteTrackPeer::GetFetchTask(&track_), nullptr);
+}
+
+TEST_F(SubscribeRemoteTrackTest, JoiningFetchFragmented) {
+  auto fetch_task = std::make_unique<MockFetchTask>();
+  MockFetchTask* task_ptr = fetch_task.get();
+  track_.OnJoiningFetchReady(std::move(fetch_task));
+
+  PublishedObject part1, part2;
+  part1.metadata.location = Location(2, 0);
+  part1.metadata.payload_length = 6;
+  part1.payload.push_back(quiche::QuicheMemSlice::Copy("abc"));
+
+  part2.metadata.location = Location(2, 0);
+  part2.metadata.payload_length = 6;
+  part2.payload.push_back(quiche::QuicheMemSlice::Copy("def"));
+
+  EXPECT_CALL(visitor_,
+              OnObjectFragment(track_.full_track_name(), _, "abc", 0));
+  EXPECT_CALL(visitor_,
+              OnObjectFragment(track_.full_track_name(), _, "def", 3));
+  EXPECT_CALL(*task_ptr, GetNextObject)
+      .WillOnce([&](PublishedObject& output) {
+        output = std::move(part1);
+        return MoqtFetchTask::GetNextObjectResult::kSuccess;
+      })
+      .WillOnce([&](PublishedObject& output) {
+        output = std::move(part2);
+        return MoqtFetchTask::GetNextObjectResult::kSuccess;
+      })
+      .WillOnce(testing::Return(MoqtFetchTask::GetNextObjectResult::kPending));
+  task_ptr->CallObjectsAvailableCallback();
+}
+
+TEST_F(SubscribeRemoteTrackTest, JoiningFetchEmptyPayload) {
+  auto fetch_task = std::make_unique<MockFetchTask>();
+  MockFetchTask* task_ptr = fetch_task.get();
+  track_.OnJoiningFetchReady(std::move(fetch_task));
+
+  PublishedObject o1;
+  o1.metadata.location = Location(2, 0);
+  o1.metadata.payload_length = 0;
+  o1.metadata.status = MoqtObjectStatus::kEndOfGroup;
+
+  // Since object.payload is empty, is called once.
+  EXPECT_CALL(visitor_,
+              OnObjectFragment(track_.full_track_name(), o1.metadata, "", 0));
+  EXPECT_CALL(*task_ptr, GetNextObject)
+      .WillOnce([&](PublishedObject& output) {
+        output = std::move(o1);
+        return MoqtFetchTask::GetNextObjectResult::kSuccess;
+      })
+      .WillOnce(testing::Return(MoqtFetchTask::GetNextObjectResult::kPending));
+  task_ptr->CallObjectsAvailableCallback();
+}
+
+TEST_F(SubscribeRemoteTrackTest, JoiningFetchError) {
+  auto fetch_task = std::make_unique<MockFetchTask>();
+  MockFetchTask* task_ptr = fetch_task.get();
+  track_.OnJoiningFetchReady(std::move(fetch_task));
+
+  EXPECT_NE(SubscribeRemoteTrackPeer::GetFetchTask(&track_), nullptr);
+  EXPECT_CALL(*task_ptr, GetNextObject)
+      .WillOnce(testing::Return(MoqtFetchTask::GetNextObjectResult::kError));
+  task_ptr->CallObjectsAvailableCallback();
+  EXPECT_EQ(SubscribeRemoteTrackPeer::GetFetchTask(&track_), nullptr);
+}
+
 class UpstreamFetchTest : public quic::test::QuicTest {
  protected:
   UpstreamFetchTest()
@@ -163,7 +267,8 @@
               MoqtFetchTask::GetNextObjectResult::kSuccess);
     EXPECT_EQ(object.metadata.location, Location(3, 0));
     EXPECT_EQ(object.metadata.subgroup, 0);
-    EXPECT_EQ(object.payload.AsStringView(), "foobar");
+    EXPECT_EQ(object.payload[0].AsStringView(), "foo");
+    EXPECT_EQ(object.payload[1].AsStringView(), "bar");
   });
   int got_read_callback = 0;
   fetch_.OnStreamOpened([&]() { ++got_read_callback; });
@@ -187,6 +292,71 @@
   EXPECT_TRUE(got_object);
 }
 
+TEST_F(UpstreamFetchTest, ObjectRetrievalEmptyPayload) {
+  fetch_.OnFetchResult(Location(3, 50), absl::OkStatus(), nullptr);
+  MoqtObject moqt_obj = {1, 3, 0, 128, "", MoqtObjectStatus::kEndOfGroup, 0, 0};
+  fetch_.task()->NewObject(moqt_obj);
+  fetch_.task()->NotifyNewObject();
+  fetch_.OnStreamOpened([]() {});
+
+  PublishedObject output;
+  EXPECT_EQ(fetch_task_->GetNextObject(output),
+            MoqtFetchTask::GetNextObjectResult::kSuccess);
+  EXPECT_TRUE(output.payload.empty());
+  EXPECT_EQ(output.metadata.status, MoqtObjectStatus::kEndOfGroup);
+}
+
+TEST_F(UpstreamFetchTest, GetNextObjectAfterEof) {
+  fetch_.OnFetchResult(Location(3, 50), absl::OkStatus(), nullptr);
+  fetch_.task()->OnStreamAndFetchClosed(std::nullopt, "");
+
+  PublishedObject object;
+  EXPECT_EQ(fetch_task_->GetNextObject(object),
+            MoqtFetchTask::GetNextObjectResult::kEof);
+  // Subsequent calls should still return EOF.
+  EXPECT_EQ(fetch_task_->GetNextObject(object),
+            MoqtFetchTask::GetNextObjectResult::kEof);
+}
+
+TEST_F(UpstreamFetchTest, GetNextObjectEofAtLargestLocation) {
+  Location largest(3, 50);
+  fetch_.OnFetchResult(largest, absl::OkStatus(), nullptr);
+  fetch_.OnStreamOpened([]() {});
+
+  MoqtObject obj1 = {1, 3, 49, 128, "", MoqtObjectStatus::kNormal, 0, 1};
+  fetch_.task()->NewObject(obj1);
+  fetch_.task()->AppendPayloadToObject("a");
+  fetch_.task()->NotifyNewObject();
+
+  PublishedObject out;
+  EXPECT_EQ(fetch_task_->GetNextObject(out),
+            MoqtFetchTask::GetNextObjectResult::kSuccess);
+  // Not at largest location yet.
+  EXPECT_EQ(fetch_task_->GetNextObject(out),
+            MoqtFetchTask::GetNextObjectResult::kPending);
+
+  MoqtObject obj2 = {1, 3, 50, 128, "", MoqtObjectStatus::kNormal, 0, 1};
+  fetch_.task()->NewObject(obj2);
+  fetch_.task()->AppendPayloadToObject("b");
+  fetch_.task()->NotifyNewObject();
+
+  EXPECT_EQ(fetch_task_->GetNextObject(out),
+            MoqtFetchTask::GetNextObjectResult::kSuccess);
+  // Reached largest location. EOF should be set.
+  EXPECT_EQ(fetch_task_->GetNextObject(out),
+            MoqtFetchTask::GetNextObjectResult::kEof);
+}
+
+TEST_F(UpstreamFetchTest, CloseWithError) {
+  fetch_.OnFetchResult(Location(3, 50), absl::OkStatus(), nullptr);
+  fetch_.task()->OnStreamAndFetchClosed(
+      static_cast<webtransport::StreamErrorCode>(0x123), "reason");
+  PublishedObject out;
+  EXPECT_EQ(fetch_task_->GetNextObject(out),
+            MoqtFetchTask::GetNextObjectResult::kError);
+  EXPECT_FALSE(fetch_task_->GetStatus().ok());
+}
+
 TEST_F(UpstreamFetchTest, LocationIsValidOkFirstObjectIdDeclining) {
   fetch_.OnFetchResult(Location(3, 50), absl::OkStatus(), nullptr);
   EXPECT_TRUE(
diff --git a/quiche/quic/moqt/test_tools/moqt_mock_visitor.h b/quiche/quic/moqt/test_tools/moqt_mock_visitor.h
index de9654a..4b87f5e 100644
--- a/quiche/quic/moqt/test_tools/moqt_mock_visitor.h
+++ b/quiche/quic/moqt/test_tools/moqt_mock_visitor.h
@@ -6,6 +6,7 @@
 #define QUICHE_QUIC_MOQT_TOOLS_MOQT_MOCK_VISITOR_H_
 
 #include <cstdint>
+#include <map>
 #include <memory>
 #include <optional>
 #include <utility>
@@ -77,7 +78,8 @@
   const FullTrackName& GetTrackName() const override { return track_name_; }
 
   MOCK_METHOD(std::optional<PublishedObject>, GetCachedObject,
-              (uint64_t, std::optional<uint64_t>, uint64_t), (const, override));
+              (uint64_t, std::optional<uint64_t>, uint64_t, uint64_t),
+              (const, override));
   MOCK_METHOD(void, AddObjectListener, (MoqtObjectListener * listener),
               (override));
   MOCK_METHOD(void, RemoveObjectListener, (MoqtObjectListener * listener),
@@ -105,14 +107,14 @@
       : track_name_(std::move(name)) {}
   const FullTrackName& GetTrackName() const override { return track_name_; }
   std::optional<PublishedObject> GetCachedObject(
-      uint64_t group, std::optional<uint64_t> subgroup,
-      uint64_t object) const override {
+      uint64_t group, std::optional<uint64_t> subgroup, uint64_t object,
+      uint64_t /*offset*/) const override {
     Location location(group, object);
     auto it = objects_.find(location);
     if (it == objects_.end()) {
       return std::nullopt;
     }
-    return CachedObjectToPublishedObject(it->second);
+    return it->second.ToPublishedObject();
   }
   void AddObjectListener(MoqtObjectListener* listener) override {
     listeners_.insert(listener);
@@ -146,16 +148,20 @@
   }
   void AddObject(Location location, uint64_t subgroup,
                  absl::string_view payload, bool fin) {
-    CachedObject object;
-    object.metadata.location = location;
-    object.metadata.subgroup = subgroup;
-    object.metadata.extensions = "";
-    object.metadata.status = MoqtObjectStatus::kNormal;
-    object.metadata.publisher_priority = 128;
-    object.payload = std::make_shared<quiche::QuicheMemSlice>(
-        quiche::QuicheMemSlice::Copy(payload));
-    object.fin_after_this = fin;
-    objects_[location] = std::move(object);
+    PublishedObjectMetadata metadata;
+    metadata.location = location;
+    metadata.subgroup = subgroup;
+    metadata.extensions = "";
+    metadata.status = MoqtObjectStatus::kNormal;
+    metadata.publisher_priority = 128;
+    metadata.payload_length = payload.length();
+    auto it = objects_.find(location);
+    if (it != objects_.end()) {
+      it->second.Append(it->second.payload_received(), payload);
+    } else {
+      objects_.try_emplace(location, metadata,
+                           quiche::QuicheMemSlice::Copy(payload), fin);
+    }
     if (!largest_location_.has_value() || *largest_location_ < location) {
       largest_location_ = location;
     }
@@ -172,7 +178,7 @@
  private:
   FullTrackName track_name_;
   absl::flat_hash_set<MoqtObjectListener*> listeners_;
-  absl::flat_hash_map<Location, CachedObject> objects_;
+  std::map<Location, CachedObject> objects_;
   std::optional<Location> largest_location_;
   TrackExtensions extensions_;
 };
@@ -189,7 +195,7 @@
   MOCK_METHOD(void, OnObjectFragment,
               (const FullTrackName& full_track_name,
                const PublishedObjectMetadata& metadata,
-               absl::string_view object, bool end_of_message),
+               absl::string_view object, uint64_t offset),
               (override));
   MOCK_METHOD(void, OnPublishDone, (FullTrackName full_track_name), (override));
   MOCK_METHOD(void, OnMalformedTrack, (const FullTrackName& full_track_name),
diff --git a/quiche/quic/moqt/test_tools/moqt_simulator.cc b/quiche/quic/moqt/test_tools/moqt_simulator.cc
index 2ed2240..5090baa 100644
--- a/quiche/quic/moqt/test_tools/moqt_simulator.cc
+++ b/quiche/quic/moqt/test_tools/moqt_simulator.cc
@@ -30,13 +30,15 @@
 #include "quiche/quic/core/quic_time.h"
 #include "quiche/quic/core/quic_types.h"
 #include "quiche/quic/moqt/moqt_bitrate_adjuster.h"
+#include "quiche/quic/moqt/moqt_error.h"
 #include "quiche/quic/moqt/moqt_known_track_publisher.h"
-#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_names.h"
 #include "quiche/quic/moqt/moqt_object.h"
 #include "quiche/quic/moqt/moqt_outgoing_queue.h"
 #include "quiche/quic/moqt/moqt_session.h"
 #include "quiche/quic/moqt/moqt_session_interface.h"
 #include "quiche/quic/moqt/moqt_trace_recorder.h"
+#include "quiche/quic/moqt/moqt_types.h"
 #include "quiche/quic/moqt/test_tools/moqt_simulator_harness.h"
 #include "quiche/quic/test_tools/simulator/actor.h"
 #include "quiche/quic/test_tools/simulator/link.h"
@@ -206,13 +208,14 @@
 void ObjectReceiver::OnObjectFragment(const FullTrackName& full_track_name,
                                       const PublishedObjectMetadata& metadata,
                                       absl::string_view object,
-                                      bool end_of_message) {
+                                      uint64_t offset) {
   QUICHE_DCHECK(full_track_name == TrackName());
   if (metadata.status != MoqtObjectStatus::kNormal) {
-    QUICHE_DCHECK(end_of_message);
+    QUICHE_DCHECK(object.empty() && metadata.payload_length == 0 &&
+                  offset == 0);
     return;
   }
-  if (!end_of_message) {
+  if (metadata.payload_length != object.length() || offset != 0) {
     QUICHE_LOG(DFATAL) << "Partial receiving of objects wasn't enabled";
     return;
   }
diff --git a/quiche/quic/moqt/test_tools/moqt_simulator.h b/quiche/quic/moqt/test_tools/moqt_simulator.h
index 8a2e314..bf3b51f 100644
--- a/quiche/quic/moqt/test_tools/moqt_simulator.h
+++ b/quiche/quic/moqt/test_tools/moqt_simulator.h
@@ -165,7 +165,7 @@
 
   void OnObjectFragment(const FullTrackName& full_track_name,
                         const PublishedObjectMetadata& metadata,
-                        absl::string_view object, bool end_of_message) override;
+                        absl::string_view object, uint64_t offset) override;
 
   void OnPublishDone(FullTrackName /*full_track_name*/) override {}
   void OnMalformedTrack(const FullTrackName& /*full_track_name*/) override {}
diff --git a/quiche/quic/moqt/tools/chat_client.cc b/quiche/quic/moqt/tools/chat_client.cc
index c408333..1c4882e 100644
--- a/quiche/quic/moqt/tools/chat_client.cc
+++ b/quiche/quic/moqt/tools/chat_client.cc
@@ -7,6 +7,7 @@
 #include <poll.h>
 #include <unistd.h>
 
+#include <cstdint>
 #include <iostream>
 #include <memory>
 #include <optional>
@@ -221,9 +222,10 @@
 }
 
 void ChatClient::RemoteTrackVisitor::OnObjectFragment(
-    const FullTrackName& full_track_name, const PublishedObjectMetadata&,
-    absl::string_view object, bool end_of_message) {
-  if (!end_of_message) {
+    const FullTrackName& full_track_name,
+    const PublishedObjectMetadata& metadata, absl::string_view object,
+    uint64_t offset) {
+  if (offset != 0 || object.length() != metadata.payload_length) {
     std::cerr << "Error: received partial message despite requesting "
                  "buffering\n";
   }
diff --git a/quiche/quic/moqt/tools/chat_client.h b/quiche/quic/moqt/tools/chat_client.h
index 516ea07..6e229d5 100644
--- a/quiche/quic/moqt/tools/chat_client.h
+++ b/quiche/quic/moqt/tools/chat_client.h
@@ -5,6 +5,7 @@
 #ifndef QUICHE_QUIC_MOQT_TOOLS_CHAT_CLIENT_H
 #define QUICHE_QUIC_MOQT_TOOLS_CHAT_CLIENT_H
 
+#include <cstdint>
 #include <memory>
 #include <optional>
 #include <variant>
@@ -111,8 +112,7 @@
 
     void OnObjectFragment(const moqt::FullTrackName& full_track_name,
                           const PublishedObjectMetadata& metadata,
-                          absl::string_view object,
-                          bool end_of_message) override;
+                          absl::string_view object, uint64_t offset) override;
 
     void OnPublishDone(FullTrackName) override {}
 
diff --git a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
index b3fd295..74fa035 100644
--- a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
+++ b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
@@ -201,8 +201,8 @@
 
     void OnObjectFragment(const FullTrackName& full_track_name,
                           const PublishedObjectMetadata& metadata,
-                          absl::string_view object,
-                          bool /*end_of_message*/) override {
+                          absl::string_view object, uint64_t offset) override {
+      QUICHE_CHECK(offset == 0 && metadata.payload_length == object.length());
       std::string file_name =
           absl::StrCat(metadata.location.group, "-", metadata.location.object,
                        ".", full_track_name.track_namespace().tuple().back());
diff --git a/quiche/quic/moqt/tools/moqt_relay.cc b/quiche/quic/moqt/tools/moqt_relay.cc
index 74e35fa..6d16802 100644
--- a/quiche/quic/moqt/tools/moqt_relay.cc
+++ b/quiche/quic/moqt/tools/moqt_relay.cc
@@ -52,9 +52,11 @@
       // TODO(martinduke): Extend MoqtServer so that partial objects can be
       // received.
       server_(std::make_unique<MoqtServer>(
-          std::move(proof_source), [this](absl::string_view path) {
+          std::move(proof_source),
+          [this](absl::string_view path) {
             return IncomingSessionHandler(path);
-          })) {
+          },
+          MoqtSessionParameters(/*deliver_partial_objects=*/true))) {
   quiche::QuicheIpAddress bind_ip_address;
   QUICHE_CHECK(bind_ip_address.FromString(bind_address));
   // CreateUDPSocketAndListen() creates the event loop that we will pass to
@@ -87,8 +89,9 @@
   } else {
     verifier = quic::CreateDefaultProofVerifier(server_id.host());
   }
-  return std::make_unique<moqt::MoqtClient>(peer_address, server_id,
-                                            std::move(verifier), event_loop);
+  MoqtSessionParameters parameters(/*deliver_partial_objects=*/true);
+  return std::make_unique<moqt::MoqtClient>(
+      peer_address, server_id, std::move(verifier), event_loop, parameters);
 }
 
 MoqtSessionCallbacks MoqtRelay::CreateClientCallbacks() {