Make incoming FETCH asynchronous to support going further upstream for objects.

Today, when FETCH arrives, MoQT expects the application to tell it immediately if the data is available. Receiving MoqtFetchTask from the application is a signal that it is safe to send FETCH_OK.

Change MoqtFetchTask to have both a FetchOkCallback and a FetchErrorCallback to facilitate delayed notification as the result of an upstream FETCH.

This CL does not change the current outgoing FETCH model where MoQT does not deliver MoqtFetchTask to the calling application until FETCH_OK arrives. This is deferred to a later CL. It currently "works" but there ought to be a consistent API.

PiperOrigin-RevId: 754128549
diff --git a/quiche/quic/moqt/moqt_failed_fetch.h b/quiche/quic/moqt/moqt_failed_fetch.h
index e825b82..fd13a52 100644
--- a/quiche/quic/moqt/moqt_failed_fetch.h
+++ b/quiche/quic/moqt/moqt_failed_fetch.h
@@ -24,7 +24,13 @@
   absl::Status GetStatus() override { return status_; }
   void SetObjectAvailableCallback(
       ObjectsAvailableCallback /*callback*/) override {}
-  Location GetLargestId() const override { return Location(); }
+  void SetFetchResponseCallback(FetchResponseCallback callback) {
+    MoqtFetchError error;
+    error.subscribe_id = 0;
+    error.error_code = StatusToSubscribeErrorCode(status_);
+    error.reason_phrase = status_.message();
+    std::move(callback)(error);
+  }
 
  private:
   absl::Status status_;
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index 73a67d6..2f785f6 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -416,7 +416,6 @@
   EXPECT_TRUE(success);
 
   EXPECT_TRUE(fetch->GetStatus().ok());
-  EXPECT_EQ(fetch->GetLargestId(), Location(99, 0));
   MoqtFetchTask::GetNextObjectResult result;
   PublishedObject object;
   Location expected{97, 0};
diff --git a/quiche/quic/moqt/moqt_messages.cc b/quiche/quic/moqt/moqt_messages.cc
index eba04a3..516c62a 100644
--- a/quiche/quic/moqt/moqt_messages.cc
+++ b/quiche/quic/moqt/moqt_messages.cc
@@ -92,6 +92,25 @@
   return static_cast<MoqtObjectStatus>(integer);
 }
 
