Refresh MoQT integration test for FETCH and fix detected errors.
PiperOrigin-RevId: 712510813
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index 1c968ee..a268578 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -16,6 +16,7 @@
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_outgoing_queue.h"
#include "quiche/quic/moqt/moqt_priority.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/quic/moqt/moqt_session.h"
#include "quiche/quic/moqt/moqt_track.h"
#include "quiche/quic/moqt/test_tools/moqt_simulator_harness.h"
@@ -336,68 +337,55 @@
}
}
-// TODO(martinduke): Restore this test when FETCH is implemented.
-#if 0
TEST_F(MoqtIntegrationTest, FetchItemsFromPast) {
EstablishSession();
MoqtKnownTrackPublisher publisher;
server_->session()->set_publisher(&publisher);
- for (MoqtForwardingPreference forwarding_preference :
- {MoqtForwardingPreference::kSubgroup,
- MoqtForwardingPreference::kDatagram}) {
- SCOPED_TRACE(MoqtForwardingPreferenceToString(forwarding_preference));
- MockSubscribeRemoteTrackVisitor client_visitor;
- std::string name =
- absl::StrCat("pref_", static_cast<int>(forwarding_preference));
- auto queue = std::make_shared<MoqtOutgoingQueue>(
- FullTrackName{"test", name}, forwarding_preference);
- publisher.Add(queue);
- for (int i = 0; i < 100; ++i) {
- queue->AddObject(MemSliceFromString("object"), /*key=*/true);
- }
-
- client_->session()->SubscribeAbsolute(FullTrackName("test", name), 0, 0,
- &client_visitor);
- int received = 0;
- // Those won't arrive since they have expired.
- EXPECT_CALL(client_visitor,
- OnObjectFragment(_, FullSequence{0, 0}, _, _, _, true))
- .Times(0);
- EXPECT_CALL(client_visitor,
- OnObjectFragment(_, FullSequence{0, 0}, _, _, _, true))
- .Times(0);
- EXPECT_CALL(client_visitor,
- OnObjectFragment(_, FullSequence{96, 0}, _, _, _, true))
- .Times(0);
- EXPECT_CALL(client_visitor,
- OnObjectFragment(_, FullSequence{96, 0}, _, _, _, true))
- .Times(0);
- // Those are within the "last three groups" window.
- EXPECT_CALL(client_visitor,
- OnObjectFragment(_, FullSequence{97, 0}, _, _, _, true))
- .WillOnce([&] { ++received; });
- EXPECT_CALL(client_visitor,
- OnObjectFragment(_, FullSequence{97, 1}, _, _, _, true))
- .WillOnce([&] { ++received; });
- EXPECT_CALL(client_visitor,
- OnObjectFragment(_, FullSequence{98, 0}, _, _, _, true))
- .WillOnce([&] { ++received; });
- EXPECT_CALL(client_visitor,
- OnObjectFragment(_, FullSequence{98, 1}, _, _, _, true))
- .WillOnce([&] { ++received; });
- EXPECT_CALL(client_visitor,
- OnObjectFragment(_, FullSequence{99, 0}, _, _, _, true))
- .WillOnce([&] { ++received; });
- EXPECT_CALL(client_visitor,
- OnObjectFragment(_, FullSequence{99, 1}, _, _, _, true))
- .Times(0); // The current group should not be closed yet.
- bool success = test_harness_.RunUntilWithDefaultTimeout(
- [&]() { return received >= 5; });
- EXPECT_TRUE(success);
+ MockSubscribeRemoteTrackVisitor client_visitor;
+ FullTrackName full_track_name("test", "fetch");
+ auto queue = std::make_shared<MoqtOutgoingQueue>(
+ full_track_name, MoqtForwardingPreference::kSubgroup);
+ publisher.Add(queue);
+ for (int i = 0; i < 100; ++i) {
+ queue->AddObject(MemSliceFromString("object"), /*key=*/true);
}
+ std::unique_ptr<MoqtFetchTask> fetch;
+ EXPECT_TRUE(client_->session()->Fetch(
+ full_track_name,
+ [&](std::unique_ptr<MoqtFetchTask> task) { fetch = std::move(task); },
+ FullSequence{0, 0}, 99, std::nullopt, 128, std::nullopt,
+ MoqtSubscribeParameters()));
+ // Run until we get FETCH_OK.
+ bool success = test_harness_.RunUntilWithDefaultTimeout(
+ [&]() { return fetch != nullptr; });
+ EXPECT_TRUE(success);
+
+ EXPECT_TRUE(fetch->GetStatus().ok());
+ EXPECT_EQ(fetch->GetLargestId(), FullSequence(99, 0));
+ MoqtFetchTask::GetNextObjectResult result;
+ PublishedObject object;
+ FullSequence expected{97, 0};
+ do {
+ result = fetch->GetNextObject(object);
+ if (result == MoqtFetchTask::GetNextObjectResult::kEof) {
+ break;
+ }
+ EXPECT_EQ(result, MoqtFetchTask::GetNextObjectResult::kSuccess);
+ EXPECT_EQ(object.sequence, expected);
+ if (object.sequence.object == 1) {
+ EXPECT_EQ(object.status, MoqtObjectStatus::kEndOfGroup);
+ expected.object = 0;
+ ++expected.group;
+ } else {
+ EXPECT_EQ(object.status, MoqtObjectStatus::kNormal);
+ EXPECT_EQ(object.payload.AsStringView(), "object");
+ ++expected.object;
+ }
+ } while (result == MoqtFetchTask::GetNextObjectResult::kSuccess);
+ EXPECT_EQ(result, MoqtFetchTask::GetNextObjectResult::kEof);
+ EXPECT_EQ(expected, FullSequence(99, 1));
}
-#endif
TEST_F(MoqtIntegrationTest, AnnounceFailure) {
EstablishSession();
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index a7473b1..ddc9ae9 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -1415,11 +1415,9 @@
MoqtSession::IncomingDataStream::~IncomingDataStream() {
if (parser_.track_alias().has_value() &&
- parser_.stream_type() == MoqtDataStreamType::kStreamHeaderFetch) {
- RemoteTrack* track = session_->RemoteTrackById(*parser_.track_alias());
- if (track != nullptr && track->is_fetch()) {
- session_->upstream_by_id_.erase(*parser_.track_alias());
- }
+ parser_.stream_type() == MoqtDataStreamType::kStreamHeaderFetch &&
+ track_.IsValid()) {
+ session_->upstream_by_id_.erase(*parser_.track_alias());
}
}
diff --git a/quiche/quic/moqt/moqt_track.cc b/quiche/quic/moqt/moqt_track.cc
index d8e31a3..8f6a77e 100644
--- a/quiche/quic/moqt/moqt_track.cc
+++ b/quiche/quic/moqt/moqt_track.cc
@@ -75,14 +75,19 @@
need_object_available_callback_ = true;
return kPending;
}
- quiche::QuicheMemSlice message_slice(std::move(payload_));
+ if (!payload_.empty()) {
+ quiche::QuicheMemSlice message_slice(std::move(payload_));
+ output.payload = std::move(message_slice);
+ }
output.sequence = FullSequence(next_object_->group_id,
next_object_->subgroup_id.value_or(0),
next_object_->object_id);
output.status = next_object_->object_status;
output.publisher_priority = next_object_->publisher_priority;
- output.payload = std::move(message_slice);
output.fin_after_this = false;
+ if (output.sequence == largest_id_) { // This is the last object.
+ eof_ = true;
+ }
next_object_.reset();
can_read_callback_();
return kSuccess;