MoQT Upstream FETCH data streams.

Also, added track_alias() query to Parser API.

PiperOrigin-RevId: 710104119
diff --git a/quiche/quic/moqt/moqt_parser.h b/quiche/quic/moqt/moqt_parser.h
index 1b4de80..b05fdcc 100644
--- a/quiche/quic/moqt/moqt_parser.h
+++ b/quiche/quic/moqt/moqt_parser.h
@@ -202,6 +202,13 @@
   // Returns the type of the unidirectional stream, if already known.
   std::optional<MoqtDataStreamType> stream_type() const { return type_; }
 
+  // Returns the track alias, if already known.
+  std::optional<uint64_t> track_alias() const {
+    return (next_input_ == kStreamType || next_input_ == kTrackAlias)
+               ? std::optional<uint64_t>()
+               : metadata_.track_alias;
+  }
+
  private:
   friend class test::MoqtDataParserPeer;
 
diff --git a/quiche/quic/moqt/moqt_publisher.h b/quiche/quic/moqt/moqt_publisher.h
index 010db7f..0f378ed 100644
--- a/quiche/quic/moqt/moqt_publisher.h
+++ b/quiche/quic/moqt/moqt_publisher.h
@@ -77,7 +77,9 @@
 
   // Sets the callback that is called when GetNextObject() has previously
   // returned kPending, but now a new object (or potentially an error or an
-  // end-of-fetch) is available.
+  // end-of-fetch) is available. The application is responsible for calling
+  // GetNextObject() until it gets kPending; no further callback will occur
+  // until then.
   virtual void SetObjectAvailableCallback(
       ObjectsAvailableCallback callback) = 0;
 
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 8496188..a7473b1 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -36,6 +36,7 @@
 #include "quiche/quic/platform/api/quic_logging.h"
 #include "quiche/common/platform/api/quiche_bug_tracker.h"
 #include "quiche/common/platform/api/quiche_logging.h"
+#include "quiche/common/platform/api/quiche_mem_slice.h"
 #include "quiche/common/quiche_buffer_allocator.h"
 #include "quiche/common/quiche_stream.h"
 #include "quiche/common/simple_buffer_allocator.h"
@@ -959,7 +960,6 @@
     subscribe->visitor()->OnReply(track->full_track_name(), message.largest_id,
                                   std::nullopt);
   }
-  subscribe->OnObjectOrOk();
 }
 
 void MoqtSession::ControlStream::OnSubscribeErrorMessage(
@@ -1385,19 +1385,114 @@
     return;
   }
   track->OnObjectOrOk();
-  SubscribeRemoteTrack* subscribe = static_cast<SubscribeRemoteTrack*>(track);
-  if (subscribe->visitor() != nullptr) {
-    subscribe->visitor()->OnObjectFragment(
-        track->full_track_name(),
-        FullSequence{message.group_id, message.subgroup_id.value_or(0),
-                     message.object_id},
-        message.publisher_priority, message.object_status, payload,
-        end_of_message);
+  if (!track->is_fetch()) {
+    SubscribeRemoteTrack* subscribe = static_cast<SubscribeRemoteTrack*>(track);
+    if (subscribe->visitor() != nullptr) {
+      subscribe->visitor()->OnObjectFragment(
+          track->full_track_name(),
+          FullSequence{message.group_id, message.subgroup_id.value_or(0),
+                       message.object_id},
+          message.publisher_priority, message.object_status, payload,
+          end_of_message);
+    }
+  } else {  // FETCH
+    UpstreamFetch* fetch = static_cast<UpstreamFetch*>(track);
+    UpstreamFetch::UpstreamFetchTask* task = fetch->task();
+    if (task == nullptr) {
+      // The application killed the FETCH.
+      stream_->SendStopSending(kResetCodeSubscriptionGone);
+      return;
+    }
+    if (!task->HasObject()) {
+      task->NewObject(message);
+    }
+    if (task->NeedsMorePayload() && !payload.empty()) {
+      task->AppendPayloadToObject(payload);
+    }
   }
   partial_object_.clear();
 }
 