+SubscribeErrorCode StatusToSubscribeErrorCode(absl::Status status) {
+  QUICHE_DCHECK(!status.ok());
+  switch (status.code()) {
+    case absl::StatusCode::kPermissionDenied:
+    case absl::StatusCode::kUnauthenticated:
+      return SubscribeErrorCode::kUnauthorized;
+    case absl::StatusCode::kDeadlineExceeded:
+      return SubscribeErrorCode::kTimeout;
+    case absl::StatusCode::kUnavailable:
+      return SubscribeErrorCode::kNotSupported;
+    case absl::StatusCode::kNotFound:
+      return SubscribeErrorCode::kDoesNotExist;
+    case absl::StatusCode::kOutOfRange:
+      return SubscribeErrorCode::kInvalidRange;
+    default:
+      return SubscribeErrorCode::kInternalError;
+  }
+}
+
 MoqtFilterType GetFilterType(const MoqtSubscribe& message) {
   if (message.start.has_value()) {
     if (message.end_group.has_value()) {
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index 41809c7..77d916e 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -230,6 +230,8 @@
 };
 using MoqtAnnounceErrorReason = MoqtSubscribeErrorReason;
 
+SubscribeErrorCode StatusToSubscribeErrorCode(absl::Status status);
+
 // Full track name represents a tuple of name elements. All higher order
 // elements MUST be present, but lower-order ones (like the name) can be
 // omitted.
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.h b/quiche/quic/moqt/moqt_outgoing_queue.h
index 3d3c7f0..af480ca 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue.h
+++ b/quiche/quic/moqt/moqt_outgoing_queue.h
@@ -110,12 +110,36 @@
 
     GetNextObjectResult GetNextObject(PublishedObject&) override;
     absl::Status GetStatus() override { return status_; }
-    Location GetLargestId() const override { return objects_.back(); }
 
     void SetObjectAvailableCallback(
-        ObjectsAvailableCallback /*callback*/) override {
+        ObjectsAvailableCallback callback) override {
       // Not needed since all objects in a fetch against an in-memory queue are
       // guaranteed to resolve immediately.
+      callback();
+    }
+    void SetFetchResponseCallback(FetchResponseCallback callback) override {
+      if (!status_.ok()) {
+        MoqtFetchError error(0, StatusToSubscribeErrorCode(status_),
+                             std::string(status_.message()));
+        error.error_code = StatusToSubscribeErrorCode(status_);
+        error.reason_phrase = status_.message();
+        std::move(callback)(error);
+        return;
+      }
+      if (objects_.empty()) {
+        MoqtFetchError error(0, StatusToSubscribeErrorCode(status_),
+                             "No objects in range");
+        std::move(callback)(error);
+        return;
+      }
+      MoqtFetchOk ok;
+      ok.group_order = MoqtDeliveryOrder::kAscending;
+      ok.largest_id = *(objects_.crbegin());
+      if (objects_.size() > 1 && *(objects_.cbegin()) > ok.largest_id) {
+        ok.group_order = MoqtDeliveryOrder::kDescending;
+        ok.largest_id = *(objects_.cbegin());
+      }
+      std::move(callback)(ok);
     }
 
    private:
diff --git a/quiche/quic/moqt/moqt_publisher.h b/quiche/quic/moqt/moqt_publisher.h
index b70dfc9..77d85d7 100644
--- a/quiche/quic/moqt/moqt_publisher.h
+++ b/quiche/quic/moqt/moqt_publisher.h
@@ -8,6 +8,7 @@
 #include <cstdint>
 #include <memory>
 #include <optional>
+#include <variant>
 #include <vector>
 
 #include "absl/status/status.h"
@@ -75,6 +76,10 @@
 class MoqtFetchTask {
  public:
   using ObjectsAvailableCallback = quiche::MultiUseCallback<void()>;
+  // If the fields are not correct (e.g. end_of_track is less than start) it
+  // will result in QUICHE_BUG. The request_id field will be ignored.
+  using FetchResponseCallback = quiche::SingleUseCallback<void(
+      std::variant<MoqtFetchOk, MoqtFetchError>)>;
 
   virtual ~MoqtFetchTask() = default;
 
@@ -91,7 +96,8 @@
     kError,
   };
 
-  // Returns the next object received via the fetch, if available.
+  // Returns the next object received via the fetch, if available. MUST NOT
+  // return an object with status kObjectDoesNotExist.
   virtual GetNextObjectResult GetNextObject(PublishedObject& output) = 0;
 
   // Sets the callback that is called when GetNextObject() has previously
@@ -99,15 +105,18 @@
   // end-of-fetch) is available. The application is responsible for calling
   // GetNextObject() until it gets kPending; no further callback will occur
   // until then.
+  // If an object is available immediately, the callback will be called
+  // immediately.
   virtual void SetObjectAvailableCallback(
       ObjectsAvailableCallback callback) = 0;
+  // One of these callbacks is called as soon as the data publisher has enough
+  // information for either FETCH_OK or FETCH_ERROR.
+  // If the appropriate response is already available, the callback will be
+  // called immediately.
+  virtual void SetFetchResponseCallback(FetchResponseCallback callback) = 0;
 
   // Returns the error if fetch has completely failed, and OK otherwise.
   virtual absl::Status GetStatus() = 0;
-
-  // Returns the highest sequence number that will be delivered by the fetch.
-  // It is the minimum of the end of the fetch range and the live edge.
-  virtual Location GetLargestId() const = 0;
 };
 
 // MoqtTrackPublisher is an application-side API for an MoQT publisher
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 967d81a..5fba8a1 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -11,6 +11,7 @@
 #include <optional>
 #include <string>
 #include <utility>
+#include <variant>
 #include <vector>
 
 
@@ -504,6 +505,8 @@
       case MoqtFetchTask::GetNextObjectResult::kSuccess:
         // Skip ObjectDoesNotExist in FETCH.
         if (object.status == MoqtObjectStatus::kObjectDoesNotExist) {
+          QUIC_BUG(quic_bug_got_doesnotexist_in_fetch)
+              << "Got ObjectDoesNotExist in FETCH";
           continue;
         }
         if (fetch->session_->WriteObjectToStream(
@@ -686,13 +689,14 @@
     return false;
   }
   fetch->SetStreamId(new_stream->GetStreamId());
-  new_stream->SetVisitor(
-      std::make_unique<PublishedFetch::FetchStreamVisitor>(fetch, new_stream));
-  if (new_stream->CanWrite()) {
-    new_stream->visitor()->OnCanWrite();
-  }
   new_stream->SetPriority(webtransport::StreamPriority{
       /*send_group_id=*/kMoqtSendGroupId, send_order});
+  // The line below will lead to updating ObjectsAvailableCallback in the
+  // FetchTask to call OnCanWrite() on the stream. If there is an object
+  // available, the callback will be invoked synchronously (i.e. before
+  // SetVisitor() returns).
+  new_stream->SetVisitor(
+      std::make_unique<PublishedFetch::FetchStreamVisitor>(fetch, new_stream));
   return true;
 }
 
@@ -1332,21 +1336,54 @@
                     << " could not be added to the session";
     SendFetchError(message.fetch_id, SubscribeErrorCode::kInternalError,
                    "Could not initialize FETCH state");
-    return;
   }
