Refresh MoQT integration test for FETCH and fix detected errors.

PiperOrigin-RevId: 712510813
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index 1c968ee..a268578 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -16,6 +16,7 @@
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/moqt_outgoing_queue.h"
 #include "quiche/quic/moqt/moqt_priority.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
 #include "quiche/quic/moqt/moqt_session.h"
 #include "quiche/quic/moqt/moqt_track.h"
 #include "quiche/quic/moqt/test_tools/moqt_simulator_harness.h"
@@ -336,68 +337,55 @@
   }
 }
 
-// TODO(martinduke): Restore this test when FETCH is implemented.
-#if 0
 TEST_F(MoqtIntegrationTest, FetchItemsFromPast) {
   EstablishSession();
   MoqtKnownTrackPublisher publisher;
   server_->session()->set_publisher(&publisher);
 
-  for (MoqtForwardingPreference forwarding_preference :
-       {MoqtForwardingPreference::kSubgroup,
-        MoqtForwardingPreference::kDatagram}) {
-    SCOPED_TRACE(MoqtForwardingPreferenceToString(forwarding_preference));
-    MockSubscribeRemoteTrackVisitor client_visitor;
-    std::string name =
-        absl::StrCat("pref_", static_cast<int>(forwarding_preference));
-    auto queue = std::make_shared<MoqtOutgoingQueue>(
-        FullTrackName{"test", name}, forwarding_preference);
-    publisher.Add(queue);
-    for (int i = 0; i < 100; ++i) {
-      queue->AddObject(MemSliceFromString("object"), /*key=*/true);
-    }
-
-    client_->session()->SubscribeAbsolute(FullTrackName("test", name), 0, 0,
-                                          &client_visitor);
-    int received = 0;
-    // Those won't arrive since they have expired.
-    EXPECT_CALL(client_visitor,
-                OnObjectFragment(_, FullSequence{0, 0}, _, _, _, true))
-        .Times(0);
-    EXPECT_CALL(client_visitor,
-                OnObjectFragment(_, FullSequence{0, 0}, _, _, _, true))
-        .Times(0);
-    EXPECT_CALL(client_visitor,
-                OnObjectFragment(_, FullSequence{96, 0}, _, _, _, true))
-        .Times(0);
-    EXPECT_CALL(client_visitor,
-                OnObjectFragment(_, FullSequence{96, 0}, _, _, _, true))
-        .Times(0);
-    // Those are within the "last three groups" window.
-    EXPECT_CALL(client_visitor,
-                OnObjectFragment(_, FullSequence{97, 0}, _, _, _, true))
-        .WillOnce([&] { ++received; });
-    EXPECT_CALL(client_visitor,
-                OnObjectFragment(_, FullSequence{97, 1}, _, _, _, true))
-        .WillOnce([&] { ++received; });
-    EXPECT_CALL(client_visitor,
-                OnObjectFragment(_, FullSequence{98, 0}, _, _, _, true))
-        .WillOnce([&] { ++received; });
-    EXPECT_CALL(client_visitor,
-                OnObjectFragment(_, FullSequence{98, 1}, _, _, _, true))
-        .WillOnce([&] { ++received; });
-    EXPECT_CALL(client_visitor,
-                OnObjectFragment(_, FullSequence{99, 0}, _, _, _, true))
-        .WillOnce([&] { ++received; });
-    EXPECT_CALL(client_visitor,
-                OnObjectFragment(_, FullSequence{99, 1}, _, _, _, true))
-        .Times(0);  // The current group should not be closed yet.
-    bool success = test_harness_.RunUntilWithDefaultTimeout(
-        [&]() { return received >= 5; });
-    EXPECT_TRUE(success);
+  MockSubscribeRemoteTrackVisitor client_visitor;
+  FullTrackName full_track_name("test", "fetch");
+  auto queue = std::make_shared<MoqtOutgoingQueue>(
+      full_track_name, MoqtForwardingPreference::kSubgroup);
+  publisher.Add(queue);
+  for (int i = 0; i < 100; ++i) {
+    queue->AddObject(MemSliceFromString("object"), /*key=*/true);
   }
+  std::unique_ptr<MoqtFetchTask> fetch;
+  EXPECT_TRUE(client_->session()->Fetch(
+      full_track_name,
+      [&](std::unique_ptr<MoqtFetchTask> task) { fetch = std::move(task); },
+      FullSequence{0, 0}, 99, std::nullopt, 128, std::nullopt,
+      MoqtSubscribeParameters()));
+  // Run until we get FETCH_OK.
+  bool success = test_harness_.RunUntilWithDefaultTimeout(
+      [&]() { return fetch != nullptr; });
+  EXPECT_TRUE(success);
+
+  EXPECT_TRUE(fetch->GetStatus().ok());
+  EXPECT_EQ(fetch->GetLargestId(), FullSequence(99, 0));
+  MoqtFetchTask::GetNextObjectResult result;
+  PublishedObject object;
+  FullSequence expected{97, 0};
+  do {
+    result = fetch->GetNextObject(object);
+    if (result == MoqtFetchTask::GetNextObjectResult::kEof) {
+      break;
+    }
+    EXPECT_EQ(result, MoqtFetchTask::GetNextObjectResult::kSuccess);
+    EXPECT_EQ(object.sequence, expected);
+    if (object.sequence.object == 1) {
+      EXPECT_EQ(object.status, MoqtObjectStatus::kEndOfGroup);
+      expected.object = 0;
+      ++expected.group;
+    } else {
+      EXPECT_EQ(object.status, MoqtObjectStatus::kNormal);
+      EXPECT_EQ(object.payload.AsStringView(), "object");
+      ++expected.object;
+    }
+  } while (result == MoqtFetchTask::GetNextObjectResult::kSuccess);
+  EXPECT_EQ(result, MoqtFetchTask::GetNextObjectResult::kEof);
+  EXPECT_EQ(expected, FullSequence(99, 1));
 }
-#endif
 
 TEST_F(MoqtIntegrationTest, AnnounceFailure) {
   EstablishSession();
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index a7473b1..ddc9ae9 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -1415,11 +1415,9 @@
 
 MoqtSession::IncomingDataStream::~IncomingDataStream() {
   if (parser_.track_alias().has_value() &&
-      parser_.stream_type() == MoqtDataStreamType::kStreamHeaderFetch) {
-    RemoteTrack* track = session_->RemoteTrackById(*parser_.track_alias());
-    if (track != nullptr && track->is_fetch()) {
-      session_->upstream_by_id_.erase(*parser_.track_alias());
-    }
+      parser_.stream_type() == MoqtDataStreamType::kStreamHeaderFetch &&
+      track_.IsValid()) {
+    session_->upstream_by_id_.erase(*parser_.track_alias());
   }
 }
 
diff --git a/quiche/quic/moqt/moqt_track.cc b/quiche/quic/moqt/moqt_track.cc
index d8e31a3..8f6a77e 100644
--- a/quiche/quic/moqt/moqt_track.cc
+++ b/quiche/quic/moqt/moqt_track.cc
@@ -75,14 +75,19 @@
     need_object_available_callback_ = true;
     return kPending;
   }
-  quiche::QuicheMemSlice message_slice(std::move(payload_));
+  if (!payload_.empty()) {
+    quiche::QuicheMemSlice message_slice(std::move(payload_));
+    output.payload = std::move(message_slice);
+  }
   output.sequence = FullSequence(next_object_->group_id,
                                  next_object_->subgroup_id.value_or(0),
                                  next_object_->object_id);
   output.status = next_object_->object_status;
   output.publisher_priority = next_object_->publisher_priority;
-  output.payload = std::move(message_slice);
   output.fin_after_this = false;
+  if (output.sequence == largest_id_) {  // This is the last object.
+    eof_ = true;
+  }
   next_object_.reset();
   can_read_callback_();
   return kSuccess;