-void MoqtSession::IncomingDataStream::OnCanRead() { parser_.ReadAllData(); }
+MoqtSession::IncomingDataStream::~IncomingDataStream() {
+  if (parser_.track_alias().has_value() &&
+      parser_.stream_type() == MoqtDataStreamType::kStreamHeaderFetch) {
+    RemoteTrack* track = session_->RemoteTrackById(*parser_.track_alias());
+    if (track != nullptr && track->is_fetch()) {
+      session_->upstream_by_id_.erase(*parser_.track_alias());
+    }
+  }
+}
+
+void MoqtSession::IncomingDataStream::MaybeReadOneObject() {
+  if (!parser_.track_alias().has_value() ||
+      parser_.stream_type() != MoqtDataStreamType::kStreamHeaderFetch) {
+    QUICHE_BUG(quic_bug_read_one_object_parser_unexpected_state)
+        << "Requesting object, parser in unexpected state";
+  }
+  RemoteTrack* track = session_->RemoteTrackById(*parser_.track_alias());
+  if (track == nullptr || !track->is_fetch()) {
+    QUICHE_BUG(quic_bug_read_one_object_track_unexpected_state)
+        << "Requesting object, track in unexpected state";
+    return;
+  }
+  UpstreamFetch* fetch = static_cast<UpstreamFetch*>(track);
+  UpstreamFetch::UpstreamFetchTask* task = fetch->task();
+  if (task == nullptr) {
+    return;
+  }
+  if (task->HasObject() && !task->NeedsMorePayload()) {
+    return;
+  }
+  parser_.ReadAtMostOneObject();
+  // If it read an object, it called OnObjectMessage and may have altered the
+  // task's object state.
+  if (task->HasObject() && !task->NeedsMorePayload()) {
+    task->NotifyNewObject();
+  }
+}
+
+void MoqtSession::IncomingDataStream::OnCanRead() {
+  if (!parser_.stream_type().has_value()) {
+    parser_.ReadStreamType();
+    if (!parser_.stream_type().has_value()) {
+      return;
+    }
+  }
+  if (parser_.stream_type() != MoqtDataStreamType::kStreamHeaderFetch) {
+    parser_.ReadAllData();
+    return;
+  }
+  bool learned_track_alias = false;
+  if (!parser_.track_alias().has_value()) {
+    learned_track_alias = true;
+    parser_.ReadTrackAlias();
+    if (!parser_.track_alias().has_value()) {
+      return;
+    }
+  }
+  auto it = session_->upstream_by_id_.find(*parser_.track_alias());
+  if (it == session_->upstream_by_id_.end()) {
+    QUIC_DLOG(INFO) << ENDPOINT << "Received object for a track with no FETCH";
+    // This is a not a session error because there might be an UNSUBSCRIBE in
+    // flight.
+    stream_->SendStopSending(kResetCodeSubscriptionGone);
+    return;
+  }
+  if (it->second == nullptr) {
+    QUICHE_BUG(quiche_bug_moqt_fetch_pointer_is_null)
+        << "Fetch pointer is null";
+    return;
+  }
+  UpstreamFetch* fetch = static_cast<UpstreamFetch*>(it->second.get());
+  if (learned_track_alias) {
+    // If the task already exists (FETCH_OK has arrived), the callback will
+    // immediately execute to read the first object. Otherwise, it will only
+    // execute when the task is created or a cached object is read.
+    fetch->OnStreamOpened([this]() { MaybeReadOneObject(); });
+    return;
+  }
+  MaybeReadOneObject();
+}
 
 void MoqtSession::IncomingDataStream::OnControlMessageReceived() {
   session_->Error(MoqtError::kProtocolViolation,
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index adf4b7e..18c02e2 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -314,6 +314,7 @@
    public:
     IncomingDataStream(MoqtSession* session, webtransport::Stream* stream)
         : session_(session), stream_(stream), parser_(stream, this) {}
+    ~IncomingDataStream();
 
     // webtransport::StreamVisitor implementation.
     void OnCanRead() override;
@@ -335,6 +336,8 @@
 
     webtransport::Stream* stream() const { return stream_; }
 
+    void MaybeReadOneObject();
+
    private:
     friend class test::MoqtSessionPeer;
     void OnControlMessageReceived();
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index 172a152..ae2a3ca 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -8,6 +8,7 @@
 #include <cstring>
 #include <memory>
 #include <optional>
+#include <queue>
 #include <string>
 #include <utility>
 #include <vector>
@@ -19,17 +20,22 @@
 #include "quiche/quic/core/quic_data_reader.h"
 #include "quiche/quic/core/quic_time.h"
 #include "quiche/quic/core/quic_types.h"
+#include "quiche/quic/moqt/moqt_framer.h"
 #include "quiche/quic/moqt/moqt_known_track_publisher.h"
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/moqt_parser.h"
 #include "quiche/quic/moqt/moqt_priority.h"
 #include "quiche/quic/moqt/moqt_publisher.h"
+#include "quiche/quic/moqt/moqt_track.h"
 #include "quiche/quic/moqt/test_tools/moqt_framer_utils.h"
 #include "quiche/quic/moqt/test_tools/moqt_session_peer.h"
 #include "quiche/quic/moqt/tools/moqt_mock_visitor.h"
 #include "quiche/quic/platform/api/quic_test.h"
 #include "quiche/quic/test_tools/quic_test_utils.h"
+#include "quiche/common/quiche_buffer_allocator.h"
 #include "quiche/common/quiche_stream.h"
+#include "quiche/common/simple_buffer_allocator.h"
+#include "quiche/web_transport/test_tools/in_memory_stream.h"
 #include "quiche/web_transport/test_tools/mock_web_transport.h"
 #include "quiche/web_transport/web_transport.h"
 
@@ -2256,6 +2262,218 @@
   EXPECT_EQ(fetch_task->GetStatus().message(), "No username provided");
 }
 
+// The application takes objects as they arrive.
+TEST_F(MoqtSessionTest, IncomingFetchObjectsGreedyApp) {
+  webtransport::test::MockStream mock_stream;
+  std::unique_ptr<MoqtControlParserVisitor> stream_input =
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+  std::unique_ptr<MoqtFetchTask> fetch_task;
+  uint64_t expected_object_id = 0;
+  session_.Fetch(
+      FullTrackName("foo", "bar"),
+      [&](std::unique_ptr<MoqtFetchTask> task) {
+        fetch_task = std::move(task);
+        fetch_task->SetObjectAvailableCallback([&]() {
+          PublishedObject object;
+          MoqtFetchTask::GetNextObjectResult result;
+          do {
+            result = fetch_task->GetNextObject(object);
+            if (result == MoqtFetchTask::GetNextObjectResult::kSuccess) {
+              EXPECT_EQ(object.sequence.object, expected_object_id);
+              ++expected_object_id;
+            }
+          } while (result != MoqtFetchTask::GetNextObjectResult::kPending);
+        });
+      },
+      FullSequence(0, 0), 4, std::nullopt, 128, std::nullopt,
+      MoqtSubscribeParameters());
+  // Build queue of packets to arrive.
+  std::queue<quiche::QuicheBuffer> headers;
+  std::queue<std::string> payloads;
+  MoqtObject object = {
+      /*subscribe_id=*/0,
+      /*group_id, object_id=*/0,
+      0,
+      /*publisher_priority=*/128,
+      /*status=*/MoqtObjectStatus::kNormal,
+      /*subgroup=*/0,
+      /*payload_length=*/3,
+  };
+  MoqtFramer framer_(quiche::SimpleBufferAllocator::Get(), true);
+  for (int i = 0; i < 4; ++i) {
+    object.object_id = i;
+    headers.push(framer_.SerializeObjectHeader(
+        object, MoqtDataStreamType::kStreamHeaderFetch, i == 0));
+    payloads.push("foo");
+  }
+
+  // Open stream, deliver two objects before FETCH_OK. Neither should be read.
+  webtransport::test::InMemoryStream data_stream(kIncomingUniStreamId);
+  data_stream.SetVisitor(
+      MoqtSessionPeer::CreateIncomingStreamVisitor(&session_, &data_stream));
+  for (int i = 0; i < 2; ++i) {
+    data_stream.Receive(headers.front().AsStringView(), false);
+    data_stream.Receive(payloads.front(), false);
+    headers.pop();
+    payloads.pop();
+  }
+  EXPECT_EQ(fetch_task, nullptr);
+  EXPECT_GT(data_stream.ReadableBytes(), 0);
+
+  // FETCH_OK arrives, objects are delivered.
+  MoqtFetchOk ok = {
+      /*subscribe_id=*/0,
+      /*group_order=*/MoqtDeliveryOrder::kAscending,
+      /*largest_id=*/FullSequence(3, 25),
+      MoqtSubscribeParameters(),
+  };
+  stream_input->OnFetchOkMessage(ok);
+  ASSERT_NE(fetch_task, nullptr);
+  EXPECT_EQ(expected_object_id, 2);
+
+  // Deliver the rest of the objects.
+  for (int i = 2; i < 4; ++i) {
+    data_stream.Receive(headers.front().AsStringView(), false);
+    data_stream.Receive(payloads.front(), false);
+    headers.pop();
+    payloads.pop();
+  }
+  EXPECT_EQ(expected_object_id, 4);
+}
+
+TEST_F(MoqtSessionTest, IncomingFetchObjectsSlowApp) {
+  webtransport::test::MockStream mock_stream;
+  std::unique_ptr<MoqtControlParserVisitor> stream_input =
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+  std::unique_ptr<MoqtFetchTask> fetch_task;
+  uint64_t expected_object_id = 0;
+  bool objects_available = false;
+  session_.Fetch(
+      FullTrackName("foo", "bar"),
+      [&](std::unique_ptr<MoqtFetchTask> task) {
+        fetch_task = std::move(task);
+        fetch_task->SetObjectAvailableCallback(
+            [&]() { objects_available = true; });
+      },
+      FullSequence(0, 0), 4, std::nullopt, 128, std::nullopt,
+      MoqtSubscribeParameters());
+  // Build queue of packets to arrive.
+  std::queue<quiche::QuicheBuffer> headers;
+  std::queue<std::string> payloads;
+  MoqtObject object = {
+      /*subscribe_id=*/0,
+      /*group_id, object_id=*/0,
+      0,
+      /*publisher_priority=*/128,
+      /*status=*/MoqtObjectStatus::kNormal,
+      /*subgroup=*/0,
+      /*payload_length=*/3,
+  };
+  MoqtFramer framer_(quiche::SimpleBufferAllocator::Get(), true);
+  for (int i = 0; i < 4; ++i) {
+    object.object_id = i;
+    headers.push(framer_.SerializeObjectHeader(
+        object, MoqtDataStreamType::kStreamHeaderFetch, i == 0));
+    payloads.push("foo");
+  }
+
+  // Open stream, deliver two objects before FETCH_OK. Neither should be read.
+  webtransport::test::InMemoryStream data_stream(kIncomingUniStreamId);
+  data_stream.SetVisitor(
+      MoqtSessionPeer::CreateIncomingStreamVisitor(&session_, &data_stream));
+  for (int i = 0; i < 2; ++i) {
+    data_stream.Receive(headers.front().AsStringView(), false);
+    data_stream.Receive(payloads.front(), false);
+    headers.pop();
+    payloads.pop();
+  }
+  EXPECT_EQ(fetch_task, nullptr);
+  EXPECT_GT(data_stream.ReadableBytes(), 0);
+
+  // FETCH_OK arrives, objects are available.
+  MoqtFetchOk ok = {
+      /*subscribe_id=*/0,
+      /*group_order=*/MoqtDeliveryOrder::kAscending,
+      /*largest_id=*/FullSequence(3, 25),
+      MoqtSubscribeParameters(),
+  };
+  stream_input->OnFetchOkMessage(ok);
+  ASSERT_NE(fetch_task, nullptr);
+  EXPECT_TRUE(objects_available);
+
+  // Get the objects
+  MoqtFetchTask::GetNextObjectResult result;
+  do {
+    PublishedObject new_object;
+    result = fetch_task->GetNextObject(new_object);
+    if (result == MoqtFetchTask::GetNextObjectResult::kSuccess) {
+      EXPECT_EQ(new_object.sequence.object, expected_object_id);
+      ++expected_object_id;
+    }
+  } while (result != MoqtFetchTask::GetNextObjectResult::kPending);
+  EXPECT_EQ(expected_object_id, 2);
+  objects_available = false;
+
+  // Deliver the rest of the objects.
+  for (int i = 2; i < 4; ++i) {
+    data_stream.Receive(headers.front().AsStringView(), false);
+    data_stream.Receive(payloads.front(), false);
+    headers.pop();
+    payloads.pop();
+  }
+  EXPECT_TRUE(objects_available);
+  EXPECT_EQ(expected_object_id, 2);  // Not delivered yet.
+  // Get the objects
+  do {
+    PublishedObject new_object;
+    result = fetch_task->GetNextObject(new_object);
+    if (result == MoqtFetchTask::GetNextObjectResult::kSuccess) {
+      EXPECT_EQ(new_object.sequence.object, expected_object_id);
+      ++expected_object_id;
+    }
+  } while (result != MoqtFetchTask::GetNextObjectResult::kPending);
+  EXPECT_EQ(expected_object_id, 4);
+}
+
+TEST_F(MoqtSessionTest, PartialObjectFetch) {
+  MoqtSessionParameters parameters(quic::Perspective::IS_CLIENT);
+  parameters.deliver_partial_objects = true;
+  MoqtSession session(&mock_session_, parameters,
+                      session_callbacks_.AsSessionCallbacks());
+  webtransport::test::InMemoryStream stream(kIncomingUniStreamId);
+  std::unique_ptr<MoqtFetchTask> fetch_task =
+      MoqtSessionPeer::CreateUpstreamFetch(&session, &stream);
+  UpstreamFetch::UpstreamFetchTask* task =
+      static_cast<UpstreamFetch::UpstreamFetchTask*>(fetch_task.get());
+  ASSERT_NE(task, nullptr);
+  EXPECT_FALSE(task->HasObject());
+  bool object_ready = false;
+  task->SetObjectAvailableCallback([&]() { object_ready = true; });
+  MoqtObject object = {
+      /*subscribe_id=*/0,
+      /*group_id, object_id=*/0,
+      0,
+      /*publisher_priority=*/128,
+      /*status=*/MoqtObjectStatus::kNormal,
+      /*subgroup=*/0,
+      /*payload_length=*/6,
+  };
+  MoqtFramer framer_(quiche::SimpleBufferAllocator::Get(), true);
+  quiche::QuicheBuffer header = framer_.SerializeObjectHeader(
+      object, MoqtDataStreamType::kStreamHeaderFetch, true);
+  stream.Receive(header.AsStringView(), false);
+  EXPECT_FALSE(task->HasObject());
+  EXPECT_FALSE(object_ready);
+  stream.Receive("foo", false);
+  EXPECT_TRUE(task->HasObject());
+  EXPECT_TRUE(task->NeedsMorePayload());
+  EXPECT_FALSE(object_ready);
+  stream.Receive("bar", false);
+  EXPECT_TRUE(object_ready);
+  EXPECT_TRUE(task->HasObject());
+  EXPECT_FALSE(task->NeedsMorePayload());
+}
+
 // TODO: re-enable this test once this behavior is re-implemented.
 #if 0
 TEST_F(MoqtSessionTest, SubscribeUpdateClosesSubscription) {
diff --git a/quiche/quic/moqt/moqt_track.cc b/quiche/quic/moqt/moqt_track.cc
index 31980d1..d8e31a3 100644
--- a/quiche/quic/moqt/moqt_track.cc
+++ b/quiche/quic/moqt/moqt_track.cc
@@ -4,6 +4,7 @@
 
 #include "quiche/quic/moqt/moqt_track.h"
 
+#include <cstring>
 #include <memory>
 #include <optional>
 #include <utility>
@@ -12,7 +13,10 @@
 #include "absl/strings/string_view.h"
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/moqt_publisher.h"
-#include "quiche/common/platform/api/quiche_logging.h"
+#include "quiche/common/platform/api/quiche_bug_tracker.h"
+#include "quiche/common/platform/api/quiche_mem_slice.h"
+#include "quiche/common/quiche_buffer_allocator.h"
+#include "quiche/common/simple_buffer_allocator.h"
 #include "quiche/web_transport/web_transport.h"
 
 namespace moqt {
@@ -39,6 +43,18 @@
                                                   std::move(callback));
   task_ = task->weak_ptr();
   std::move(ok_callback_)(std::move(task));
+  if (can_read_callback_) {
+    task_.GetIfAvailable()->set_can_read_callback(
+        std::move(can_read_callback_));
+  }
+}
+
+void UpstreamFetch::OnStreamOpened(CanReadCallback can_read_callback) {
+  if (task_.IsValid()) {
+    task_.GetIfAvailable()->set_can_read_callback(std::move(can_read_callback));
+  } else {
+    can_read_callback_ = std::move(can_read_callback);
+  }
 }
 
 UpstreamFetch::UpstreamFetchTask::~UpstreamFetchTask() {
@@ -56,17 +72,47 @@
     if (eof_) {
       return kEof;
     }
+    need_object_available_callback_ = true;
     return kPending;
   }
-  output = *std::move(next_object_);
+  quiche::QuicheMemSlice message_slice(std::move(payload_));
+  output.sequence = FullSequence(next_object_->group_id,
+                                 next_object_->subgroup_id.value_or(0),
+                                 next_object_->object_id);
+  output.status = next_object_->object_status;
+  output.publisher_priority = next_object_->publisher_priority;
+  output.payload = std::move(message_slice);
+  output.fin_after_this = false;
   next_object_.reset();
+  can_read_callback_();
   return kSuccess;
 }
 