-  MoqtFetchOk fetch_ok;
-  fetch_ok.subscribe_id = message.fetch_id;
-  fetch_ok.group_order =
-      message.group_order.value_or((*track_publisher)->GetDeliveryOrder());
-  fetch_ok.largest_id = result.first->second->fetch_task()->GetLargestId();
-  SendOrBufferMessage(session_->framer_.SerializeFetchOk(fetch_ok));
-  webtransport::SendOrder send_order =
-      SendOrderForFetch(message.subscriber_priority);
-  if (!session_->session()->CanOpenNextOutgoingUnidirectionalStream() ||
-      !session_->OpenDataStream(result.first->second, send_order)) {
-    // Put the FETCH in the queue for a new stream.
-    session_->UpdateQueuedSendOrder(message.fetch_id, std::nullopt, send_order);
-  }
+  MoqtFetchTask* fetch_task = result.first->second->fetch_task();
+  fetch_task->SetFetchResponseCallback(
+      [this, request_id = message.fetch_id, fetch_start = start_object,
+       fetch_end = Location(end_group, end_object.value_or(UINT64_MAX))](
+          std::variant<MoqtFetchOk, MoqtFetchError> message) {
+        if (!session_->incoming_fetches_.contains(request_id)) {
+          return;  // FETCH was cancelled.
+        }
+        if (std::holds_alternative<MoqtFetchOk>(message)) {
+          MoqtFetchOk& fetch_ok = std::get<MoqtFetchOk>(message);
+          fetch_ok.subscribe_id = request_id;
+          if (fetch_ok.largest_id < fetch_start ||
+              fetch_ok.largest_id > fetch_end) {
+            // TODO(martinduke): Add end_of_track to fetch_ok and check it's
+            // larger than largest_id.
+            QUIC_BUG(quic_bug_fetch_ok_status_error)
+                << "FETCH_OK end or end_of_track is invalid";
+            session_->Error(MoqtError::kInternalError, "FETCH_OK status error");
+            return;
+          }
+          SendOrBufferMessage(session_->framer_.SerializeFetchOk(fetch_ok));
+          return;
+        }
+        MoqtFetchError& fetch_error = std::get<MoqtFetchError>(message);
+        fetch_error.subscribe_id = request_id;
+        SendOrBufferMessage(session_->framer_.SerializeFetchError(fetch_error));
+      });
+  // Set a temporary new-object callback that creates a data stream. When
+  // created, the stream visitor will replace this callback.
+  fetch_task->SetObjectAvailableCallback(
+      [this, send_order = SendOrderForFetch(message.subscriber_priority),
+       request_id = message.fetch_id]() {
+        auto it = session_->incoming_fetches_.find(request_id);
+        if (it == session_->incoming_fetches_.end()) {
+          return;
+        }
+        if (!session_->session()->CanOpenNextOutgoingUnidirectionalStream() ||
+            !session_->OpenDataStream(it->second, send_order)) {
+          if (!session_->subscribes_with_queued_outgoing_data_streams_.contains(
+                  SubscriptionWithQueuedStream(request_id, send_order))) {
+            // Put the FETCH in the queue for a new stream unless it has already
+            // done so.
+            session_->UpdateQueuedSendOrder(request_id, std::nullopt,
+                                            send_order);
+          }
+        }
+      });
 }
 
 void MoqtSession::ControlStream::OnFetchOkMessage(const MoqtFetchOk& message) {
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index efe079e..2513404 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -33,6 +33,7 @@
 #include "quiche/quic/moqt/tools/moqt_mock_visitor.h"
 #include "quiche/quic/platform/api/quic_test.h"
 #include "quiche/quic/test_tools/quic_test_utils.h"
+#include "quiche/common/platform/api/quiche_mem_slice.h"
 #include "quiche/common/quiche_buffer_allocator.h"
 #include "quiche/common/quiche_stream.h"
 #include "quiche/common/simple_buffer_allocator.h"
@@ -2023,85 +2024,198 @@
   session_.OnCanCreateNewOutgoingUnidirectionalStream();
 }
 