-void UpstreamFetch::UpstreamFetchTask::NewObject(PublishedObject& object) {
-  QUICHE_DCHECK(!next_object_.has_value());
-  next_object_ = std::move(object);
-  if (object_available_callback_) {
+void UpstreamFetch::UpstreamFetchTask::NewObject(const MoqtObject& message) {
+  next_object_ = message;
+  payload_ = quiche::QuicheBuffer(quiche::SimpleBufferAllocator::Get(),
+                                  message.payload_length);
+}
+
+void UpstreamFetch::UpstreamFetchTask::AppendPayloadToObject(
+    absl::string_view payload) {
+  QUICHE_BUG_IF(quic_bug_AppendPayloadToObjectCalledEarly,
+                !next_object_.has_value())
+      << "AppendPayloadToObject called without an object";
+  QUICHE_BUG_IF(quic_bug_AlreadyGotPayload, next_object_->payload_length == 0)
+      << "AppendPayloadToObject called after payload was already full";
+  // Copy |payload| to the right spot in the buffer.
+  memcpy(payload_.data() + payload_.size() - next_object_->payload_length,
+         payload.data(), payload.length());
+  next_object_->payload_length -= payload.length();
+}
+
+void UpstreamFetch::UpstreamFetchTask::NotifyNewObject() {
+  QUICHE_BUG_IF(quic_bug_NotifyNewObjectCalledEarly,
+                !next_object_.has_value() || next_object_->payload_length > 0)
+      << "NotifyNewObject called without a full object in store";
+  if (need_object_available_callback_ && object_available_callback_) {
+    need_object_available_callback_ = false;
     object_available_callback_();
   }
 }
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h
index fe158e1..6c8bab3 100644
--- a/quiche/quic/moqt/moqt_track.h
+++ b/quiche/quic/moqt/moqt_track.h
@@ -17,6 +17,7 @@
 #include "quiche/quic/moqt/moqt_priority.h"
 #include "quiche/quic/moqt/moqt_publisher.h"
 #include "quiche/quic/moqt/moqt_subscribe_windows.h"
+#include "quiche/common/quiche_buffer_allocator.h"
 #include "quiche/common/quiche_callbacks.h"
 #include "quiche/common/quiche_weak_ptr.h"
 #include "quiche/web_transport/web_transport.h"
@@ -195,13 +196,27 @@
       return weak_ptr_factory_.Create();
     }
 
-    // Manage the relationship with the data stream.
-    void OnStreamOpened(CanReadCallback callback) {
+    // MoqtSession should not use this function; use
+    // UpstreamFetch::OnStreamOpened() instead, in case the task does not exist
+    // yet.
+    void set_can_read_callback(CanReadCallback callback) {
       can_read_callback_ = std::move(callback);
+      can_read_callback_();  // Accept the first object.
     }
 
     // Called when the data stream receives a new object.
-    void NewObject(PublishedObject& object);
+    void NewObject(const MoqtObject& message);
+    void AppendPayloadToObject(absl::string_view payload);
+    // MoqtSession calls this for a hint if the object has been read.
+    bool HasObject() const { return next_object_.has_value(); }
+    bool NeedsMorePayload() const {
+      return next_object_.has_value() && next_object_->payload_length > 0;
+    }
+    // MoqtSession calls NotifyNewObject() after NewObject() because it has to
+    // exit the parser loop before the callback possibly causes another read.
+    // Furthermore, NewObject() may be a partial object, and so
+    // NotifyNewObject() is called only when the object is complete.
+    void NotifyNewObject();
 
     // Deletes callbacks to session or stream, updates the status. If |error|
     // has no value, will append an EOF to the object stream.
@@ -214,10 +229,22 @@
     absl::Status status_;
     TaskDestroyedCallback task_destroyed_callback_;
 
-    // Object delivery state.
-    std::optional<PublishedObject> next_object_;
+    // Object delivery state. The payload_length member is used to track the
+    // payload bytes not yet received. The application receives a
+    // PublishedObject that is constructed from next_object_ and payload_.
+    std::optional<MoqtObject> next_object_;
+    // Store payload separately. Will be converted into QuicheMemSlice only when
+    // complete, since QuicheMemSlice is immutable.
+    quiche::QuicheBuffer payload_;
+
+    // The task should only call object_available_callback_ when the last result
+    // was kPending. Otherwise, there can be recursive loops of
+    // GetNextObjectResult().
+    bool need_object_available_callback_ = true;
     bool eof_ = false;  // The next object is EOF.
+    // The Fetch task signals the application when it has new objects.
     ObjectsAvailableCallback object_available_callback_;
+    // The Fetch task signals the stream when it has dispensed of an object.
     CanReadCallback can_read_callback_;
 
     // Must be last.
@@ -230,9 +257,16 @@
 
   UpstreamFetchTask* task() { return task_.GetIfAvailable(); }
 
+  // Manage the relationship with the data stream.
+  void OnStreamOpened(CanReadCallback callback);
+
  private:
   quiche::QuicheWeakPtr<UpstreamFetchTask> task_;
 
+  // Before FetchTask is created, an incoming stream will register the callback
+  // here instead.
+  CanReadCallback can_read_callback_;
+
   // Initial values from Fetch() call.
   FetchResponseCallback ok_callback_;  // Will be destroyed on FETCH_OK.
 };
diff --git a/quiche/quic/moqt/moqt_track_test.cc b/quiche/quic/moqt/moqt_track_test.cc
index 8072535..ecf1ed3 100644
--- a/quiche/quic/moqt/moqt_track_test.cc
+++ b/quiche/quic/moqt/moqt_track_test.cc
@@ -149,15 +149,35 @@
   PublishedObject object;
   EXPECT_EQ(fetch_task_->GetNextObject(object),
             MoqtFetchTask::GetNextObjectResult::kPending);
-  PublishedObject new_object(FullSequence(3, 0),
-                             MoqtObjectStatus::kGroupDoesNotExist, 128,
-                             quiche::QuicheMemSlice(), false);
+  MoqtObject new_object = {1, 3, 0, 128, MoqtObjectStatus::kNormal, 0, 6};
+  bool got_object = false;
   fetch_task_->SetObjectAvailableCallback([&]() {
+    got_object = true;
     EXPECT_EQ(fetch_task_->GetNextObject(object),
               MoqtFetchTask::GetNextObjectResult::kSuccess);
-    EXPECT_EQ(object.sequence, new_object.sequence);
+    EXPECT_EQ(object.sequence, FullSequence(3, 0, 0));
+    EXPECT_EQ(object.payload.AsStringView(), "foobar");
   });
+  int got_read_callback = 0;
+  fetch_.OnStreamOpened([&]() { ++got_read_callback; });
+  EXPECT_FALSE(fetch_.task()->HasObject());
+  EXPECT_FALSE(fetch_.task()->NeedsMorePayload());
   fetch_.task()->NewObject(new_object);
+  EXPECT_TRUE(fetch_.task()->HasObject());
+  EXPECT_TRUE(fetch_.task()->NeedsMorePayload());
+  fetch_.task()->AppendPayloadToObject("foo");
+  EXPECT_TRUE(fetch_.task()->HasObject());
+  EXPECT_TRUE(fetch_.task()->NeedsMorePayload());
+  fetch_.task()->AppendPayloadToObject("bar");
+  EXPECT_TRUE(fetch_.task()->HasObject());
+  EXPECT_FALSE(fetch_.task()->NeedsMorePayload());
+  EXPECT_FALSE(got_object);
+  EXPECT_EQ(got_read_callback, 1);  // Call from OnStreamOpened().
+  fetch_.task()->NotifyNewObject();
+  EXPECT_FALSE(fetch_.task()->HasObject());
+  EXPECT_FALSE(fetch_.task()->NeedsMorePayload());
+  EXPECT_EQ(got_read_callback, 2);  // Call from GetNextObjectResult().
+  EXPECT_TRUE(got_object);
 }
 
 }  // namespace test