-TEST_F(MoqtSessionTest, FetchReturnsOk) {
-  std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
-  MoqtFetch fetch = DefaultFetch();
-  MockTrackPublisher* track = CreateTrackPublisher();
-  SetLargestId(track, Location(0, 0));
-
-  auto fetch_task_ptr = std::make_unique<MockFetchTask>();
-  EXPECT_CALL(*track, Fetch).WillOnce(Return(std::move(fetch_task_ptr)));
-  // Compose and send the FETCH_OK.
-  EXPECT_CALL(mock_stream_,
-              Writev(ControlMessageOfType(MoqtMessageType::kFetchOk), _));
-  // Stream can't open yet.
-  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream)
-      .WillOnce(Return(false));
-  stream_input->OnFetchMessage(fetch);
-}
-
-TEST_F(MoqtSessionTest, FetchReturnsOkImmediateOpen) {
-  webtransport::test::MockStream control_stream;
-  std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &control_stream);
-  MoqtFetch fetch = DefaultFetch();
-  MockTrackPublisher* track = CreateTrackPublisher();
-  SetLargestId(track, Location(0, 0));
-
-  auto fetch_task_ptr = std::make_unique<MockFetchTask>();
-  MockFetchTask* fetch_task = fetch_task_ptr.get();
-  EXPECT_CALL(*track, Fetch(_, _, _, _))
-      .WillOnce(Return(std::move(fetch_task_ptr)));
-  // Compose and send the FETCH_OK.
-  EXPECT_CALL(control_stream,
-              Writev(ControlMessageOfType(MoqtMessageType::kFetchOk), _));
-  // Open stream immediately.
-  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream)
+// Helper functions to handle the many EXPECT_CALLs for FETCH processing and
+// delivery.
+namespace {
+// Handles all the mock calls for the first object available for a FETCH.
+void ExpectStreamOpen(
+    webtransport::test::MockSession& session, MockFetchTask* fetch_task,
+    webtransport::test::MockStream& data_stream,
+    std::unique_ptr<webtransport::StreamVisitor>& stream_visitor) {
+  EXPECT_CALL(session, CanOpenNextOutgoingUnidirectionalStream)
       .WillOnce(Return(true));
-  webtransport::test::MockStream data_stream;
-  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
+  EXPECT_CALL(session, OpenOutgoingUnidirectionalStream())
       .WillOnce(Return(&data_stream));
-  std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
-  EXPECT_CALL(data_stream, SetVisitor(_))
+  EXPECT_CALL(data_stream, SetVisitor)
       .WillOnce(
           Invoke([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
             stream_visitor = std::move(visitor);
           }));
-  EXPECT_CALL(data_stream, CanWrite()).WillRepeatedly(Return(true));
-  EXPECT_CALL(data_stream, visitor()).WillOnce(Invoke([&]() {
-    return stream_visitor.get();
-  }));
-  EXPECT_CALL(data_stream, SetPriority(_)).Times(1);
-  EXPECT_CALL(*fetch_task, GetNextObject(_))
-      .WillOnce(Return(MoqtFetchTask::GetNextObjectResult::kPending));
-  stream_input->OnFetchMessage(fetch);
+  EXPECT_CALL(data_stream, SetPriority).Times(1);
+}
 
-  // Signal the stream that pending object is now available.
-  EXPECT_CALL(data_stream, CanWrite()).WillRepeatedly(Return(true));
-  EXPECT_CALL(*fetch_task, GetNextObject(_))
-      .WillOnce(Invoke([](PublishedObject& output) {
-        output.sequence = Location(0, 0, 0);
-        output.status = MoqtObjectStatus::kNormal;
+// Sets expectations to send one object at the start of the stream, and then
+// return a different status on the second GetNextObject call. |second_result|
+// cannot be kSuccess.
+void ExpectSendObject(MockFetchTask* fetch_task,
+                      webtransport::test::MockStream& data_stream,
+                      MoqtObjectStatus status, Location location,
+                      absl::string_view payload,
+                      MoqtFetchTask::GetNextObjectResult second_result) {
+  // Nothing is sent for status = kObjectDoesNotExist. Do not use this function.
+  QUICHE_DCHECK(status != MoqtObjectStatus::kObjectDoesNotExist);
+  QUICHE_DCHECK(second_result != MoqtFetchTask::GetNextObjectResult::kSuccess);
+  EXPECT_CALL(data_stream, CanWrite).WillRepeatedly(Return(true));
+  EXPECT_CALL(*fetch_task, GetNextObject)
+      .WillOnce(Invoke([=](PublishedObject& output) {
+        output.sequence = location;
+        output.status = status;
         output.publisher_priority = 128;
-        output.payload = MemSliceFromString("foo");
-        output.fin_after_this = true;
+        output.payload = quiche::QuicheMemSlice::Copy(payload);
+        output.fin_after_this = true;  // should be ignored.
         return MoqtFetchTask::GetNextObjectResult::kSuccess;
       }))
-      .WillOnce(Invoke([](PublishedObject& /*output*/) {
-        return MoqtFetchTask::GetNextObjectResult::kPending;
-      }));
-  EXPECT_CALL(data_stream, Writev(_, _))
-      .WillOnce([&](absl::Span<const absl::string_view> data,
-                    const quiche::StreamWriteOptions& options) {
+      .WillOnce(
+          Invoke([=](PublishedObject& /*output*/) { return second_result; }));
+  if (second_result == MoqtFetchTask::GetNextObjectResult::kEof) {
+    EXPECT_CALL(data_stream, Writev)
+        .WillOnce(Invoke([](absl::Span<const absl::string_view> data,
+                            const quiche::StreamWriteOptions& options) {
+          quic::QuicDataReader reader(data[0]);
+          uint64_t type;
+          EXPECT_TRUE(reader.ReadVarInt62(&type));
+          EXPECT_EQ(type, static_cast<uint64_t>(
+                              MoqtDataStreamType::kStreamHeaderFetch));
+          EXPECT_FALSE(options.send_fin());  // fin_after_this is ignored.
+          return absl::OkStatus();
+        }))
+        .WillOnce(Invoke([](absl::Span<const absl::string_view> data,
+                            const quiche::StreamWriteOptions& options) {
+          EXPECT_TRUE(data.empty());
+          EXPECT_TRUE(options.send_fin());
+          return absl::OkStatus();
+        }));
+    return;
+  }
+  EXPECT_CALL(data_stream, Writev)
+      .WillOnce(Invoke([](absl::Span<const absl::string_view> data,
+                          const quiche::StreamWriteOptions& options) {
         quic::QuicDataReader reader(data[0]);
         uint64_t type;
         EXPECT_TRUE(reader.ReadVarInt62(&type));
         EXPECT_EQ(type, static_cast<uint64_t>(
                             MoqtDataStreamType::kStreamHeaderFetch));
+        EXPECT_FALSE(options.send_fin());  // fin_after_this is ignored.
         return absl::OkStatus();
-      });
-  fetch_task->objects_available_callback()();
+      }));
+  if (second_result == MoqtFetchTask::GetNextObjectResult::kError) {
+    EXPECT_CALL(data_stream, ResetWithUserCode);
+  }
+}
+}  // namespace
+
+// All callbacks are called asynchronously.
+TEST_F(MoqtSessionTest, ProcessFetchGetEverythingFromUpstream) {
+  std::unique_ptr<MoqtControlParserVisitor> stream_input =
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
+  MoqtFetch fetch = DefaultFetch();
+  MockTrackPublisher* track = CreateTrackPublisher();
+
+  // No callbacks are synchronous. MockFetchTask will store the callbacks.
+  auto fetch_task_ptr = std::make_unique<MockFetchTask>();
+  MockFetchTask* fetch_task = fetch_task_ptr.get();
+  EXPECT_CALL(*track, Fetch).WillOnce(Return(std::move(fetch_task_ptr)));
+  stream_input->OnFetchMessage(fetch);
+
+  // Compose and send the FETCH_OK.
+  MoqtFetchOk expected_ok;
+  expected_ok.subscribe_id = fetch.fetch_id;
+  expected_ok.group_order = MoqtDeliveryOrder::kAscending;
+  expected_ok.largest_id = Location(1, 4);
+  EXPECT_CALL(mock_stream_, Writev(SerializedControlMessage(expected_ok), _));
+  fetch_task->CallFetchResponseCallback(expected_ok);
+  // Data arrives.
+  webtransport::test::MockStream data_stream;
+  std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
+  ExpectStreamOpen(mock_session_, fetch_task, data_stream, stream_visitor);
+  ExpectSendObject(fetch_task, data_stream, MoqtObjectStatus::kNormal,
+                   Location(0, 0), "foo",
+                   MoqtFetchTask::GetNextObjectResult::kPending);
+  fetch_task->CallObjectsAvailableCallback();
+}
+
+// All callbacks are called synchronously. All relevant data is cached (or this
+// is the original publisher).
+TEST_F(MoqtSessionTest, ProcessFetchWholeRangeIsPresent) {
+  std::unique_ptr<MoqtControlParserVisitor> stream_input =
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
+  MoqtFetch fetch = DefaultFetch();
+  MockTrackPublisher* track = CreateTrackPublisher();
+
+  MoqtFetchOk expected_ok;
+  expected_ok.subscribe_id = fetch.fetch_id;
+  expected_ok.group_order = MoqtDeliveryOrder::kAscending;
+  expected_ok.largest_id = Location(1, 4);
+  auto fetch_task_ptr =
+      std::make_unique<MockFetchTask>(expected_ok, std::nullopt, true);
+  MockFetchTask* fetch_task = fetch_task_ptr.get();
+  EXPECT_CALL(*track, Fetch).WillOnce(Return(std::move(fetch_task_ptr)));
+  EXPECT_CALL(mock_stream_, Writev(SerializedControlMessage(expected_ok), _));
+  webtransport::test::MockStream data_stream;
+  std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
+  ExpectStreamOpen(mock_session_, fetch_task, data_stream, stream_visitor);
+  ExpectSendObject(fetch_task, data_stream, MoqtObjectStatus::kNormal,
+                   Location(0, 0), "foo",
+                   MoqtFetchTask::GetNextObjectResult::kPending);
+  // Everything spins upon message receipt. FetchTask is generating the
+  // necessary callbacks.
+  stream_input->OnFetchMessage(fetch);
+}
+
+// The publisher has the first object locally, but has to go upstream to get
+// the rest.
+TEST_F(MoqtSessionTest, FetchReturnsObjectBeforeOk) {
+  std::unique_ptr<MoqtControlParserVisitor> stream_input =
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
+  MoqtFetch fetch = DefaultFetch();
+  MockTrackPublisher* track = CreateTrackPublisher();
+
+  // Object returns synchronously.
+  auto fetch_task_ptr =
+      std::make_unique<MockFetchTask>(std::nullopt, std::nullopt, true);
+  MockFetchTask* fetch_task = fetch_task_ptr.get();
+  EXPECT_CALL(*track, Fetch).WillOnce(Return(std::move(fetch_task_ptr)));
+  webtransport::test::MockStream data_stream;
+  std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
+  ExpectStreamOpen(mock_session_, fetch_task, data_stream, stream_visitor);
+  ExpectSendObject(fetch_task, data_stream, MoqtObjectStatus::kNormal,
+                   Location(0, 0), "foo",
+                   MoqtFetchTask::GetNextObjectResult::kPending);
+  stream_input->OnFetchMessage(fetch);
+
+  MoqtFetchOk expected_ok;
+  expected_ok.subscribe_id = fetch.fetch_id;
+  expected_ok.group_order = MoqtDeliveryOrder::kAscending;
+  expected_ok.largest_id = Location(1, 4);
+  EXPECT_CALL(mock_stream_, Writev(SerializedControlMessage(expected_ok), _));
+  fetch_task->CallFetchResponseCallback(expected_ok);
+}
+
+TEST_F(MoqtSessionTest, FetchReturnsObjectBeforeError) {
+  std::unique_ptr<MoqtControlParserVisitor> stream_input =
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
+  MoqtFetch fetch = DefaultFetch();
+  MockTrackPublisher* track = CreateTrackPublisher();
+
+  auto fetch_task_ptr =
+      std::make_unique<MockFetchTask>(std::nullopt, std::nullopt, true);
+  MockFetchTask* fetch_task = fetch_task_ptr.get();
+  EXPECT_CALL(*track, Fetch).WillOnce(Return(std::move(fetch_task_ptr)));
+  webtransport::test::MockStream data_stream;
+  std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
+  ExpectStreamOpen(mock_session_, fetch_task, data_stream, stream_visitor);
+  ExpectSendObject(fetch_task, data_stream, MoqtObjectStatus::kNormal,
+                   Location(0, 0), "foo",
+                   MoqtFetchTask::GetNextObjectResult::kPending);
+  stream_input->OnFetchMessage(fetch);
+
+  MoqtFetchError expected_error;
+  expected_error.subscribe_id = fetch.fetch_id;
+  expected_error.error_code = SubscribeErrorCode::kDoesNotExist;
+  expected_error.reason_phrase = "foo";
+  EXPECT_CALL(mock_stream_,
+              Writev(SerializedControlMessage(expected_error), _));
+  fetch_task->CallFetchResponseCallback(expected_error);
 }
 
 TEST_F(MoqtSessionTest, InvalidFetch) {
@@ -2135,130 +2249,33 @@
   stream_input->OnFetchMessage(fetch);
 }
 
-TEST_F(MoqtSessionTest, FetchDelivery) {
-  constexpr uint64_t kFetchId = 0;
-  MockFetchTask* fetch = MoqtSessionPeer::AddFetch(&session_, kFetchId);
-  // Stream creation started out blocked. Allow its creation, but data is
-  // blocked.
-  webtransport::test::MockStream data_stream;
+TEST_F(MoqtSessionTest, FullFetchDeliveryWithFlowControl) {
+  std::unique_ptr<MoqtControlParserVisitor> stream_input =
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
+  MoqtFetch fetch = DefaultFetch();
+  MockTrackPublisher* track = CreateTrackPublisher();
+
+  auto fetch_task_ptr =
+      std::make_unique<MockFetchTask>(std::nullopt, std::nullopt, true);
+  MockFetchTask* fetch_task = fetch_task_ptr.get();
+  EXPECT_CALL(*track, Fetch).WillOnce(Return(std::move(fetch_task_ptr)));
+
+  stream_input->OnFetchMessage(fetch);
   EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
-      .WillRepeatedly(Return(true));
-  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
-      .WillOnce(Return(&data_stream));
-  std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
-  EXPECT_CALL(data_stream, GetStreamId()).WillOnce(Return(4));
-  EXPECT_CALL(data_stream, SetVisitor(_))
-      .WillOnce(
-          Invoke([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
-            stream_visitor = std::move(visitor);
-          }));
-  EXPECT_CALL(data_stream, CanWrite()).WillOnce(Return(false));
-  EXPECT_CALL(data_stream, SetPriority(_)).Times(1);
-  session_.OnCanCreateNewOutgoingUnidirectionalStream();
-  // Unblock the stream. Provide one object and an EOF.
-  EXPECT_CALL(data_stream, CanWrite()).WillRepeatedly(Return(true));
-  EXPECT_CALL(*fetch, GetNextObject(_))
-      .WillOnce(Invoke([](PublishedObject& output) {
-        output.sequence = Location(0, 0, 0);
-        output.status = MoqtObjectStatus::kNormal;
-        output.publisher_priority = 128;
-        output.payload = MemSliceFromString("foo");
-        output.fin_after_this = true;
-        return MoqtFetchTask::GetNextObjectResult::kSuccess;
-      }))
-      .WillOnce(Invoke([](PublishedObject& /*output*/) {
-        return MoqtFetchTask::GetNextObjectResult::kEof;
-      }));
+      .WillOnce(Return(false));
+  fetch_task->CallObjectsAvailableCallback();
 
-  int objects_received = 0;
-  EXPECT_CALL(data_stream, Writev(_, _))
-      .WillOnce(Invoke([&](absl::Span<const absl::string_view> data,
-                           const quiche::StreamWriteOptions& options) {
-        ++objects_received;
-        quic::QuicDataReader reader(data[0]);
-        uint64_t type;
-        EXPECT_TRUE(reader.ReadVarInt62(&type));
-        EXPECT_EQ(type, static_cast<uint64_t>(
-                            MoqtDataStreamType::kStreamHeaderFetch));
-        EXPECT_FALSE(options.send_fin());  // fin_after_this is ignored.
-        return absl::OkStatus();
-      }))
-      .WillOnce(Invoke([&](absl::Span<const absl::string_view> data,
-                           const quiche::StreamWriteOptions& options) {
-        ++objects_received;
-        EXPECT_TRUE(data.empty());
-        EXPECT_TRUE(options.send_fin());
-        return absl::OkStatus();
-      }));
-  stream_visitor->OnCanWrite();
-  EXPECT_EQ(objects_received, 2);
-}
-
-TEST_F(MoqtSessionTest, FetchNonNormalObjects) {
-  constexpr uint64_t kFetchId = 0;
-  MockFetchTask* fetch = MoqtSessionPeer::AddFetch(&session_, kFetchId);
-  // Stream creation started out blocked. Allow its creation, but data is
-  // blocked.
+  // Stream opens, but with no credit.
   webtransport::test::MockStream data_stream;
-  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
-      .WillRepeatedly(Return(true));
-  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
-      .WillOnce(Return(&data_stream));
   std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
-  EXPECT_CALL(data_stream, SetVisitor(_))
-      .WillOnce(
-          Invoke([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
-            stream_visitor = std::move(visitor);
-          }));
+  ExpectStreamOpen(mock_session_, fetch_task, data_stream, stream_visitor);
   EXPECT_CALL(data_stream, CanWrite()).WillOnce(Return(false));
-  EXPECT_CALL(data_stream, SetPriority(_)).Times(1);
   session_.OnCanCreateNewOutgoingUnidirectionalStream();
-  // Unblock the stream. Provide one object and an EOF.
-  EXPECT_CALL(data_stream, CanWrite()).WillRepeatedly(Return(true));
-  EXPECT_CALL(*fetch, GetNextObject(_))
-      .WillOnce(Invoke([](PublishedObject& output) {
-        // DoesNotExist will be skipped.
-        output.sequence = Location(0, 0, 0);
-        output.status = MoqtObjectStatus::kObjectDoesNotExist;
-        output.publisher_priority = 128;
-        output.payload = MemSliceFromString("");
-        output.fin_after_this = true;
-        return MoqtFetchTask::GetNextObjectResult::kSuccess;
-      }))
-      .WillOnce(Invoke([](PublishedObject& output) {
-        output.sequence = Location(0, 0, 1);
-        output.status = MoqtObjectStatus::kEndOfGroup;
-        output.publisher_priority = 128;
-        output.payload = MemSliceFromString("");
-        output.fin_after_this = true;
-        return MoqtFetchTask::GetNextObjectResult::kSuccess;
-      }))
-      .WillOnce(Invoke([](PublishedObject& /*output*/) {
-        return MoqtFetchTask::GetNextObjectResult::kEof;
-      }));
-
-  int objects_received = 0;
-  EXPECT_CALL(data_stream, Writev(_, _))
-      .WillOnce(Invoke([&](absl::Span<const absl::string_view> data,
-                           const quiche::StreamWriteOptions& options) {
-        ++objects_received;
-        quic::QuicDataReader reader(data[0]);
-        uint64_t type;
-        EXPECT_TRUE(reader.ReadVarInt62(&type));
-        EXPECT_EQ(type, static_cast<uint64_t>(
-                            MoqtDataStreamType::kStreamHeaderFetch));
-        EXPECT_FALSE(options.send_fin());
-        return absl::OkStatus();
-      }))
-      .WillOnce(Invoke([&](absl::Span<const absl::string_view> data,
-                           const quiche::StreamWriteOptions& options) {
-        ++objects_received;
-        EXPECT_TRUE(data.empty());
-        EXPECT_TRUE(options.send_fin());
-        return absl::OkStatus();
-      }));
+  // Object with FIN
+  ExpectSendObject(fetch_task, data_stream, MoqtObjectStatus::kNormal,
+                   Location(0, 0), "foo",
+                   MoqtFetchTask::GetNextObjectResult::kEof);
   stream_visitor->OnCanWrite();
-  EXPECT_EQ(objects_received, 2);
 }
 
 TEST_F(MoqtSessionTest, IncomingJoiningFetch) {
@@ -2285,8 +2302,6 @@
   fetch.joining_fetch = {1, 2};
   EXPECT_CALL(*track, Fetch(Location(2, 0), 4, std::optional<uint64_t>(10), _))
       .WillOnce(Return(std::make_unique<MockFetchTask>()));
-  EXPECT_CALL(mock_stream_,
-              Writev(ControlMessageOfType(MoqtMessageType::kFetchOk), _));
   stream_input->OnFetchMessage(fetch);
 }
 
@@ -2462,7 +2477,6 @@
   };
   stream_input->OnFetchOkMessage(ok);
   ASSERT_NE(fetch_task, nullptr);
-  EXPECT_EQ(fetch_task->GetLargestId(), Location(3, 25));
   EXPECT_TRUE(fetch_task->GetStatus().ok());
   PublishedObject object;
   EXPECT_EQ(fetch_task->GetNextObject(object),
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h
index b323979..a829930 100644
--- a/quiche/quic/moqt/moqt_track.h
+++ b/quiche/quic/moqt/moqt_track.h
@@ -249,8 +249,10 @@
         ObjectsAvailableCallback callback) override {
       object_available_callback_ = std::move(callback);
     };
+    // TODO(martinduke): Implement the new API, but for now, only deliver the
+    // FetchTask on FETCH_OK.
+    void SetFetchResponseCallback(FetchResponseCallback callback) override {}
     absl::Status GetStatus() override { return status_; };
-    Location GetLargestId() const override { return largest_id_; }
 
     quiche::QuicheWeakPtr<UpstreamFetchTask> weak_ptr() {
       return weak_ptr_factory_.Create();
diff --git a/quiche/quic/moqt/moqt_track_test.cc b/quiche/quic/moqt/moqt_track_test.cc
index 8b9703e..9ab9078 100644
--- a/quiche/quic/moqt/moqt_track_test.cc
+++ b/quiche/quic/moqt/moqt_track_test.cc
@@ -140,7 +140,6 @@
   EXPECT_NE(fetch_task_, nullptr);
   EXPECT_NE(fetch_.task(), nullptr);
   EXPECT_TRUE(fetch_task_->GetStatus().ok());
-  EXPECT_EQ(fetch_task_->GetLargestId(), Location(3, 50));
 }
 
 TEST_F(UpstreamFetchTest, FetchClosedByMoqt) {
diff --git a/quiche/quic/moqt/test_tools/moqt_session_peer.h b/quiche/quic/moqt/test_tools/moqt_session_peer.h
index 611292a..e88744f 100644
--- a/quiche/quic/moqt/test_tools/moqt_session_peer.h
+++ b/quiche/quic/moqt/test_tools/moqt_session_peer.h
@@ -151,17 +151,6 @@
     session->peer_max_subscribe_id_ = id;
   }
 
-  static MockFetchTask* AddFetch(MoqtSession* session, uint64_t fetch_id) {
-    auto fetch_task = std::make_unique<MockFetchTask>();
-    MockFetchTask* return_ptr = fetch_task.get();
-    auto published_fetch = std::make_unique<MoqtSession::PublishedFetch>(
-        fetch_id, session, std::move(fetch_task));
-    session->incoming_fetches_.emplace(fetch_id, std::move(published_fetch));
-    // Add the fetch to the pending stream queue.
-    session->UpdateQueuedSendOrder(fetch_id, std::nullopt, 0);
-    return return_ptr;
-  }
-
   static MoqtSession::PublishedFetch* GetFetch(MoqtSession* session,
                                                uint64_t fetch_id) {
     auto it = session->incoming_fetches_.find(fetch_id);
diff --git a/quiche/quic/moqt/tools/moqt_mock_visitor.h b/quiche/quic/moqt/tools/moqt_mock_visitor.h
index 9d1f1e7..220d230 100644
--- a/quiche/quic/moqt/tools/moqt_mock_visitor.h
+++ b/quiche/quic/moqt/tools/moqt_mock_visitor.h
@@ -9,6 +9,7 @@
 #include <memory>
 #include <optional>
 #include <utility>
+#include <variant>
 #include <vector>
 
 #include "absl/status/status.h"
@@ -115,20 +116,56 @@
 
 class MockFetchTask : public MoqtFetchTask {
  public:
+  MockFetchTask() {};  // No synchronous callbacks.
+  MockFetchTask(std::optional<MoqtFetchOk> fetch_ok,
+                std::optional<MoqtFetchError> fetch_error,
+                bool synchronous_object_available)
+      : synchronous_fetch_ok_(fetch_ok),
+        synchronous_fetch_error_(fetch_error),
+        synchronous_object_available_(synchronous_object_available) {
+    QUICHE_DCHECK(!synchronous_fetch_ok_.has_value() ||
+                  !synchronous_fetch_error_.has_value());
+  }
+
   MOCK_METHOD(MoqtFetchTask::GetNextObjectResult, GetNextObject,
               (PublishedObject & output), (override));
   MOCK_METHOD(absl::Status, GetStatus, (), (override));
-  MOCK_METHOD(Location, GetLargestId, (), (const, override));
 
   void SetObjectAvailableCallback(ObjectsAvailableCallback callback) override {
     objects_available_callback_ = std::move(callback);
+    if (synchronous_object_available_) {
+      // The first call is installed by the session to trigger stream creation.
+      // An object might not exist yet.
+      objects_available_callback_();
+    }
+    // The second call is a result of the stream replacing the callback, which
+    // means there is an object available.
+    synchronous_object_available_ = true;
   }
-  ObjectsAvailableCallback& objects_available_callback() {
-    return objects_available_callback_;
-  };
+  void SetFetchResponseCallback(FetchResponseCallback callback) override {
+    if (synchronous_fetch_ok_.has_value()) {
+      std::move(callback)(*synchronous_fetch_ok_);
+      return;
+    }
+    if (synchronous_fetch_error_.has_value()) {
+      std::move(callback)(*synchronous_fetch_error_);
+      return;
+    }
+    fetch_response_callback_ = std::move(callback);
+  }
+
+  void CallObjectsAvailableCallback() { objects_available_callback_(); };
+  void CallFetchResponseCallback(
+      std::variant<MoqtFetchOk, MoqtFetchError> response) {
+    std::move(fetch_response_callback_)(response);
+  }
 
  private:
+  FetchResponseCallback fetch_response_callback_;
   ObjectsAvailableCallback objects_available_callback_;
+  std::optional<MoqtFetchOk> synchronous_fetch_ok_;
+  std::optional<MoqtFetchError> synchronous_fetch_error_;
+  bool synchronous_object_available_ = false;
 };
 
 }  // namespace moqt::test