diff --git a/quiche/quic/moqt/test_tools/moqt_session_peer.h b/quiche/quic/moqt/test_tools/moqt_session_peer.h
index e867376..dc94e7b 100644
--- a/quiche/quic/moqt/test_tools/moqt_session_peer.h
+++ b/quiche/quic/moqt/test_tools/moqt_session_peer.h
@@ -10,6 +10,8 @@
 #include <optional>
 #include <utility>
 
+
+#include "absl/status/status.h"
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/moqt_parser.h"
 #include "quiche/quic/moqt/moqt_priority.h"
@@ -57,6 +59,14 @@
     return new_stream;
   }
 
+  static std::unique_ptr<webtransport::StreamVisitor>
+  CreateIncomingStreamVisitor(MoqtSession* session,
+                              webtransport::Stream* stream) {
+    auto new_stream =
+        std::make_unique<MoqtSession::IncomingDataStream>(session, stream);
+    return new_stream;
+  }
+
   // In the test OnSessionReady, the session creates a stream and then passes
   // its unique_ptr to the mock webtransport stream. This function casts
   // that unique_ptr into a MoqtSession::Stream*, which is a private class of
@@ -156,6 +166,43 @@
                                                  uint64_t subscribe_id) {
     return *session->published_subscriptions_[subscribe_id]->largest_sent();
   }
+
+  // Adds an upstream fetch and a stream ready to receive data.
+  static std::unique_ptr<MoqtFetchTask> CreateUpstreamFetch(
+      MoqtSession* session, webtransport::Stream* stream) {
+    MoqtFetch fetch_message = {
+        0,
+        FullTrackName{"foo", "bar"},
+        128,
+        std::nullopt,
+        FullSequence{0, 0},
+        4,
+        std::nullopt,
+        MoqtSubscribeParameters(),
+    };
+    std::unique_ptr<MoqtFetchTask> task;
+    auto [it, success] = session->upstream_by_id_.try_emplace(
+        0, std::make_unique<UpstreamFetch>(
+               fetch_message, [&](std::unique_ptr<MoqtFetchTask> fetch_task) {
+                 task = std::move(fetch_task);
+               }));
+    QUICHE_DCHECK(success);
+    UpstreamFetch* fetch = static_cast<UpstreamFetch*>(it->second.get());
+    // Initialize the fetch task
+    fetch->OnFetchResult(
+        FullSequence{4, 10}, absl::OkStatus(),
+        [=, session_ptr = session, fetch_id = fetch_message.subscribe_id]() {
+          session_ptr->CancelFetch(fetch_id);
+        });
+    ;
+    auto mock_session =
+        static_cast<webtransport::test::MockSession*>(session->session());
+    EXPECT_CALL(*mock_session, AcceptIncomingUnidirectionalStream())
+        .WillOnce(testing::Return(stream))
+        .WillOnce(testing::Return(nullptr));
+    session->OnIncomingUnidirectionalStreamAvailable();
+    return task;
+  }
 };
 
 }  // namespace moqt::test