Allow fragmented MOQT object payloads. MoqtOutgoingQueue does not create objects in fragments, but MoqtLiveRelayQueue should be prepared to accept them. PiperOrigin-RevId: 914368728
diff --git a/build/source_list.bzl b/build/source_list.bzl index b2b27f6..d741557 100644 --- a/build/source_list.bzl +++ b/build/source_list.bzl
@@ -1656,6 +1656,7 @@ "quic/moqt/moqt_messages_test.cc", "quic/moqt/moqt_names_test.cc", "quic/moqt/moqt_namespace_stream_test.cc", + "quic/moqt/moqt_object_test.cc", "quic/moqt/moqt_outgoing_queue_test.cc", "quic/moqt/moqt_outstanding_objects_test.cc", "quic/moqt/moqt_parser_fuzz_test.cc",
diff --git a/build/source_list.gni b/build/source_list.gni index c68b800..f07ff9b 100644 --- a/build/source_list.gni +++ b/build/source_list.gni
@@ -1661,6 +1661,7 @@ "src/quiche/quic/moqt/moqt_messages_test.cc", "src/quiche/quic/moqt/moqt_names_test.cc", "src/quiche/quic/moqt/moqt_namespace_stream_test.cc", + "src/quiche/quic/moqt/moqt_object_test.cc", "src/quiche/quic/moqt/moqt_outgoing_queue_test.cc", "src/quiche/quic/moqt/moqt_outstanding_objects_test.cc", "src/quiche/quic/moqt/moqt_parser_fuzz_test.cc",
diff --git a/build/source_list.json b/build/source_list.json index 215e90a..dc40e1a 100644 --- a/build/source_list.json +++ b/build/source_list.json
@@ -1660,6 +1660,7 @@ "quiche/quic/moqt/moqt_messages_test.cc", "quiche/quic/moqt/moqt_names_test.cc", "quiche/quic/moqt/moqt_namespace_stream_test.cc", + "quiche/quic/moqt/moqt_object_test.cc", "quiche/quic/moqt/moqt_outgoing_queue_test.cc", "quiche/quic/moqt/moqt_outstanding_objects_test.cc", "quiche/quic/moqt/moqt_parser_fuzz_test.cc",
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc index 6202316..ffa3e3d 100644 --- a/quiche/quic/moqt/moqt_integration_test.cc +++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -331,13 +331,13 @@ EXPECT_CALL(subscribe_visitor_, OnObjectFragment) .WillOnce([&](const FullTrackName& full_track_name, const PublishedObjectMetadata& metadata, - absl::string_view object, bool end_of_message) { + absl::string_view object, uint64_t offset) { EXPECT_EQ(full_track_name, FullTrackName("test", "data")); EXPECT_EQ(metadata.location.group, 0u); EXPECT_EQ(metadata.location.object, 0u); EXPECT_EQ(metadata.status, MoqtObjectStatus::kNormal); EXPECT_EQ(object, "object data"); - EXPECT_TRUE(end_of_message); + EXPECT_EQ(offset, 0u); received_object = true; }); success = test_harness_.RunUntilWithDefaultTimeout( @@ -391,20 +391,20 @@ OnObjectFragment(_, MetadataLocationAndStatus( Location{0, 3}, MoqtObjectStatus::kEndOfGroup), - "", true)) + "", /*offset=*/0)) .WillOnce([&] { ++received; }); EXPECT_CALL(subscribe_visitor_, OnObjectFragment(_, MetadataLocationAndStatus( Location{1, 0}, MoqtObjectStatus::kNormal), - "object 4", true)) + "object 4", /*offset=*/0)) .WillOnce([&] { ++received; }); queue->AddObject(MemSliceFromString("object 4"), /*key=*/true); EXPECT_CALL(subscribe_visitor_, OnObjectFragment(_, MetadataLocationAndStatus( Location{1, 1}, MoqtObjectStatus::kNormal), - "object 5", true)) + "object 5", /*offset=*/0)) .WillOnce([&] { ++received; }); queue->AddObject(MemSliceFromString("object 5"), /*key=*/false); @@ -416,7 +416,7 @@ OnObjectFragment(_, MetadataLocationAndStatus( Location{1, 2}, MoqtObjectStatus::kNormal), - "object 6", true)) + "object 6", /*offset=*/0)) .WillOnce([&] { ++received; }); queue->AddObject(MemSliceFromString("object 6"), /*key=*/false); EXPECT_CALL( @@ -424,20 +424,20 @@ OnObjectFragment(_, MetadataLocationAndStatus( Location{1, 3}, MoqtObjectStatus::kEndOfGroup), - "", true)) + "", /*offset=*/0)) .WillOnce([&] { ++received; }); EXPECT_CALL(subscribe_visitor_, OnObjectFragment(_, MetadataLocationAndStatus( Location{2, 0}, MoqtObjectStatus::kNormal), - "object 7", true)) + "object 7", /*offset=*/0)) .WillOnce([&] { ++received; }); queue->AddObject(MemSliceFromString("object 7"), /*key=*/true); EXPECT_CALL(subscribe_visitor_, OnObjectFragment(_, MetadataLocationAndStatus( Location{2, 1}, MoqtObjectStatus::kNormal), - "object 8", true)) + "object 8", /*offset=*/0)) .WillOnce([&] { ++received; }); queue->AddObject(MemSliceFromString("object 8"), /*key=*/false); @@ -450,14 +450,14 @@ OnObjectFragment(_, MetadataLocationAndStatus( Location{2, 2}, MoqtObjectStatus::kEndOfGroup), - "", true)) + "", /*offset=*/0)) .WillOnce([&] { ++received; }); EXPECT_CALL( subscribe_visitor_, OnObjectFragment(_, MetadataLocationAndStatus( Location{3, 0}, MoqtObjectStatus::kEndOfTrack), - "", true)) + "", /*offset=*/0)) .WillOnce([&] { ++received; }); queue->Close(); success = test_harness_.RunUntilWithDefaultTimeout( @@ -499,7 +499,7 @@ EXPECT_EQ(result, MoqtFetchTask::GetNextObjectResult::kSuccess); EXPECT_EQ(object.metadata.location, expected); EXPECT_EQ(object.metadata.status, MoqtObjectStatus::kNormal); - EXPECT_EQ(object.payload.AsStringView(), "object"); + EXPECT_EQ(object.payload[0].AsStringView(), "object"); ++expected.group; } while (result == MoqtFetchTask::GetNextObjectResult::kSuccess); EXPECT_EQ(result, MoqtFetchTask::GetNextObjectResult::kEof); @@ -779,7 +779,7 @@ .WillRepeatedly( [&](const FullTrackName&, const PublishedObjectMetadata& metadata, absl::string_view object, - bool end_of_message) { bytes_received += object.size(); }); + uint64_t offset) { bytes_received += object.size(); }); queue->AddObject(Location{0, 0}, 0, data, false); queue->AddObject(Location{0, 1}, 0, data, false); queue->AddObject(Location{0, 2}, 0, data, false); @@ -830,7 +830,7 @@ .WillRepeatedly( [&](const FullTrackName&, const PublishedObjectMetadata& metadata, absl::string_view object, - bool end_of_message) { bytes_received += object.size(); }); + uint64_t offset) { bytes_received += object.size(); }); queue->AddObject(Location{0, 0}, 0, data, false); queue->AddObject(Location{1, 0}, 0, data, false); success = test_harness_.RunUntilWithDefaultTimeout([&]() { @@ -904,7 +904,7 @@ OnObjectFragment(_, MetadataLocationAndStatus( Location{0, 0}, MoqtObjectStatus::kNormal), - kObjectPayload, true)) + kObjectPayload, /*offset=*/0)) .WillOnce([&] { ++received; }); success =
diff --git a/quiche/quic/moqt/moqt_object.cc b/quiche/quic/moqt/moqt_object.cc index 3ee1a72..b894d7a 100644 --- a/quiche/quic/moqt/moqt_object.cc +++ b/quiche/quic/moqt/moqt_object.cc
@@ -4,22 +4,58 @@ #include "quiche/quic/moqt/moqt_object.h" +#include <cstdint> + #include "absl/strings/string_view.h" -#include "quiche/common/quiche_mem_slice.h" +#include "absl/synchronization/mutex.h" +#include "quiche/common/platform/api/quiche_bug_tracker.h" +#include "quiche/common/quiche_cord_utils.h" namespace moqt { -moqt::PublishedObject CachedObjectToPublishedObject( - const CachedObject& object) { - PublishedObject result; - result.metadata = object.metadata; - if (object.payload != nullptr && !object.payload->empty()) { - result.payload = quiche::QuicheMemSlice( - object.payload->data(), object.payload->length(), - [retained_pointer = object.payload](absl::string_view) {}); +bool CachedObject::Append(uint64_t offset, absl::string_view payload) { + absl::MutexLock lock(mutex_); + uint64_t total_length = payload_received_locked(); + if (offset > total_length) { + QUICHE_BUG(cached_object_gap) << "Gap in bytes in CachedObject::Append"; + return false; } - result.fin_after_this = object.fin_after_this; + if (offset + payload.length() > metadata_.payload_length) { + // This object is larger than the declared size. + QUICHE_BUG(cached_object_too_large) + << "Object is larger than the declared size"; + return false; + } + if (offset + payload.length() <= total_length) { + return false; // No new data. + } + payload_.Append(payload.substr(total_length - offset)); + return true; +} + +PublishedObject CachedObject::ToPublishedObject(uint64_t offset) const { + PublishedObject result; + result.metadata = metadata(); + absl::MutexLock lock(mutex_); + uint64_t total_length = payload_received_locked(); + quiche::CordToMemSlicesTo(payload_.Subcord(offset, total_length - offset), + result.payload); + result.fin_after_this = fin_after_this_; return result; } +bool CachedObject::OverlapIsEqual(uint64_t offset, + absl::string_view payload) const { + absl::MutexLock lock(mutex_); + uint64_t total_length = payload_received_locked(); + if (offset >= total_length) { + return true; + } + if (offset + payload.length() <= total_length) { + return payload_.Subcord(offset, payload.length()) == payload; + } + return payload_.Subcord(offset, total_length - offset) == + payload.substr(0, total_length - offset); +} + } // namespace moqt
diff --git a/quiche/quic/moqt/moqt_object.h b/quiche/quic/moqt/moqt_object.h index 87469d9..a465036 100644 --- a/quiche/quic/moqt/moqt_object.h +++ b/quiche/quic/moqt/moqt_object.h
@@ -6,13 +6,19 @@ #define QUICHE_QUIC_MOQT_MOQT_OBJECT_H_ #include <cstdint> -#include <memory> #include <optional> #include <string> +#include <utility> +#include <vector> +#include "absl/base/thread_annotations.h" +#include "absl/strings/cord.h" +#include "absl/strings/string_view.h" +#include "absl/synchronization/mutex.h" #include "quiche/quic/core/quic_time.h" #include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/moqt/moqt_types.h" +#include "quiche/common/quiche_cord_utils.h" #include "quiche/common/quiche_mem_slice.h" namespace moqt { @@ -23,6 +29,9 @@ std::string extensions; MoqtObjectStatus status; MoqtPriority publisher_priority; + // The length of the entire payload, which might include data that is not + // present in an encompassing PublishedObject or CachedObject. + uint64_t payload_length; quic::QuicTime arrival_time = quic::QuicTime::Zero(); bool IsMalformed(const PublishedObjectMetadata& other) const { // It's OK for arrival_time to be different when checking immutables. @@ -30,26 +39,70 @@ status != other.status || publisher_priority != other.publisher_priority); } + bool operator==(const PublishedObjectMetadata& other) const = default; }; // PublishedObject is a description of an object that is sufficient to publish // it on a given track. struct PublishedObject { PublishedObjectMetadata metadata; - quiche::QuicheMemSlice payload; + // This could be a partial object, containing the data between the requested + // offset and the end of the data the publisher has on hand. + std::vector<quiche::QuicheMemSlice> payload; bool fin_after_this = false; }; // CachedObject is a version of PublishedObject with a reference counted -// payload. -struct CachedObject { - PublishedObjectMetadata metadata; - std::shared_ptr<quiche::QuicheMemSlice> payload; - bool fin_after_this; // This is the last object before FIN. -}; +// payload. This is thread-safe. +// TODO(martinduke): Allow for the deletion of the front of the payload. The +// number of bytes deleted will have to be subtracted from any offset that the +// caller provides (as well as added in payload_received). +class CachedObject { + public: + CachedObject(const PublishedObjectMetadata& metadata, + quiche::QuicheMemSlice payload, bool fin_after_this) + : metadata_(metadata), + payload_(quiche::MemSliceToCord(std::move(payload))), + fin_after_this_(fin_after_this) {} -// Transforms a CachedObject into a PublishedObject. -PublishedObject CachedObjectToPublishedObject(const CachedObject& object); + // Add |payload| at |offset|. Checks for overlaps in data. Returns false if + // the payload is too large, or there is no new data. + bool Append(uint64_t offset, absl::string_view payload); + // Returns a PublishedObject with only the portion of payload starting at + // |offset|. + PublishedObject ToPublishedObject(uint64_t offset = 0) const; + const PublishedObjectMetadata& metadata() const { return metadata_; } + bool fin_after_this() const ABSL_LOCKS_EXCLUDED(mutex_) { + absl::MutexLock lock(mutex_); + return fin_after_this_; + } + void set_fin_after_this(bool fin) { + absl::MutexLock lock(mutex_); + fin_after_this_ = fin; + } + // This function wraps payload_.size(), both for Mutex purposes, and because + // eventually it will account for memory blocks that have been freed from the + // front of the payload. + uint64_t payload_received() const { + absl::MutexLock lock(mutex_); + return payload_received_locked(); + } + // Returns true if data in payload_ starting at |offset| is equal to + // |payload|, checking only until the offset where one of the two strings + // ends. Returns true if there is no overlap in the offsets. + bool OverlapIsEqual(uint64_t offset, absl::string_view payload) const; + + private: + // TODO(martinduke): Account for memory blocks that have been freed from the + // front of the payload. + uint64_t payload_received_locked() const { return payload_.size(); } + + mutable absl::Mutex mutex_; + const PublishedObjectMetadata metadata_; + absl::Cord payload_; + // If true, this is the last object before FIN. + bool ABSL_GUARDED_BY(mutex_) fin_after_this_; +}; } // namespace moqt
diff --git a/quiche/quic/moqt/moqt_object_test.cc b/quiche/quic/moqt/moqt_object_test.cc new file mode 100644 index 0000000..df9090b --- /dev/null +++ b/quiche/quic/moqt/moqt_object_test.cc
@@ -0,0 +1,271 @@ +// Copyright 2024 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "quiche/quic/moqt/moqt_object.h" + +#include <cstring> +#include <string> +#include <utility> +#include <vector> + +#include "absl/strings/string_view.h" +#include "quiche/quic/core/quic_time.h" +#include "quiche/quic/moqt/moqt_types.h" +#include "quiche/common/platform/api/quiche_expect_bug.h" +#include "quiche/common/platform/api/quiche_test.h" +#include "quiche/common/quiche_mem_slice.h" + +namespace moqt { +namespace test { +namespace { + +class CachedObjectTest : public quiche::test::QuicheTest { + public: + PublishedObjectMetadata DefaultMetadata() { + PublishedObjectMetadata metadata; + metadata.location = Location(1, 2); + metadata.subgroup = 3; + metadata.status = MoqtObjectStatus::kNormal; + metadata.publisher_priority = 4; + metadata.payload_length = 10; + return metadata; + } + + std::string Reassemble(const PublishedObject& published) { + std::string result; + for (const auto& slice : published.payload) { + result += std::string(slice.AsStringView()); + } + return result; + } +}; + +TEST_F(CachedObjectTest, Constructor) { + CachedObject object(DefaultMetadata(), quiche::QuicheMemSlice::Copy("abc"), + false); + PublishedObject published = object.ToPublishedObject(); + EXPECT_EQ(published.metadata.location, Location(1, 2)); + EXPECT_EQ(published.payload.size(), 1); + EXPECT_EQ(published.payload[0].AsStringView(), "abc"); + EXPECT_FALSE(published.fin_after_this); +} + +TEST(PublishedObjectMetadataTest, IsMalformed) { + PublishedObjectMetadata metadata; + metadata.location = Location(1, 2); + metadata.subgroup = 3; + metadata.status = MoqtObjectStatus::kNormal; + metadata.publisher_priority = 4; + metadata.payload_length = 10; + + PublishedObjectMetadata other = metadata; + EXPECT_FALSE(metadata.IsMalformed(other)); + + other.location = Location(1, 3); + EXPECT_TRUE(metadata.IsMalformed(other)); + other = metadata; + + other.subgroup = 4; + EXPECT_TRUE(metadata.IsMalformed(other)); + other = metadata; + + other.status = MoqtObjectStatus::kObjectDoesNotExist; + EXPECT_TRUE(metadata.IsMalformed(other)); + other = metadata; + + other.publisher_priority = 5; + EXPECT_TRUE(metadata.IsMalformed(other)); + + // arrival_time, payload_length, and extensions being different should NOT + // make it malformed. + other = metadata; + other.arrival_time = + quic::QuicTime::Zero() + quic::QuicTimeDelta::FromSeconds(1); + EXPECT_FALSE(metadata.IsMalformed(other)); + other.payload_length = 20; + EXPECT_FALSE(metadata.IsMalformed(other)); + other.extensions = "ext"; + EXPECT_FALSE(metadata.IsMalformed(other)); +} + +TEST(PublishedObjectMetadataTest, Equality) { + PublishedObjectMetadata metadata; + metadata.location = Location(1, 2); + metadata.subgroup = 3; + metadata.status = MoqtObjectStatus::kNormal; + metadata.publisher_priority = 4; + metadata.payload_length = 10; + metadata.extensions = "ext"; + metadata.arrival_time = + quic::QuicTime::Zero() + quic::QuicTimeDelta::FromSeconds(1); + + PublishedObjectMetadata other = metadata; + EXPECT_EQ(metadata, other); + + other.location = Location(1, 3); + EXPECT_NE(metadata, other); + other = metadata; + + other.subgroup = 4; + EXPECT_NE(metadata, other); + other = metadata; + + other.status = MoqtObjectStatus::kObjectDoesNotExist; + EXPECT_NE(metadata, other); + other = metadata; + + other.publisher_priority = 5; + EXPECT_NE(metadata, other); + other = metadata; + + other.payload_length = 20; + EXPECT_NE(metadata, other); + other = metadata; + + other.extensions = "something else"; + EXPECT_NE(metadata, other); + other = metadata; + + other.arrival_time = + quic::QuicTime::Zero() + quic::QuicTimeDelta::FromSeconds(2); + EXPECT_NE(metadata, other); +} + +TEST_F(CachedObjectTest, SetFinAfterThis) { + CachedObject object(DefaultMetadata(), quiche::QuicheMemSlice::Copy("abc"), + false); + EXPECT_FALSE(object.fin_after_this()); + object.set_fin_after_this(true); + EXPECT_TRUE(object.fin_after_this()); +} + +TEST_F(CachedObjectTest, Append) { + PublishedObjectMetadata metadata = DefaultMetadata(); + metadata.payload_length = 10; + CachedObject object(metadata, quiche::QuicheMemSlice::Copy("abc"), false); + + // Success: append at the end. + EXPECT_TRUE(object.Append(3, "def")); + + // Success: partial overlap, should append remaining. + EXPECT_TRUE(object.Append(5, "fghi")); // length 4. 5+4 = 9. + + // Failure: gap. + EXPECT_QUICHE_BUG(object.Append(10, "k"), + "Gap in bytes in CachedObject::Append"); + + // Failure: beyond payload_length. + EXPECT_QUICHE_BUG( + object.Append(9, "abc"), + "Object is larger than the declared size"); // 9+3 = 12 > 10. + + // Failure: already received. + EXPECT_FALSE(object.Append(0, "abc")); + + PublishedObject published = object.ToPublishedObject(); + std::string full_payload; + for (const auto& slice : published.payload) { + full_payload += std::string(slice.AsStringView()); + } + EXPECT_EQ(full_payload, "abcdefghi"); +} + +TEST_F(CachedObjectTest, GetPayload) { + PublishedObjectMetadata metadata = DefaultMetadata(); + // We have to use large object fragments to avoid absl::Cord optimizing them + // into a single slice. + const size_t kBlockSize = 1000; + metadata.payload_length = kBlockSize * 3; + std::string object_data(metadata.payload_length, 'a'); + absl::string_view payload = absl::string_view(object_data); + CachedObject object( + metadata, quiche::QuicheMemSlice::Copy(payload.substr(0, kBlockSize)), + false); + object.Append(kBlockSize, payload.substr(kBlockSize, kBlockSize)); + object.Append(2 * kBlockSize, payload.substr(2 * kBlockSize, kBlockSize)); + + // Offset 0: get full payload. + std::string received = Reassemble(object.ToPublishedObject(0)); + EXPECT_EQ(received, payload); + // Offset at slice boundary. + received = Reassemble(object.ToPublishedObject(kBlockSize)); + EXPECT_EQ(received, payload.substr(kBlockSize)); + // Offset in the middle of a slice. + received = Reassemble(object.ToPublishedObject(kBlockSize + 1)); + EXPECT_EQ(received, payload.substr(kBlockSize + 1)); + // Offset beyond the last slice but within payload_length. + received = Reassemble(object.ToPublishedObject(3 * kBlockSize)); + EXPECT_EQ(received, ""); + // Offset way beyond. + received = Reassemble(object.ToPublishedObject(4 * kBlockSize)); + EXPECT_EQ(object.payload_received(), 3 * kBlockSize); +} + +TEST_F(CachedObjectTest, ToPublishedObjectReferenceCounting) { + CachedObject object(DefaultMetadata(), quiche::QuicheMemSlice::Copy("abc"), + false); + PublishedObject published = object.ToPublishedObject(); + EXPECT_EQ(published.payload[0].AsStringView(), "abc"); + + // Even if we append more, the old published object's slices should remain + // valid. + object.Append(3, "def"); + EXPECT_EQ(published.payload[0].AsStringView(), "abc"); + EXPECT_EQ(published.payload.size(), 1); +} + +TEST_F(CachedObjectTest, OverlapIsEqual) { + PublishedObjectMetadata metadata = DefaultMetadata(); + metadata.payload_length = 20; + CachedObject object(metadata, quiche::QuicheMemSlice::Copy("abcdefghij"), + false); + // payload_ covers [0, 10). + + // No overlap. + EXPECT_TRUE(object.OverlapIsEqual(10, "klm")); + EXPECT_TRUE(object.OverlapIsEqual(15, "xyz")); + + // Exact match. + EXPECT_TRUE(object.OverlapIsEqual(0, "abcdefghij")); + EXPECT_TRUE(object.OverlapIsEqual(5, "fghij")); + + // Partial overlap, matches. + EXPECT_TRUE(object.OverlapIsEqual(0, "abcde")); + EXPECT_TRUE(object.OverlapIsEqual(8, "ijmnop")); // Overlap is "ij", matches. + + // Partial overlap, mismatch. + EXPECT_FALSE(object.OverlapIsEqual(0, "axcde")); + EXPECT_FALSE( + object.OverlapIsEqual(8, "ixmnop")); // Overlap is "ij" vs "ix", mismatch + + // Overlap beyond end of payload_, matches. + EXPECT_TRUE( + object.OverlapIsEqual(5, "fghijXXXXX")); // Overlap is "fghij", matches. + + // Empty payload argument. + EXPECT_TRUE(object.OverlapIsEqual(5, "")); +} + +TEST_F(CachedObjectTest, OverlapIsEqualMultiSlice) { + PublishedObjectMetadata metadata = DefaultMetadata(); + metadata.payload_length = 20; + CachedObject object(metadata, quiche::QuicheMemSlice::Copy("abc"), false); + object.Append(3, "def"); + object.Append(6, "ghi"); + + // Overlap across two slices: "bcde" (indices 1 to 5) + EXPECT_TRUE(object.OverlapIsEqual(1, "bcde")); + EXPECT_FALSE(object.OverlapIsEqual(1, "bcxe")); + + // Overlap across three slices: "bcdefgh" + EXPECT_TRUE(object.OverlapIsEqual(1, "bcdefgh")); + EXPECT_FALSE(object.OverlapIsEqual(1, "bcdefgx")); + + // Overlap starting exactly at boundary + EXPECT_TRUE(object.OverlapIsEqual(3, "defg")); +} + +} // namespace +} // namespace test +} // namespace moqt
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.cc b/quiche/quic/moqt/moqt_outgoing_queue.cc index 3abeaaa..71b3e39 100644 --- a/quiche/quic/moqt/moqt_outgoing_queue.cc +++ b/quiche/quic/moqt/moqt_outgoing_queue.cc
@@ -17,7 +17,6 @@ #include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/moqt/moqt_publisher.h" -#include "quiche/quic/moqt/moqt_stream_map.h" #include "quiche/quic/moqt/moqt_types.h" #include "quiche/common/platform/api/quiche_bug_tracker.h" #include "quiche/common/quiche_mem_slice.h" @@ -75,11 +74,15 @@ quiche::QuicheMemSlice payload) { Location sequence{current_group_id_, queue_.back().size()}; bool fin = status == MoqtObjectStatus::kEndOfGroup; - queue_.back().push_back(CachedObject{ - PublishedObjectMetadata{sequence, 0, "", status, - default_publisher_priority(), - clock_->ApproximateNow()}, - std::make_shared<quiche::QuicheMemSlice>(std::move(payload)), fin}); + PublishedObjectMetadata metadata{sequence, + 0, + "", + status, + default_publisher_priority(), + payload.length(), + clock_->ApproximateNow()}; + queue_.back().push_back( + std::make_unique<CachedObject>(metadata, std::move(payload), fin)); for (MoqtObjectListener* listener : listeners_) { listener->OnNewObjectAvailable(sequence, /*subgroup=*/0, default_publisher_priority()); @@ -87,7 +90,8 @@ } std::optional<PublishedObject> MoqtOutgoingQueue::GetCachedObject( - uint64_t group, std::optional<uint64_t> subgroup, uint64_t object) const { + uint64_t group, std::optional<uint64_t> subgroup, uint64_t object, + uint64_t offset) const { QUICHE_DCHECK(subgroup.has_value() && subgroup == 0u); if (group < first_group_in_queue()) { return std::nullopt; @@ -95,24 +99,24 @@ if (group > current_group_id_) { return std::nullopt; } - const std::vector<CachedObject>& group_objects = + const std::vector<std::unique_ptr<CachedObject>>& group_objects = queue_[group - first_group_in_queue()]; if (object >= group_objects.size()) { return std::nullopt; } QUICHE_DCHECK(Location(group, object) == - group_objects[object].metadata.location); - return CachedObjectToPublishedObject(group_objects[object]); + group_objects[object]->metadata().location); + return group_objects[object]->ToPublishedObject(offset); } std::vector<Location> MoqtOutgoingQueue::GetCachedObjectsInRange( Location start, Location end) const { std::vector<Location> sequences; for (const Group& group : queue_) { - for (const CachedObject& object : group) { - if (object.metadata.location >= start && - object.metadata.location <= end) { - sequences.push_back(object.metadata.location); + for (const std::unique_ptr<CachedObject>& object : group) { + if (object->metadata().location >= start && + object->metadata().location <= end) { + sequences.push_back(object->metadata().location); } } }
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.h b/quiche/quic/moqt/moqt_outgoing_queue.h index 2cc9309..2d64c0d 100644 --- a/quiche/quic/moqt/moqt_outgoing_queue.h +++ b/quiche/quic/moqt/moqt_outgoing_queue.h
@@ -58,8 +58,8 @@ // MoqtTrackPublisher implementation. const FullTrackName& GetTrackName() const override { return track_; } std::optional<PublishedObject> GetCachedObject( - uint64_t group, std::optional<uint64_t> subgroup, - uint64_t min_object) const override; + uint64_t group, std::optional<uint64_t> subgroup, uint64_t min_object, + uint64_t offset = 0) const override; void AddObjectListener(MoqtObjectListener* listener) override { listeners_.insert(listener); listener->OnSubscribeAccepted(); @@ -158,7 +158,9 @@ absl::Status status_ = absl::OkStatus(); }; - using Group = std::vector<CachedObject>; + // CachedObject is non-movable, so we have to use unique_ptr for pointer + // stability. + using Group = std::vector<std::unique_ptr<CachedObject>>; // Appends an object to the end of the current group. void AddRawObject(MoqtObjectStatus status, quiche::QuicheMemSlice payload);
diff --git a/quiche/quic/moqt/moqt_outgoing_queue_test.cc b/quiche/quic/moqt/moqt_outgoing_queue_test.cc index 07b5a5f..b091536 100644 --- a/quiche/quic/moqt/moqt_outgoing_queue_test.cc +++ b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
@@ -65,7 +65,7 @@ if (object->metadata.status == MoqtObjectStatus::kNormal) { PublishObject(object->metadata.location.group, object->metadata.location.object, - object->payload.AsStringView()); + object->payload[0].AsStringView()); } else { CloseStreamForGroup(object->metadata.location.group); } @@ -110,7 +110,7 @@ switch (result) { case MoqtFetchTask::kSuccess: if (object.metadata.status == MoqtObjectStatus::kNormal) { - objects.emplace_back(object.payload.AsStringView()); + objects.emplace_back(object.payload[0].AsStringView()); } else { EXPECT_EQ(object.metadata.status, MoqtObjectStatus::kEndOfGroup); }
diff --git a/quiche/quic/moqt/moqt_publisher.h b/quiche/quic/moqt/moqt_publisher.h index 2d13655..f978df2 100644 --- a/quiche/quic/moqt/moqt_publisher.h +++ b/quiche/quic/moqt/moqt_publisher.h
@@ -36,9 +36,10 @@ // This could happen synchronously or asynchronously. virtual void OnSubscribeRejected(MoqtRequestErrorInfo info) = 0; - // Notifies that a new object is available on the track. The object payload - // itself may be retrieved via GetCachedObject method of the associated track - // publisher. If |subgroup| is nullopt, the object is a datagram. + // Notifies that a new object is available on the track, or new payload of + // a partially delivered object. The object payload itself may be retrieved + // via GetCachedObject method of the associated track publisher. If |subgroup| + // is nullopt, the object is a datagram. virtual void OnNewObjectAvailable(Location sequence, std::optional<uint64_t> subgroup, MoqtPriority publisher_priority) = 0; @@ -85,8 +86,8 @@ // This method returns nullopt if the object is not currently available. // If |subgroup| is nullopt, the object is a datagram. virtual std::optional<PublishedObject> GetCachedObject( - uint64_t group, std::optional<uint64_t> subgroup, - uint64_t min_object) const = 0; + uint64_t group, std::optional<uint64_t> subgroup, uint64_t min_object, + uint64_t offset = 0) const = 0; // Registers a listener with the track. The listener will be notified of all // newly arriving objects. The pointer to the listener must be valid until
diff --git a/quiche/quic/moqt/moqt_relay_track_publisher.cc b/quiche/quic/moqt/moqt_relay_track_publisher.cc index 6d76d65..797febe 100644 --- a/quiche/quic/moqt/moqt_relay_track_publisher.cc +++ b/quiche/quic/moqt/moqt_relay_track_publisher.cc
@@ -5,7 +5,6 @@ #include "quiche/quic/moqt/moqt_relay_track_publisher.h" #include <cstdint> -#include <memory> #include <optional> #include <utility> #include <variant> @@ -15,7 +14,6 @@ #include "quiche/quic/core/quic_time.h" #include "quiche/quic/moqt/moqt_error.h" #include "quiche/quic/moqt/moqt_key_value_pair.h" -#include "quiche/quic/moqt/moqt_messages.h" #include "quiche/quic/moqt/moqt_names.h" #include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_publisher.h" @@ -64,13 +62,13 @@ void MoqtRelayTrackPublisher::OnObjectFragment( const FullTrackName& full_track_name, const PublishedObjectMetadata& metadata, absl::string_view object, - bool end_of_message) { + uint64_t offset) { if (is_closing_) { return; } - if (!end_of_message) { - QUICHE_BUG(moqt_relay_track_publisher_got_fragment) - << "Received a fragment of an object."; + if (object.empty() && offset > 0) { + QUICHE_BUG(moqt_relay_track_publisher_empty_payload) + << "Empty payload for partial object " << metadata.location; return; } bool last_object_in_stream = false; @@ -127,13 +125,17 @@ } CachedObject* duplicate_object = nullptr; if (!metadata.subgroup.has_value()) { // It's a datagram. - std::shared_ptr<quiche::QuicheMemSlice> slice; + if (offset != 0 || object.length() != metadata.payload_length) { + QUICHE_BUG(moqt_relay_track_publisher_incomplete_datagram) + << "Received a partial datagram."; + return; + } + quiche::QuicheMemSlice slice; if (!object.empty()) { - slice = std::make_shared<quiche::QuicheMemSlice>( - quiche::QuicheMemSlice::Copy(object)); + slice = quiche::QuicheMemSlice::Copy(object); } auto [it, inserted] = group.datagrams.try_emplace( - metadata.location.object, CachedObject{metadata, slice, false}); + metadata.location.object, metadata, std::move(slice), false); if (!inserted) { duplicate_object = &it->second; } @@ -142,13 +144,13 @@ auto& subgroup = subgroup_it.first->second; if (!subgroup.empty()) { // Check if the new object is valid CachedObject& last_object = subgroup.rbegin()->second; - if (last_object.metadata.publisher_priority != + if (last_object.metadata().publisher_priority != metadata.publisher_priority) { QUICHE_DLOG(INFO) << "Publisher priority changing in a subgroup"; OnMalformedTrack(full_track_name); return; } - if (last_object.fin_after_this) { + if (last_object.fin_after_this()) { QUICHE_DLOG(INFO) << "Skipping object because it is after the end of " << "the subgroup"; OnMalformedTrack(full_track_name); @@ -157,9 +159,9 @@ // If last_object has stream-ending status, it should have been caught by // the fin_after_this check above. QUICHE_DCHECK( - last_object.metadata.status != MoqtObjectStatus::kEndOfGroup && - last_object.metadata.status != MoqtObjectStatus::kEndOfTrack); - if (last_object.metadata.location.object > metadata.location.object) { + last_object.metadata().status != MoqtObjectStatus::kEndOfGroup && + last_object.metadata().status != MoqtObjectStatus::kEndOfTrack); + if (last_object.metadata().location.object > metadata.location.object) { QUICHE_DLOG(INFO) << "Skipping object because it decreases the " << "object ID in the subgroup."; return; @@ -170,29 +172,41 @@ // Anticipate stream FIN. last_object_in_stream = true; } - std::shared_ptr<quiche::QuicheMemSlice> slice; + quiche::QuicheMemSlice slice; if (!object.empty()) { - slice = std::make_shared<quiche::QuicheMemSlice>( - quiche::QuicheMemSlice::Copy(object)); + slice = quiche::QuicheMemSlice::Copy(object); } - auto [it, inserted] = subgroup.try_emplace( - metadata.location.object, - CachedObject{metadata, slice, last_object_in_stream}); + auto [it, inserted] = + subgroup.try_emplace(metadata.location.object, metadata, + std::move(slice), last_object_in_stream); if (!inserted) { duplicate_object = &it->second; } } if (duplicate_object != nullptr) { - if (metadata.IsMalformed(duplicate_object->metadata)) { + if (metadata.IsMalformed(duplicate_object->metadata())) { // Something besides the arrival time and extension headers changed. OnMalformedTrack(full_track_name); return; } - // TODO(b/467718801): Fix this when the class supports partial object - // delivery. When objects are complete, we can simply compare payloads. - if (duplicate_object->payload->AsStringView() != object) { + if (!duplicate_object->OverlapIsEqual(offset, object)) { OnMalformedTrack(full_track_name); + return; } + // This could complete an incomplete object. + if (duplicate_object->metadata().payload_length > + duplicate_object->payload_received()) { + if (!duplicate_object->Append(offset, object)) { + return; + } + // Data added to the object. Notify listeners. + for (MoqtObjectListener* listener : listeners_) { + listener->OnNewObjectAvailable(metadata.location, metadata.subgroup, + metadata.publisher_priority); + } + return; + } + // No need to update state. return; } @@ -267,9 +281,10 @@ return; } CachedObject& last_object = subgroup_it->second.rbegin()->second; - last_object.fin_after_this = true; + last_object.set_fin_after_this(true); for (MoqtObjectListener* listener : listeners_) { - listener->OnNewFinAvailable(last_object.metadata.location, stream.subgroup); + listener->OnNewFinAvailable(last_object.metadata().location, + stream.subgroup); } } @@ -286,7 +301,7 @@ std::optional<PublishedObject> MoqtRelayTrackPublisher::GetCachedObject( uint64_t group_id, std::optional<uint64_t> subgroup_id, - uint64_t min_object_id) const { + uint64_t min_object_id, uint64_t offset) const { auto group_it = queue_.find(group_id); if (group_it == queue_.end()) { // Group does not exist. @@ -299,7 +314,7 @@ // No object after the last one received. return std::nullopt; } - return CachedObjectToPublishedObject(object_it->second); + return object_it->second.ToPublishedObject(); } auto subgroup_it = group.subgroups.find(*subgroup_id); if (subgroup_it == group.subgroups.end()) { @@ -316,7 +331,7 @@ // No object after the last one received. return std::nullopt; } - return CachedObjectToPublishedObject(object_it->second); + return object_it->second.ToPublishedObject(offset); } void MoqtRelayTrackPublisher::AddObjectListener(MoqtObjectListener* listener) {
diff --git a/quiche/quic/moqt/moqt_relay_track_publisher.h b/quiche/quic/moqt/moqt_relay_track_publisher.h index e10e5ab..2016a20 100644 --- a/quiche/quic/moqt/moqt_relay_track_publisher.h +++ b/quiche/quic/moqt/moqt_relay_track_publisher.h
@@ -7,6 +7,7 @@ #include <cstddef> #include <cstdint> +#include <map> #include <memory> #include <optional> #include <utility> @@ -75,7 +76,7 @@ void OnCanAckObjects(MoqtObjectAckFunction /*ack_function*/) override {} void OnObjectFragment(const FullTrackName& full_track_name, const PublishedObjectMetadata& metadata, - absl::string_view object, bool end_of_message) override; + absl::string_view object, uint64_t offset) override; void OnPublishDone(FullTrackName full_track_name) override; void OnMalformedTrack(const FullTrackName& /*full_track_name*/) override { DeleteTrack(); @@ -87,7 +88,7 @@ const FullTrackName& GetTrackName() const override { return track_; } std::optional<PublishedObject> GetCachedObject( uint64_t group_id, std::optional<uint64_t> subgroup_id, - uint64_t min_object) const override; + uint64_t min_object, uint64_t offset = 0) const override; void AddObjectListener(MoqtObjectListener* listener) override; void RemoveObjectListener(MoqtObjectListener* listener) override; std::optional<Location> largest_location() const override; @@ -122,13 +123,13 @@ void DeleteTrack(); // Ordered by object id. - using Subgroup = absl::btree_map<uint64_t, CachedObject>; + using Subgroup = std::map<uint64_t, CachedObject>; struct Group { uint64_t next_object = 0; bool complete = false; // If true, kEndOfGroup has been received. absl::btree_map<uint64_t, Subgroup> subgroups; // Ordered by subgroup id. - absl::btree_map<uint64_t, CachedObject> datagrams; + std::map<uint64_t, CachedObject> datagrams; }; bool is_closing_ = false;
diff --git a/quiche/quic/moqt/moqt_relay_track_publisher_test.cc b/quiche/quic/moqt/moqt_relay_track_publisher_test.cc index 8861e7a..32ba149 100644 --- a/quiche/quic/moqt/moqt_relay_track_publisher_test.cc +++ b/quiche/quic/moqt/moqt_relay_track_publisher_test.cc
@@ -7,12 +7,12 @@ #include <cstdint> #include <memory> #include <optional> +#include <string> #include "absl/strings/string_view.h" #include "quiche/quic/core/quic_time.h" #include "quiche/quic/moqt/moqt_error.h" #include "quiche/quic/moqt/moqt_key_value_pair.h" -#include "quiche/quic/moqt/moqt_messages.h" #include "quiche/quic/moqt/moqt_names.h" #include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_priority.h" @@ -20,6 +20,7 @@ #include "quiche/quic/moqt/moqt_session_interface.h" #include "quiche/quic/moqt/moqt_types.h" #include "quiche/quic/moqt/test_tools/mock_moqt_session.h" +#include "quiche/common/platform/api/quiche_expect_bug.h" #include "quiche/common/platform/api/quiche_test.h" #include "quiche/web_transport/web_transport.h" @@ -81,8 +82,9 @@ } publisher_.OnObjectFragment( kTrackName, - PublishedObjectMetadata{location, subgroup, "", status, 128}, payload, - true); + PublishedObjectMetadata{location, subgroup, "", status, 128, + payload.length()}, + payload, /*offset=*/0); std::optional<PublishedObject> object = publisher_.GetCachedObject(location.group, subgroup, location.object); ASSERT_TRUE(object.has_value()); @@ -91,7 +93,11 @@ EXPECT_EQ(object->metadata.subgroup, subgroup); EXPECT_EQ(object->metadata.status, status); EXPECT_EQ(object->metadata.publisher_priority, 128); - EXPECT_EQ(object->payload.AsStringView(), payload); + std::string full_payload; + for (const auto& slice : object->payload) { + full_payload += slice.AsStringView(); + } + EXPECT_EQ(full_payload, payload); EXPECT_EQ(object->fin_after_this, fin_after_this); } } @@ -167,8 +173,8 @@ publisher_.OnObjectFragment( kTrackName, PublishedObjectMetadata{Location(group, 0), 0, "", - MoqtObjectStatus::kEndOfGroup, 128}, - "", true); + MoqtObjectStatus::kEndOfGroup, 128, 0}, + "", /*offset=*/0); } } @@ -182,8 +188,9 @@ EXPECT_CALL(*session_, Unsubscribe(kTrackName)); publisher_.OnObjectFragment( kTrackName, - PublishedObjectMetadata{location, 0, "", MoqtObjectStatus::kNormal, 128}, - "object", true); + PublishedObjectMetadata{location, 0, "", MoqtObjectStatus::kNormal, 128, + 6}, + "object", /*offset=*/0); EXPECT_TRUE(track_deleted_); } @@ -198,8 +205,8 @@ publisher_.OnObjectFragment( kTrackName, PublishedObjectMetadata{first_location, 0, "", - MoqtObjectStatus::kEndOfTrack, 128}, - "", true); + MoqtObjectStatus::kEndOfTrack, 128, 0}, + "", /*offset=*/0); EXPECT_TRUE(track_deleted_); } @@ -214,8 +221,8 @@ publisher_.OnObjectFragment( kTrackName, PublishedObjectMetadata{location, 1, "", MoqtObjectStatus::kEndOfGroup, - 128}, - "object", true); + 128, 6}, + "object", /*offset=*/0); EXPECT_TRUE(track_deleted_); } @@ -230,8 +237,8 @@ publisher_.OnObjectFragment( kTrackName, PublishedObjectMetadata{first_location, 1, "", - MoqtObjectStatus::kEndOfGroup, 128}, - "", true); + MoqtObjectStatus::kEndOfGroup, 128, 0}, + "", /*offset=*/0); EXPECT_TRUE(track_deleted_); } @@ -245,8 +252,9 @@ EXPECT_CALL(*session_, Unsubscribe(kTrackName)); publisher_.OnObjectFragment( kTrackName, - PublishedObjectMetadata{location, 0, "", MoqtObjectStatus::kNormal, 200}, - "object", true); + PublishedObjectMetadata{location, 0, "", MoqtObjectStatus::kNormal, 200, + 6}, + "object", /*offset=*/0); EXPECT_TRUE(track_deleted_); } @@ -262,8 +270,9 @@ EXPECT_CALL(*session_, Unsubscribe(kTrackName)); publisher_.OnObjectFragment( kTrackName, - PublishedObjectMetadata{location, 0, "", MoqtObjectStatus::kNormal, 128}, - "object", true); + PublishedObjectMetadata{location, 0, "", MoqtObjectStatus::kNormal, 128, + 6}, + "object", /*offset=*/0); EXPECT_TRUE(track_deleted_); } #endif @@ -278,8 +287,8 @@ publisher_.OnObjectFragment( kTrackName, PublishedObjectMetadata{first_location, 0, "", MoqtObjectStatus::kNormal, - 128}, - "object", true); + 128, 6}, + "object", /*offset=*/0); // Object is simply ignored; track is not malformed. EXPECT_FALSE(track_deleted_); } @@ -360,8 +369,8 @@ publisher_.OnObjectFragment( kTrackName, PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal, - 128}, - "object", /*end_of_message=*/true); + 128, 6}, + "object", /*offset=*/0); // Exact duplicate is ignored. It doesn't matter that the arrival time // changed. EXPECT_CALL(listener_, OnNewObjectAvailable).Times(0); @@ -370,8 +379,8 @@ publisher_.OnObjectFragment( kTrackName, PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal, - 128, quic::QuicTime::Infinite()}, - "object", /*end_of_message=*/true); + 128, 6, quic::QuicTime::Infinite()}, + "object", /*offset=*/0); } TEST_F(MoqtRelayTrackPublisherTest, DuplicateObjectChangedMetadata) { @@ -383,16 +392,16 @@ publisher_.OnObjectFragment( kTrackName, PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal, - 128}, - "object", /*end_of_message=*/true); + 128, 6}, + "object", /*offset=*/0); // Priority change; malformed track. EXPECT_CALL(listener_, OnNewObjectAvailable).Times(0); EXPECT_CALL(listener_, OnTrackPublisherGone); publisher_.OnObjectFragment( kTrackName, - PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal, - 64}, - "object", /*end_of_message=*/true); + PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal, 64, + 6}, + "object", /*offset=*/0); EXPECT_TRUE(track_deleted_); } @@ -405,16 +414,16 @@ publisher_.OnObjectFragment( kTrackName, PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal, - 128}, - "payload", /*end_of_message=*/true); + 128, 7}, + "payload", /*offset=*/0); // Payload change; malformed track. EXPECT_CALL(listener_, OnNewObjectAvailable).Times(0); EXPECT_CALL(listener_, OnTrackPublisherGone); publisher_.OnObjectFragment( kTrackName, PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal, - 128}, - "foobar", /*end_of_message=*/true); + 128, 6}, + "foobar", /*offset=*/0); EXPECT_TRUE(track_deleted_); } @@ -460,13 +469,104 @@ publisher_.OnObjectFragment( kTrackName, PublishedObjectMetadata{location, std::nullopt, "", - MoqtObjectStatus::kNormal, 128}, - "object", /*end_of_message=*/true); + MoqtObjectStatus::kNormal, 128, 6}, + "object", /*offset=*/0); std::optional<PublishedObject> object = publisher_.GetCachedObject(location.group, std::nullopt, 0); EXPECT_TRUE(object.has_value() && !object->metadata.subgroup.has_value()); } +TEST_F(MoqtRelayTrackPublisherTest, ObjectArrivalInFragments) { + SubscribeAndOk(); + Location location = kLargestLocation.Next(); + uint64_t subgroup = 0; + // Total size is 15 bytes. + PublishedObjectMetadata metadata = { + location, subgroup, "", MoqtObjectStatus::kNormal, 128, 15}; + + // Fragment 1 arrives. + EXPECT_CALL(listener_, + OnNewObjectAvailable(location, Optional(subgroup), 128)); + publisher_.OnObjectFragment(kTrackName, metadata, "frag1", 0); + + // Fragment 2 arrives. + EXPECT_CALL(listener_, + OnNewObjectAvailable(location, Optional(subgroup), 128)); + publisher_.OnObjectFragment(kTrackName, metadata, "frag2", 5); + + // Session retrieves the object with two fragments. + std::optional<PublishedObject> object = + publisher_.GetCachedObject(location.group, subgroup, location.object, 0); + ASSERT_TRUE(object.has_value()); + std::string payload; + for (const auto& slice : object->payload) { + payload += std::string(slice.AsStringView()); + } + EXPECT_EQ(payload, "frag1frag2"); + + // Fragment 3 arrives. + EXPECT_CALL(listener_, + OnNewObjectAvailable(location, Optional(subgroup), 128)); + publisher_.OnObjectFragment(kTrackName, metadata, "frag3", 10); + + // Third fragment retrieved separately. + object = + publisher_.GetCachedObject(location.group, subgroup, location.object, 10); + ASSERT_TRUE(object.has_value()); + payload.clear(); + for (const auto& slice : object->payload) { + payload += std::string(slice.AsStringView()); + } + EXPECT_EQ(payload, "frag3"); +} + +TEST_F(MoqtRelayTrackPublisherTest, IncompleteDatagram) { + SubscribeAndOk(); + Location location = kLargestLocation.Next(); + PublishedObjectMetadata metadata = { + location, std::nullopt, "", MoqtObjectStatus::kNormal, 128, 10}; + // Fragment length mismatch. + EXPECT_QUICHE_BUG( + publisher_.OnObjectFragment(kTrackName, metadata, "short", 0), + "Received a partial datagram."); + // Non-zero offset for datagram. + EXPECT_QUICHE_BUG( + publisher_.OnObjectFragment(kTrackName, metadata, "payload10", 1), + "Received a partial datagram."); +} + +TEST_F(MoqtRelayTrackPublisherTest, AlreadyReceivedFragment) { + SubscribeAndOk(); + Location location = kLargestLocation.Next(); + uint64_t subgroup = 0; + // Total size is 15 bytes. + PublishedObjectMetadata metadata = { + location, subgroup, "", MoqtObjectStatus::kNormal, 128, 15}; + + // Fragment 1 arrives (first 10 bytes). + EXPECT_CALL(listener_, + OnNewObjectAvailable(location, Optional(subgroup), 128)); + publisher_.OnObjectFragment(kTrackName, metadata, "0123456789", 0); + + // Send a fragment that has already been fully received. + // Partial overlap, matches earlier data. Append() will be called with + // (0, "01234"). + // Since payload_received_ (10) > 0 + 5, Append() returns false. + // OnObjectFragment should just return without notifying listeners. + EXPECT_CALL(listener_, OnNewObjectAvailable).Times(0); + publisher_.OnObjectFragment(kTrackName, metadata, "01234", 0); + + std::optional<PublishedObject> object = + publisher_.GetCachedObject(location.group, subgroup, location.object, 0); + ASSERT_TRUE(object.has_value()); + // Verify that only the first 10 bytes are cached. + std::string payload; + for (const auto& slice : object->payload) { + payload += std::string(slice.AsStringView()); + } + EXPECT_EQ(payload, "0123456789"); +} + } // namespace } // namespace moqt::test
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc index 499666d..f48d4c1 100644 --- a/quiche/quic/moqt/moqt_session.cc +++ b/quiche/quic/moqt/moqt_session.cc
@@ -30,6 +30,7 @@ #include "quiche/quic/core/quic_alarm_factory.h" #include "quiche/quic/core/quic_time.h" #include "quiche/quic/core/quic_types.h" +#include "quiche/quic/core/quic_utils.h" #include "quiche/quic/moqt/moqt_bidi_stream.h" #include "quiche/quic/moqt/moqt_error.h" #include "quiche/quic/moqt/moqt_fetch_task.h" @@ -223,9 +224,10 @@ metadata.subgroup = std::nullopt; metadata.status = message.object_status; metadata.publisher_priority = message.publisher_priority; + metadata.payload_length = payload->size(); metadata.arrival_time = callbacks_.clock->Now(); visitor->OnObjectFragment(track->full_track_name(), metadata, *payload, - true); + /*offset=*/0); } } @@ -666,6 +668,23 @@ << "Got Non-normal object in FETCH"; continue; } + if (last_object_.has_value() && + object.metadata.location == last_object_->location) { + // This is the continuation of the previous object. + webtransport::StreamWriteOptions options; + absl::Status write_status = + stream_->Writev(absl::MakeSpan(object.payload), options); + if (!write_status.ok()) { + QUICHE_BUG(MoqtSession_WriteObjectToStream_write_failed) + << "Writing into MoQT stream failed despite CanWrite() being " + "true before; status: " + << write_status; + fetch->session_->Error(MoqtError::kInternalError, + "Data stream write error"); + return; + } + break; + } if (fetch->session_->WriteObjectToStream( stream_, fetch->request_id(), object.metadata, std::move(object.payload), MoqtDataStreamType::Fetch(), @@ -1666,6 +1685,9 @@ payload = absl::string_view(partial_object_); } } + if (payload.empty() && bytes_received_this_object_ > 0 && !end_of_message) { + return; // Nothing arrived. + } if (!parser_.stream_type().has_value()) { QUICHE_BUG(quic_bug_object_with_no_stream_type) << "Object delivered without a stream type"; @@ -1728,9 +1750,11 @@ metadata.extensions = message.extension_headers; metadata.status = message.object_status; metadata.publisher_priority = message.publisher_priority; + metadata.payload_length = message.payload_length; metadata.arrival_time = session_->callbacks_.clock->Now(); subscribe->visitor()->OnObjectFragment(track->full_track_name(), metadata, - payload, end_of_message); + payload, + bytes_received_this_object_); } } else { // FETCH track->OnObjectOrOk(); @@ -1758,6 +1782,11 @@ task->AppendPayloadToObject(payload); } } + if (end_of_message) { + bytes_received_this_object_ = 0; + } else { + bytes_received_this_object_ += payload.size(); + } partial_object_.clear(); } @@ -1808,12 +1837,13 @@ return; } if (task->HasObject() && !task->NeedsMorePayload()) { - return; + return; // The message is complete. Do not read more. } + uint64_t start_length = task->payload_length(); parser_.ReadAtMostOneObject(); // If it read an object, it called OnObjectMessage and may have altered the // task's object state. - if (task->HasObject() && !task->NeedsMorePayload()) { + if (task->payload_length() > start_length) { task->NotifyNewObject(); } } @@ -2364,11 +2394,16 @@ PublishedSubscription& subscription) { while (stream_->CanWrite()) { std::optional<PublishedObject> object = - subscription.publisher().GetCachedObject(index_.group, index_.subgroup, - next_object_); + subscription.publisher().GetCachedObject( + index_.group, index_.subgroup, next_object_, already_delivered_); if (!object.has_value()) { break; } + if (object->metadata.payload_length > 0 && object->payload.empty()) { + QUICHE_BUG(OutgoingDataStream_empty_payload) + << "Received non-empty object with no payload"; + return; + } QUICHE_DCHECK_EQ(object->metadata.location.group, index_.group); QUICHE_DCHECK(object->metadata.subgroup == index_.subgroup); @@ -2390,19 +2425,50 @@ stream_->ResetWithUserCode(kResetCodeDeliveryTimeout); return; } - if (!session_->WriteObjectToStream(stream_, subscription.track_alias(), - object->metadata, - std::move(object->payload), stream_type_, - last_object_, object->fin_after_this)) { - // WriteObjectToStream() closes the connection on error, meaning that - // there is no need to process the stream any further. + uint64_t start_offset = already_delivered_; + already_delivered_ += + quic::MemSliceSpanTotalSize(absl::MakeSpan(object->payload)); + bool fin_after_this = object->fin_after_this && + already_delivered_ == object->metadata.payload_length; + if (start_offset > 0) { // Just send payload. + if (already_delivered_ == start_offset) { + // Partial delivery of an object but the payload is empty. This would + // result in an infinite loop. + QUICHE_BUG(OutgoingDataStream_empty_payload) + << "Empty payload for partial object " << object->metadata.location; + return; + } + webtransport::StreamWriteOptions options; + options.set_send_fin(fin_after_this); + absl::Status write_status = + stream_->Writev(absl::MakeSpan(object->payload), options); + if (!write_status.ok()) { + QUICHE_BUG(MoqtSession_WriteObjectToStream_write_failed) + << "Writing into MoQT stream failed despite CanWrite() being true " + "before; status: " + << write_status; + session_->Error(MoqtError::kInternalError, "Data stream write error"); + return; + } + } else { + if (!session_->WriteObjectToStream( + stream_, subscription.track_alias(), object->metadata, + std::move(object->payload), stream_type_, last_object_, + fin_after_this)) { + // WriteObjectToStream() closes the connection on error, meaning that + // there is no need to process the stream any further. + return; + } + last_object_ = object->metadata; + subscription.OnObjectSent(object->metadata.location); + next_object_ = last_object_->location.object; + } + if (already_delivered_ == last_object_->payload_length) { + ++next_object_; + already_delivered_ = 0; + } else { return; } - last_object_ = object->metadata; - - next_object_ = last_object_->location.object + 1; - subscription.OnObjectSent(object->metadata.location); - if (object->fin_after_this && !delivery_timeout.IsInfinite() && !session_->alternate_delivery_timeout_) { CreateAndSetAlarm(object->metadata.arrival_time + delivery_timeout); @@ -2433,9 +2499,9 @@ bool MoqtSession::WriteObjectToStream( webtransport::Stream* stream, uint64_t id, - const PublishedObjectMetadata& metadata, quiche::QuicheMemSlice payload, - MoqtDataStreamType type, std::optional<PublishedObjectMetadata> last_object, - bool fin) { + const PublishedObjectMetadata& metadata, + std::vector<quiche::QuicheMemSlice> payload, MoqtDataStreamType type, + std::optional<PublishedObjectMetadata> last_object, bool fin) { QUICHE_DCHECK(stream->CanWrite()); MoqtObject header; header.track_alias = id; @@ -2445,14 +2511,18 @@ header.publisher_priority = metadata.publisher_priority; header.extension_headers = metadata.extensions; header.object_status = metadata.status; - header.payload_length = payload.length(); + header.payload_length = metadata.payload_length; quiche::QuicheBuffer serialized_header = framer_.SerializeObjectHeader(header, type, last_object); // TODO(vasilvv): add a version of WebTransport write API that accepts // memslices so that we can avoid a copy here. - std::array write_vector = { - quiche::QuicheMemSlice(std::move(serialized_header)), std::move(payload)}; + std::vector<quiche::QuicheMemSlice> write_vector; + write_vector.reserve(payload.size() + 1); + write_vector.push_back(quiche::QuicheMemSlice(std::move(serialized_header))); + for (auto& slice : payload) { + write_vector.push_back(std::move(slice)); + } webtransport::StreamWriteOptions options; options.set_send_fin(fin); absl::Status write_status = @@ -2553,9 +2623,9 @@ header.extension_headers = object->metadata.extensions; header.object_status = object->metadata.status; header.subgroup_id = std::nullopt; - header.payload_length = object->payload.length(); + header.payload_length = object->metadata.payload_length; quiche::QuicheBuffer datagram = session_->framer_.SerializeObjectDatagram( - header, object->payload.AsStringView(), + header, object->payload[0].AsStringView(), default_publisher_priority_.value_or(kDefaultPublisherPriority)); session_->session_->SendOrQueueDatagram(datagram.AsStringView()); OnObjectSent(object->metadata.location);
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h index 13df94e..7c8d30e 100644 --- a/quiche/quic/moqt/moqt_session.h +++ b/quiche/quic/moqt/moqt_session.h
@@ -359,6 +359,7 @@ quiche::QuicheWeakPtr<RemoteTrack> track_; MoqtDataParser parser_; std::string partial_object_; + uint64_t bytes_received_this_object_ = 0; }; // Represents a record for a single subscription to a local track that is // being sent to the peer. @@ -580,6 +581,9 @@ // exact ID of the next object in the stream because the next object could // be in a different subgroup or simply be skipped. uint64_t next_object_; + // Number of payload bytes from next_object_ that has already been written + // to the stream. + uint64_t already_delivered_ = 0; // Used in subgroup streams to compute the object ID diff. If nullopt, the // stream header has not been written yet. std::optional<PublishedObjectMetadata> last_object_; @@ -753,7 +757,7 @@ // the write was successful. bool WriteObjectToStream(webtransport::Stream* stream, uint64_t id, const PublishedObjectMetadata& metadata, - quiche::QuicheMemSlice payload, + std::vector<quiche::QuicheMemSlice> payload, MoqtDataStreamType type, std::optional<PublishedObjectMetadata> last_object, bool fin);
diff --git a/quiche/quic/moqt/moqt_session_interface.h b/quiche/quic/moqt/moqt_session_interface.h index 1df78b3..af1d6d0 100644 --- a/quiche/quic/moqt/moqt_session_interface.h +++ b/quiche/quic/moqt/moqt_session_interface.h
@@ -41,6 +41,8 @@ MoqtSessionParameters() = default; explicit MoqtSessionParameters(quic::Perspective perspective) : perspective(perspective), using_webtrans(true) {} + explicit MoqtSessionParameters(bool deliver_partial_objects) + : deliver_partial_objects(deliver_partial_objects) {} MoqtSessionParameters(quic::Perspective perspective, std::string path, std::string authority) : perspective(perspective), @@ -101,8 +103,7 @@ // Called when an object fragment (or an entire object) is received. virtual void OnObjectFragment(const FullTrackName& full_track_name, const PublishedObjectMetadata& metadata, - absl::string_view object, - bool end_of_message) = 0; + absl::string_view object, uint64_t offset) = 0; virtual void OnPublishDone(FullTrackName full_track_name) = 0; // Called when the track is malformed per Section 2.5 of // draft-ietf-moqt-moq-transport-12. If the application is a relay, it MUST
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc index ecf0e53..e5336cc 100644 --- a/quiche/quic/moqt/moqt_session_test.cc +++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -13,6 +13,7 @@ #include <string> #include <utility> #include <variant> +#include <vector> #include "absl/base/casts.h" #include "absl/memory/memory.h" @@ -74,6 +75,12 @@ constexpr MoqtPriority kDefaultPublisherPriority = 0x80; const TrackExtensions kNoExtensions; +std::vector<quiche::QuicheMemSlice> PayloadFromString(absl::string_view s) { + std::vector<quiche::QuicheMemSlice> payload; + payload.push_back(quiche::QuicheMemSlice::Copy(s)); + return payload; +} + FullTrackName kDefaultTrackName() { return FullTrackName("foo", "bar"); } MoqtSubscribe DefaultSubscribe(uint64_t request_id) { @@ -1155,16 +1162,16 @@ EXPECT_CALL(remote_track_visitor_, OnObjectFragment) .WillOnce([&](const FullTrackName& track_name, const PublishedObjectMetadata& metadata, - const absl::string_view received_payload, - bool end_of_message) { + const absl::string_view received_payload, uint64_t offset) { EXPECT_EQ(track_name, ftn); EXPECT_EQ(metadata.location, Location(0, 0)); EXPECT_EQ(metadata.subgroup, 0); EXPECT_EQ(metadata.extensions, "foo"); EXPECT_EQ(metadata.status, MoqtObjectStatus::kNormal); EXPECT_EQ(metadata.publisher_priority, 0); + EXPECT_EQ(metadata.payload_length, payload.length()); EXPECT_EQ(payload, received_payload); - EXPECT_TRUE(end_of_message); + EXPECT_EQ(offset, 0); }); EXPECT_CALL(mock_stream_, GetStreamId()) .WillRepeatedly(Return(kIncomingUniStreamId)); @@ -1220,11 +1227,16 @@ std::unique_ptr<MoqtDataParserVisitor> object_stream = MoqtSessionPeer::CreateIncomingDataStream(&session, &mock_stream_, kDefaultSubgroupStreamType); - - EXPECT_CALL(remote_track_visitor_, OnObjectFragment).Times(2); EXPECT_CALL(mock_stream_, GetStreamId()) .WillRepeatedly(Return(kIncomingUniStreamId)); + EXPECT_CALL(remote_track_visitor_, OnObjectFragment(ftn, _, payload, 0)); object_stream->OnObjectMessage(object, payload, false); + EXPECT_CALL(remote_track_visitor_, + OnObjectFragment(ftn, _, payload, payload.length())); + object_stream->OnObjectMessage(object, payload, true); // complete the object + // New object, check the offset was reset. + ++object.object_id; + EXPECT_CALL(remote_track_visitor_, OnObjectFragment(ftn, _, payload, 0)); object_stream->OnObjectMessage(object, payload, true); // complete the object } @@ -1327,14 +1339,15 @@ fin |= options.send_fin(); return absl::OkStatus(); }); - EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([&] { - return PublishedObject{ - PublishedObjectMetadata{Location(5, 0), 0, "extensions", - MoqtObjectStatus::kNormal, 127, - MoqtSessionPeer::Now(&session_)}, - MemSliceFromString("deadbeef"), false}; - }); - EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] { + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0)) + .WillRepeatedly([&] { + return PublishedObject{ + PublishedObjectMetadata{Location(5, 0), 0, "extensions", + MoqtObjectStatus::kNormal, 127, 8, + MoqtSessionPeer::Now(&session_)}, + PayloadFromString("deadbeef"), false}; + }); + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] { return std::optional<PublishedObject>(); }); subscription->OnNewObjectAvailable(Location(5, 0), 0, 127); @@ -1381,13 +1394,15 @@ fin = options.send_fin(); return absl::OkStatus(); }); - EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([&] { - return PublishedObject{PublishedObjectMetadata{ - Location(5, 0), 0, "", MoqtObjectStatus::kNormal, - 127, MoqtSessionPeer::Now(&session_)}, - MemSliceFromString("deadbeef"), true}; - }); - EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] { + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0)) + .WillRepeatedly([&] { + return PublishedObject{ + PublishedObjectMetadata{Location(5, 0), 0, "", + MoqtObjectStatus::kNormal, 127, 8, + MoqtSessionPeer::Now(&session_)}, + PayloadFromString("deadbeef"), true}; + }); + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] { return std::optional<PublishedObject>(); }); subscription->OnNewObjectAvailable(Location(5, 0), 0, @@ -1396,6 +1411,79 @@ EXPECT_TRUE(fin); } +TEST_F(MoqtSessionTest, SendFragmentedObject) { + FullTrackName ftn("foo", "bar"); + auto track = + SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2)); + MoqtObjectListener* subscription = + MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0); + + // 1st OnNewObjectAvailable happens when no outgoing unidirectional stream is + // available. + EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream()) + .WillOnce(Return(false)); + subscription->OnNewObjectAvailable(Location(5, 0), 0, 128); + + // 2nd OnNewObjectAvailable happens when Part 1 ("part1", 5 bytes) and + // Part 2 ("part2", 5 bytes) are available. + bool fin = false; + EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream()) + .WillOnce(Return(true)); + EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return !fin; }); + EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream()) + .WillOnce(Return(&mock_stream_)); + std::unique_ptr<webtransport::StreamVisitor> stream_visitor; + EXPECT_CALL(mock_stream_, SetVisitor(_)) + .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) { + stream_visitor = std::move(visitor); + }); + EXPECT_CALL(mock_stream_, visitor()).WillRepeatedly([&] { + return stream_visitor.get(); + }); + EXPECT_CALL(mock_stream_, GetStreamId()) + .WillRepeatedly(Return(kOutgoingUniStreamId)); + EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId)) + .WillRepeatedly(Return(&mock_stream_)); + PublishedObjectMetadata metadata = { + Location(5, 0), + /*subgroup=*/0, + /*extensions=*/"", MoqtObjectStatus::kNormal, + /*priority=*/128, + /*payload_length=*/15, MoqtSessionPeer::Now(&session_)}; + std::vector<quiche::QuicheMemSlice> payload; + payload.push_back(quiche::QuicheMemSlice::Copy("part1")); + payload.push_back(quiche::QuicheMemSlice::Copy("part2")); + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0)) + .WillOnce(Return(PublishedObject{metadata, std::move(payload), false})); + // Expect Writev for the header and first two parts. + EXPECT_CALL(mock_stream_, Writev) + .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, + const webtransport::StreamWriteOptions& options) { + EXPECT_EQ(data.size(), 3); // Header, part1, and part2. + EXPECT_EQ(data[1].AsStringView(), "part1"); + EXPECT_EQ(data[2].AsStringView(), "part2"); + EXPECT_FALSE(options.send_fin()); + return absl::OkStatus(); + }); + subscription->OnNewObjectAvailable(Location(5, 0), 0, 128); + + // 3rd OnNewObjectAvailable happens when Part 3 ("part3", 5 bytes) is + // available. + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 10)) + .WillOnce( + Return(PublishedObject{metadata, PayloadFromString("part3"), true})); + EXPECT_CALL(mock_stream_, Writev) + .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, + const webtransport::StreamWriteOptions& options) { + EXPECT_TRUE(options.send_fin()); + EXPECT_EQ(data.size(), 1); // No header, just part3. + EXPECT_EQ(data[0].AsStringView(), "part3"); + fin = true; + return absl::OkStatus(); + }); + subscription->OnNewObjectAvailable(Location(5, 0), 0, 128); +} + TEST_F(MoqtSessionTest, GroupAbandonedNoDeliveryTimeout) { FullTrackName ftn("foo", "bar"); auto track = @@ -1433,13 +1521,15 @@ fin |= options.send_fin(); return absl::OkStatus(); }); - EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([&] { - return PublishedObject{PublishedObjectMetadata{ - Location(5, 0), 0, "", MoqtObjectStatus::kNormal, - 127, MoqtSessionPeer::Now(&session_)}, - MemSliceFromString("deadbeef"), true}; - }); - EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] { + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0)) + .WillRepeatedly([&] { + return PublishedObject{ + PublishedObjectMetadata{Location(5, 0), 0, "", + MoqtObjectStatus::kNormal, 127, 8, + MoqtSessionPeer::Now(&session_)}, + PayloadFromString("deadbeef"), true}; + }); + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] { return std::optional<PublishedObject>(); }); subscription->OnNewObjectAvailable(Location(5, 0), 0, @@ -1499,13 +1589,15 @@ fin |= options.send_fin(); return absl::OkStatus(); }); - EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([&] { - return PublishedObject{PublishedObjectMetadata{ - Location(5, 0), 0, "", MoqtObjectStatus::kNormal, - 127, MoqtSessionPeer::Now(&session_)}, - MemSliceFromString("deadbeef"), true}; - }); - EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] { + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0)) + .WillRepeatedly([&] { + return PublishedObject{ + PublishedObjectMetadata{Location(5, 0), 0, "", + MoqtObjectStatus::kNormal, 127, 8, + MoqtSessionPeer::Now(&session_)}, + PayloadFromString("deadbeef"), true}; + }); + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] { return std::optional<PublishedObject>(); }); subscription->OnNewObjectAvailable(Location(5, 0), 0, @@ -1567,13 +1659,15 @@ fin |= options.send_fin(); return absl::OkStatus(); }); - EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([&] { - return PublishedObject{PublishedObjectMetadata{ - Location(5, 0), 0, "", MoqtObjectStatus::kNormal, - 127, MoqtSessionPeer::Now(&session_)}, - MemSliceFromString("deadbeef"), true}; - }); - EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] { + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0)) + .WillRepeatedly([&] { + return PublishedObject{ + PublishedObjectMetadata{Location(5, 0), 0, "", + MoqtObjectStatus::kNormal, 127, 8, + MoqtSessionPeer::Now(&session_)}, + PayloadFromString("deadbeef"), true}; + }); + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] { return std::optional<PublishedObject>(); }); subscription->OnNewObjectAvailable(Location(5, 0), 0, @@ -1621,13 +1715,15 @@ fin = options.send_fin(); return absl::OkStatus(); }); - EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([&] { - return PublishedObject{PublishedObjectMetadata{ - Location(5, 0), 0, "", MoqtObjectStatus::kNormal, - 127, MoqtSessionPeer::Now(&session_)}, - MemSliceFromString("deadbeef"), false}; - }); - EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] { + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0)) + .WillRepeatedly([&] { + return PublishedObject{ + PublishedObjectMetadata{Location(5, 0), 0, "", + MoqtObjectStatus::kNormal, 127, 8, + MoqtSessionPeer::Now(&session_)}, + PayloadFromString("deadbeef"), false}; + }); + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] { return std::optional<PublishedObject>(); }); subscription->OnNewObjectAvailable(Location(5, 0), 0, @@ -1682,13 +1778,15 @@ fin = options.send_fin(); return absl::OkStatus(); }); - EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([&] { - return PublishedObject{PublishedObjectMetadata{ - Location(5, 0), 0, "", MoqtObjectStatus::kNormal, - 127, MoqtSessionPeer::Now(&session_)}, - MemSliceFromString("deadbeef"), false}; - }); - EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] { + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0)) + .WillRepeatedly([&] { + return PublishedObject{ + PublishedObjectMetadata{Location(5, 0), 0, "", + MoqtObjectStatus::kNormal, 127, 8, + MoqtSessionPeer::Now(&session_)}, + PayloadFromString("deadbeef"), false}; + }); + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] { return std::optional<PublishedObject>(); }); subscription->OnNewObjectAvailable(Location(5, 0), 0, @@ -1709,14 +1807,15 @@ // object id, extensions, payload length, status. const std::string kExpectedMessage2 = {0x00, 0x00, 0x00, 0x03}; EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return true; }); - EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([&] { - return PublishedObject{ - PublishedObjectMetadata{Location(5, 1), 0, "", - MoqtObjectStatus::kEndOfGroup, 127, - MoqtSessionPeer::Now(&session_)}, - MemSliceFromString(""), true}; - }); - EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 2)).WillRepeatedly([] { + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)) + .WillRepeatedly([&] { + return PublishedObject{ + PublishedObjectMetadata{Location(5, 1), 0, "", + MoqtObjectStatus::kEndOfGroup, 127, 0, + MoqtSessionPeer::Now(&session_)}, + PayloadFromString(""), true}; + }); + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 2, 0)).WillRepeatedly([] { return std::optional<PublishedObject>(); }); EXPECT_CALL(mock_stream_, Writev(_, _)) @@ -1769,13 +1868,15 @@ fin = options.send_fin(); return absl::OkStatus(); }); - EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([&] { - return PublishedObject{PublishedObjectMetadata{ - Location(5, 0), 0, "", MoqtObjectStatus::kNormal, - 127, MoqtSessionPeer::Now(&session_)}, - MemSliceFromString("deadbeef"), false}; - }); - EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] { + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0)) + .WillRepeatedly([&] { + return PublishedObject{ + PublishedObjectMetadata{Location(5, 0), 0, "", + MoqtObjectStatus::kNormal, 127, 8, + MoqtSessionPeer::Now(&session_)}, + PayloadFromString("deadbeef"), false}; + }); + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] { return std::optional<PublishedObject>(); }); subscription->OnNewObjectAvailable(Location(5, 0), 0, @@ -1821,13 +1922,13 @@ EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId)) .WillRepeatedly(Return(&mock_stream_)); EXPECT_CALL(mock_stream_, Writev(_, _)).WillOnce(Return(absl::OkStatus())); - EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([] { - return PublishedObject{ - PublishedObjectMetadata{Location(5, 0), 0, "", - MoqtObjectStatus::kNormal, 128}, - MemSliceFromString("deadbeef")}; + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0)).WillRepeatedly([] { + return PublishedObject{PublishedObjectMetadata{ + Location(5, 0), 0, "", MoqtObjectStatus::kNormal, + 128, 8, quic::QuicTime::Zero()}, + PayloadFromString("deadbeef")}; }); - EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] { + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] { return std::optional<PublishedObject>(); }); session_.OnCanCreateNewOutgoingUnidirectionalStream(); @@ -1871,13 +1972,15 @@ EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId)) .WillRepeatedly(Return(&mock_stream_)); EXPECT_CALL(mock_stream_, Writev(_, _)).WillOnce(Return(absl::OkStatus())); - EXPECT_CALL(*track, GetCachedObject(6, Optional(0), 0)).WillRepeatedly([] { - return PublishedObject{ - PublishedObjectMetadata{Location(6, 0), 0, "", - MoqtObjectStatus::kNormal, 128}, - MemSliceFromString("deadbeef")}; - }); - EXPECT_CALL(*track, GetCachedObject(6, Optional(0), 1)).WillRepeatedly([] { + EXPECT_CALL(*track, GetCachedObject(6, Optional(0), 0, 0)) + .WillRepeatedly([&] { + return PublishedObject{ + PublishedObjectMetadata{Location(6, 0), 0, "", + MoqtObjectStatus::kNormal, 128, 8, + MoqtSessionPeer::Now(&session_)}, + PayloadFromString("deadbeef")}; + }); + EXPECT_CALL(*track, GetCachedObject(6, Optional(0), 1, 0)).WillRepeatedly([] { return std::optional<PublishedObject>(); }); session_.OnCanCreateNewOutgoingUnidirectionalStream(); @@ -1910,13 +2013,13 @@ .WillRepeatedly(Return(&mock_stream_)); EXPECT_CALL(mock_stream_, Writev(_, _)).WillOnce(Return(absl::OkStatus())); - EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([] { + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0)).WillRepeatedly([] { return PublishedObject{ PublishedObjectMetadata{Location(5, 0), 0, "", - MoqtObjectStatus::kNormal, 128}, - MemSliceFromString("deadbeef")}; + MoqtObjectStatus::kNormal, 128, 8}, + PayloadFromString("deadbeef")}; }); - EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillOnce([] { + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillOnce([] { return std::optional<PublishedObject>(); }); subscription->OnNewObjectAvailable(Location(5, 0), 0, @@ -1925,7 +2028,7 @@ // disappear by returning nullptr. EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId)) .WillRepeatedly(Return(nullptr)); - EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).Times(0); + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).Times(0); subscription->OnNewObjectAvailable(Location(5, 1), 0, kDefaultPublisherPriority); } @@ -1967,12 +2070,13 @@ webtransport::DatagramStatusCode::kSuccess, ""); }); EXPECT_CALL(*track_publisher, - GetCachedObject(5, std::optional<uint64_t>(), 0)) - .WillRepeatedly([] { + GetCachedObject(5, std::optional<uint64_t>(), 0, 0)) + .WillRepeatedly([&] { return PublishedObject{ PublishedObjectMetadata{Location{5, 0}, std::nullopt, "ext", - MoqtObjectStatus::kNormal, 32}, - quiche::QuicheMemSlice::Copy("deadbeef")}; + MoqtObjectStatus::kNormal, 32, 8, + MoqtSessionPeer::Now(&session_)}, + PayloadFromString("deadbeef")}; }); listener->OnNewObjectAvailable(Location(5, 0), std::nullopt, 32); EXPECT_TRUE(correct_message); @@ -1998,15 +2102,16 @@ EXPECT_CALL(remote_track_visitor_, OnObjectFragment) .WillOnce([&](const FullTrackName& track_name, const PublishedObjectMetadata& metadata, - absl::string_view received_payload, bool fin) { + absl::string_view received_payload, uint64_t offset) { EXPECT_EQ(track_name, ftn); EXPECT_EQ(metadata.location, Location(object.group_id, object.object_id)); EXPECT_EQ(metadata.subgroup, object.subgroup_id); EXPECT_EQ(metadata.publisher_priority, object.publisher_priority); EXPECT_EQ(metadata.status, object.object_status); + EXPECT_EQ(metadata.payload_length, payload.length()); EXPECT_EQ(payload, received_payload); - EXPECT_TRUE(fin); + EXPECT_EQ(offset, 0); }); session_.OnDatagramReceived(absl::string_view(datagram, sizeof(datagram))); } @@ -2033,8 +2138,10 @@ EXPECT_CALL(remote_track_visitor_, OnObjectFragment) .WillOnce([&](const FullTrackName&, const PublishedObjectMetadata& metadata, absl::string_view, - bool) { + uint64_t offset) { EXPECT_EQ(metadata.publisher_priority, kPeerDefaultPriority); + EXPECT_EQ(metadata.payload_length, 8u); + EXPECT_EQ(offset, 0); }); session_.OnDatagramReceived(absl::string_view(datagram, sizeof(datagram))); // Omit priority from a stream. @@ -2047,8 +2154,10 @@ EXPECT_CALL(remote_track_visitor_, OnObjectFragment) .WillOnce([&](const FullTrackName&, const PublishedObjectMetadata& metadata, absl::string_view, - bool) { + uint64_t offset) { EXPECT_EQ(metadata.publisher_priority, kPeerDefaultPriority); + EXPECT_EQ(metadata.payload_length, 3u); + EXPECT_EQ(offset, 0); }); in_memory_stream.Receive(absl::string_view(stream_data, sizeof(stream_data)), false); @@ -2085,12 +2194,12 @@ return stream_visitor.get(); }); EXPECT_CALL(mock_stream_, CanWrite).WillRepeatedly(Return(true)); - EXPECT_CALL(*track, GetCachedObject) - .WillOnce(Return( - PublishedObject{PublishedObjectMetadata{Location(0, 0), 0, "", - MoqtObjectStatus::kNormal, - kLocalDefaultPriority}, - MemSliceFromString("deadbeef")})) + EXPECT_CALL(*track, GetCachedObject(_, _, _, _)) + .WillOnce(Return(PublishedObject{ + PublishedObjectMetadata{ + Location(0, 0), 0, "", MoqtObjectStatus::kNormal, + kLocalDefaultPriority, 8, MoqtSessionPeer::Now(&session_)}, + PayloadFromString("deadbeef")})) .WillOnce(Return(std::nullopt)); EXPECT_CALL(mock_stream_, Writev) .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, @@ -2102,12 +2211,12 @@ }); listener->OnNewObjectAvailable(Location(0, 0), 0, kLocalDefaultPriority); // Send a datagram with the default priority. - EXPECT_CALL(*track, GetCachedObject) - .WillOnce(Return( - PublishedObject{PublishedObjectMetadata{Location(0, 1), std::nullopt, - "", MoqtObjectStatus::kNormal, - kLocalDefaultPriority}, - MemSliceFromString("deadbeef")})); + EXPECT_CALL(*track, GetCachedObject(_, _, _, _)) + .WillOnce(Return(PublishedObject{ + PublishedObjectMetadata{ + Location(0, 1), std::nullopt, "", MoqtObjectStatus::kNormal, + kLocalDefaultPriority, 8, MoqtSessionPeer::Now(&session_)}, + PayloadFromString("deadbeef")})); EXPECT_CALL(mock_session_, SendOrQueueDatagram) .WillOnce([](absl::string_view datagram) { EXPECT_TRUE(static_cast<const uint8_t>(datagram[0]) & @@ -2118,12 +2227,12 @@ listener->OnNewObjectAvailable(Location(0, 1), std::nullopt, kLocalDefaultPriority); // Non-default priority - EXPECT_CALL(*track, GetCachedObject) - .WillOnce(Return( - PublishedObject{PublishedObjectMetadata{Location(0, 2), std::nullopt, - "", MoqtObjectStatus::kNormal, - kLocalDefaultPriority + 1}, - MemSliceFromString("deadbeef")})); + EXPECT_CALL(*track, GetCachedObject(_, _, _, _)) + .WillOnce(Return(PublishedObject{ + PublishedObjectMetadata{ + Location(0, 2), std::nullopt, "", MoqtObjectStatus::kNormal, + kLocalDefaultPriority + 1, 8, MoqtSessionPeer::Now(&session_)}, + PayloadFromString("deadbeef")})); EXPECT_CALL(mock_session_, SendOrQueueDatagram) .WillOnce([](absl::string_view datagram) { EXPECT_FALSE(static_cast<const uint8_t>(datagram[0]) & @@ -2219,26 +2328,29 @@ EXPECT_CALL(mock_stream2, visitor()).WillOnce([&]() { return stream_visitor[2].get(); }); - EXPECT_CALL(*track, GetCachedObject(0, Optional(0), 0)) + EXPECT_CALL(*track, GetCachedObject(0, Optional(0), 0, 0)) .WillOnce(Return(PublishedObject{ PublishedObjectMetadata{Location(0, 0), 0, "", - MoqtObjectStatus::kNormal, 127}, - MemSliceFromString("deadbeef")})); - EXPECT_CALL(*track, GetCachedObject(0, Optional(0), 1)) + MoqtObjectStatus::kNormal, 128, 6, + MoqtSessionPeer::Now(&session_)}, + PayloadFromString("foobar")})); + EXPECT_CALL(*track, GetCachedObject(0, Optional(0), 1, 0)) .WillOnce(Return(std::nullopt)); - EXPECT_CALL(*track, GetCachedObject(1, Optional(0), 0)) + EXPECT_CALL(*track, GetCachedObject(1, Optional(0), 0, 0)) .WillOnce(Return(PublishedObject{ PublishedObjectMetadata{Location(1, 0), 0, "", - MoqtObjectStatus::kNormal, 127}, - MemSliceFromString("deadbeef")})); - EXPECT_CALL(*track, GetCachedObject(1, Optional(0), 1)) + MoqtObjectStatus::kNormal, 127, 8, + MoqtSessionPeer::Now(&session_)}, + PayloadFromString("deadbeef")})); + EXPECT_CALL(*track, GetCachedObject(1, Optional(0), 1, 0)) .WillOnce(Return(std::nullopt)); - EXPECT_CALL(*track, GetCachedObject(2, Optional(0), 0)) + EXPECT_CALL(*track, GetCachedObject(2, Optional(0), 0, 0)) .WillOnce(Return(PublishedObject{ PublishedObjectMetadata{Location(2, 0), 0, "", - MoqtObjectStatus::kNormal, 127}, - MemSliceFromString("deadbeef")})); - EXPECT_CALL(*track, GetCachedObject(2, Optional(0), 1)) + MoqtObjectStatus::kNormal, 127, 8, + MoqtSessionPeer::Now(&session_)}, + PayloadFromString("deadbeef")})); + EXPECT_CALL(*track, GetCachedObject(2, Optional(0), 1, 0)) .WillOnce(Return(std::nullopt)); EXPECT_CALL(mock_stream0, CanWrite()).WillRepeatedly(Return(true)); EXPECT_CALL(mock_stream1, CanWrite()).WillRepeatedly(Return(true)); @@ -2331,12 +2443,12 @@ EXPECT_CALL(mock_stream0, visitor()).WillOnce([&]() { return stream_visitor0.get(); }); - EXPECT_CALL(*track1, GetCachedObject(0, Optional(0), 0)) + EXPECT_CALL(*track1, GetCachedObject(0, Optional(0), 0, 0)) .WillOnce(Return(PublishedObject{ PublishedObjectMetadata{Location(0, 0), 0, "", - MoqtObjectStatus::kNormal, 127}, - MemSliceFromString("foobar")})); - EXPECT_CALL(*track1, GetCachedObject(0, Optional(0), 1)) + MoqtObjectStatus::kNormal, 127, 6}, + PayloadFromString("foobar")})); + EXPECT_CALL(*track1, GetCachedObject(0, Optional(0), 1, 0)) .WillOnce(Return(std::nullopt)); EXPECT_CALL(mock_stream0, CanWrite()).WillRepeatedly(Return(true)); EXPECT_CALL(mock_stream0, Writev) @@ -2368,12 +2480,12 @@ EXPECT_CALL(mock_stream1, visitor()).WillOnce([&]() { return stream_visitor1.get(); }); - EXPECT_CALL(*track2, GetCachedObject(0, Optional(0), 0)) + EXPECT_CALL(*track2, GetCachedObject(0, Optional(0), 0, 0)) .WillOnce(Return(PublishedObject{ PublishedObjectMetadata{Location(0, 0), 0, "", - MoqtObjectStatus::kNormal, 127}, - MemSliceFromString("deadbeef")})); - EXPECT_CALL(*track2, GetCachedObject(0, Optional(0), 1)) + MoqtObjectStatus::kNormal, 127, 8}, + PayloadFromString("deadbeef")})); + EXPECT_CALL(*track2, GetCachedObject(0, Optional(0), 1, 0)) .WillOnce(Return(std::nullopt)); EXPECT_CALL(mock_stream1, CanWrite()).WillRepeatedly(Return(true)); EXPECT_CALL(mock_stream1, Writev(_, _)) @@ -2425,7 +2537,8 @@ output.metadata.subgroup = 0; output.metadata.status = status; output.metadata.publisher_priority = 128; - output.payload = quiche::QuicheMemSlice::Copy(payload); + output.metadata.payload_length = payload.length(); + output.payload = PayloadFromString(payload); output.fin_after_this = true; // should be ignored. return MoqtFetchTask::GetNextObjectResult::kSuccess; }) @@ -2525,6 +2638,82 @@ stream_input->OnFetchMessage(fetch); } +TEST_F(MoqtSessionTest, SendFragmentedFetchObject) { + using ::testing::ByMove; + std::unique_ptr<MoqtControlParserVisitor> stream_input = + MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_); + MoqtFetch fetch = DefaultFetch(); + fetch.request_id = 3; // Use an odd ID for peer request in client session. + MockTrackPublisher* track = CreateTrackPublisher(); + + // Disable synchronous callback to have more control. + auto fetch_task_ptr = + std::make_unique<MockFetchTask>(std::nullopt, std::nullopt, false); + MockFetchTask* fetch_task = fetch_task_ptr.get(); + EXPECT_CALL(*track, StandaloneFetch) + .WillOnce(Return(ByMove(std::move(fetch_task_ptr)))); + + // Receive FETCH, send FETCH_OK. + stream_input->OnFetchMessage(fetch); + // FETCH_OK responding to the request. + MoqtFetchOk expected_ok; + expected_ok.request_id = fetch.request_id; + expected_ok.end_location = Location(1, 0); + EXPECT_CALL(mock_stream_, Writev(SerializedControlMessage(expected_ok), _)); + fetch_task->CallFetchResponseCallback(expected_ok); + + webtransport::test::MockStream data_stream; + std::unique_ptr<webtransport::StreamVisitor> stream_visitor; + EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream) + .WillRepeatedly(Return(true)); + EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream()) + .WillOnce(Return(&data_stream)); + EXPECT_CALL(data_stream, SetVisitor) + .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) { + stream_visitor = std::move(visitor); + }); + EXPECT_CALL(data_stream, SetPriority); + EXPECT_CALL(data_stream, CanWrite).WillRepeatedly(Return(true)); + // Trigger stream opening (calls SetObjectAvailableCallback with lambda1). + // Setting the stream visitor will cause a second call to the callback. + PublishedObjectMetadata metadata = { + Location(0, 0), 0, "", MoqtObjectStatus::kNormal, 128, 10}; + EXPECT_CALL(*fetch_task, GetNextObject) + .WillOnce([&](PublishedObject& output) { + output.metadata = metadata; + output.payload = PayloadFromString("part1"); + return MoqtFetchTask::GetNextObjectResult::kSuccess; + }) + .WillOnce(Return(MoqtFetchTask::GetNextObjectResult::kPending)); + EXPECT_CALL(data_stream, Writev) + .WillOnce([&](absl::Span<const quiche::QuicheMemSlice> data, + const webtransport::StreamWriteOptions& options) { + EXPECT_EQ(data.size(), 2); + EXPECT_EQ(data[1].AsStringView(), "part1"); + return absl::OkStatus(); + }); + fetch_task->CallObjectsAvailableCallback(); + // lambda1 ran, data_stream captured, stream_visitor set. + ASSERT_NE(stream_visitor, nullptr); + + // The second fragment is available. + EXPECT_CALL(*fetch_task, GetNextObject) + .WillOnce([&](PublishedObject& output) { + output.metadata = metadata; + output.payload = PayloadFromString("part2"); + return MoqtFetchTask::GetNextObjectResult::kSuccess; + }) + .WillRepeatedly(Return(MoqtFetchTask::GetNextObjectResult::kPending)); + EXPECT_CALL(data_stream, Writev) + .WillOnce([&](absl::Span<const quiche::QuicheMemSlice> data, + const webtransport::StreamWriteOptions& options) { + EXPECT_EQ(data.size(), 1); // No header. + EXPECT_EQ(data[0].AsStringView(), "part2"); + return absl::OkStatus(); + }); + fetch_task->CallObjectsAvailableCallback(); +} + // The publisher has the first object locally, but has to go upstream to get // the rest. TEST_F(MoqtSessionTest, FetchReturnsObjectBeforeOk) { @@ -3190,9 +3379,10 @@ stream.Receive("foo", false); EXPECT_TRUE(task->HasObject()); EXPECT_TRUE(task->NeedsMorePayload()); - EXPECT_FALSE(object_ready); - stream.Receive("bar", false); EXPECT_TRUE(object_ready); + object_ready = false; + stream.Receive("bar", false); + EXPECT_FALSE(object_ready); // No second call to the callback. EXPECT_TRUE(task->HasObject()); EXPECT_FALSE(task->NeedsMorePayload()); task->SetObjectAvailableCallback(nullptr); @@ -3239,17 +3429,18 @@ return stream_visitor.get(); }); EXPECT_CALL(*track_publisher, GetCachedObject) - .WillOnce( - Return(PublishedObject{PublishedObjectMetadata{ - Location(0, 0), - 0, - "", - MoqtObjectStatus::kObjectDoesNotExist, - 0, - MoqtSessionPeer::Now(&session_) - - quic::QuicTimeDelta::FromSeconds(1), - }, - quiche::QuicheMemSlice(), false})); + .WillOnce(Return( + PublishedObject{PublishedObjectMetadata{ + Location(0, 0), + 0, + "", + MoqtObjectStatus::kObjectDoesNotExist, + 0, + 0, + MoqtSessionPeer::Now(&session_) - + quic::QuicTimeDelta::FromSeconds(1), + }, + std::vector<quiche::QuicheMemSlice>(), false})); EXPECT_CALL(data_mock, ResetWithUserCode(kResetCodeDeliveryTimeout)) .WillOnce([&](webtransport::StreamErrorCode /*error*/) { stream_visitor.reset(); @@ -3304,9 +3495,9 @@ EXPECT_CALL(*track_publisher, GetCachedObject) .WillOnce(Return(PublishedObject{ PublishedObjectMetadata{Location(0, 0), 0, "", - MoqtObjectStatus::kObjectDoesNotExist, 0, + MoqtObjectStatus::kObjectDoesNotExist, 0, 0, MoqtSessionPeer::Now(&session_)}, - quiche::QuicheMemSlice(), true})) + PayloadFromString(""), true})) .WillOnce(Return(std::nullopt)); EXPECT_CALL(data_mock, Writev(_, _)).WillOnce(Return(absl::OkStatus())); EXPECT_CALL(data_mock, ResetWithUserCode(kResetCodeDeliveryTimeout)).Times(0); @@ -3354,9 +3545,9 @@ EXPECT_CALL(*track_publisher, GetCachedObject) .WillOnce(Return(PublishedObject{ PublishedObjectMetadata{Location(0, 0), 0, "", - MoqtObjectStatus::kObjectDoesNotExist, 0, + MoqtObjectStatus::kObjectDoesNotExist, 0, 0, MoqtSessionPeer::Now(&session_)}, - quiche::QuicheMemSlice(), false})) + PayloadFromString(""), false})) .WillOnce(Return(std::nullopt)); EXPECT_CALL(data_mock, Writev(_, _)).WillOnce(Return(absl::OkStatus())); ON_CALL(*track_publisher, largest_location()) @@ -3407,9 +3598,9 @@ EXPECT_CALL(*track_publisher, GetCachedObject) .WillOnce(Return(PublishedObject{ PublishedObjectMetadata{Location(0, 0), 0, "", - MoqtObjectStatus::kObjectDoesNotExist, 0, + MoqtObjectStatus::kObjectDoesNotExist, 0, 0, MoqtSessionPeer::Now(&session_)}, - quiche::QuicheMemSlice(), false})) + PayloadFromString(""), false})) .WillOnce(Return(std::nullopt)); EXPECT_CALL(data_mock1, Writev(_, _)).WillOnce(Return(absl::OkStatus())); ON_CALL(*track_publisher, largest_location) @@ -3436,9 +3627,9 @@ EXPECT_CALL(*track_publisher, GetCachedObject) .WillOnce(Return(PublishedObject{ PublishedObjectMetadata{Location(1, 0), 0, "", - MoqtObjectStatus::kObjectDoesNotExist, 0, + MoqtObjectStatus::kObjectDoesNotExist, 0, 0, MoqtSessionPeer::Now(&session_)}, - quiche::QuicheMemSlice(), false})) + PayloadFromString(""), false})) .WillOnce(Return(std::nullopt)); EXPECT_CALL(data_mock2, Writev(_, _)).WillOnce(Return(absl::OkStatus())); ON_CALL(*track_publisher, largest_location) @@ -4168,13 +4359,14 @@ /*start_object=*/0); // Send a datagram in window. - EXPECT_CALL(*mock_publisher, GetCachedObject(8, std::optional<uint64_t>(), 0)) + EXPECT_CALL(*mock_publisher, + GetCachedObject(8, std::optional<uint64_t>(), 0, 0)) .WillOnce([&] { return PublishedObject{ PublishedObjectMetadata{Location(8, 0), std::nullopt, "extensions", - MoqtObjectStatus::kNormal, 128, + MoqtObjectStatus::kNormal, 128, 8, MoqtSessionPeer::Now(&session_)}, - quiche::QuicheMemSlice::Copy("deadbeef"), false}; + PayloadFromString("deadbeef"), false}; }); EXPECT_CALL(mock_session_, SendOrQueueDatagram) .WillOnce(Return(webtransport::DatagramStatus( @@ -4216,13 +4408,13 @@ return data_stream_visitor.get(); }); EXPECT_CALL(mock_stream_, CanWrite).WillRepeatedly(Return(true)); - EXPECT_CALL(*track, GetCachedObject(0, Optional(1), 0)) + EXPECT_CALL(*track, GetCachedObject(0, Optional(1), 0, 0)) .WillOnce(Return(PublishedObject{ PublishedObjectMetadata{Location(0, 0), 1, "", - MoqtObjectStatus::kNormal, 0x80, + MoqtObjectStatus::kNormal, 0x80, 0, MoqtSessionPeer::Now(&session_)}, - quiche::QuicheMemSlice(), false})); - EXPECT_CALL(*track, GetCachedObject(0, Optional(1), 1)) + PayloadFromString(""), false})); + EXPECT_CALL(*track, GetCachedObject(0, Optional(1), 1, 0)) .WillOnce(Return(std::nullopt)); SetLargestId(track, Location(0, 0)); EXPECT_CALL(mock_stream_, Writev).WillOnce(Return(absl::OkStatus()));
diff --git a/quiche/quic/moqt/moqt_trace_recorder.cc b/quiche/quic/moqt/moqt_trace_recorder.cc index 6bcc537..b8f1273 100644 --- a/quiche/quic/moqt/moqt_trace_recorder.cc +++ b/quiche/quic/moqt/moqt_trace_recorder.cc
@@ -94,7 +94,7 @@ std::optional<PublishedObject> object_copy = publisher.GetCachedObject(location.group, subgroup, location.object); if (object_copy.has_value() && object_copy->metadata.location == location) { - object->set_payload_size(object_copy->payload.length()); + object->set_payload_size(object_copy->metadata.payload_length); } else { QUICHE_DLOG(WARNING) << "Track " << track_alias << " has marked " << location
diff --git a/quiche/quic/moqt/moqt_track.cc b/quiche/quic/moqt/moqt_track.cc index acecdee..df1188b 100644 --- a/quiche/quic/moqt/moqt_track.cc +++ b/quiche/quic/moqt/moqt_track.cc
@@ -24,9 +24,7 @@ #include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/moqt/moqt_types.h" #include "quiche/common/platform/api/quiche_bug_tracker.h" -#include "quiche/common/quiche_buffer_allocator.h" #include "quiche/common/quiche_mem_slice.h" -#include "quiche/common/simple_buffer_allocator.h" #include "quiche/web_transport/web_transport.h" namespace moqt { @@ -101,7 +99,6 @@ std::unique_ptr<MoqtFetchTask> fetch_task) { fetch_task_ = std::move(fetch_task); fetch_task_->SetObjectAvailableCallback([this]() { FetchObjects(); }); - FetchObjects(); } void SubscribeRemoteTrack::FetchObjects() { @@ -116,8 +113,27 @@ PublishedObject object; switch (fetch_task_->GetNextObject(object)) { case MoqtFetchTask::GetNextObjectResult::kSuccess: - visitor_->OnObjectFragment(full_track_name(), object.metadata, - object.payload.AsStringView(), true); + if (object.metadata.payload_length == 0) { + QUICHE_DCHECK_EQ(fetch_object_offset_, 0); + visitor_->OnObjectFragment(full_track_name(), object.metadata, "", 0); + break; + } + for (size_t i = 0; i < object.payload.size(); ++i) { + if (fetch_object_offset_ > 0 && object.payload[i].empty()) { + QUICHE_BUG(SubscribeRemoteTrack_empty_payload) + << "Empty payload for partial object " + << object.metadata.location; + continue; + } + visitor_->OnObjectFragment(full_track_name(), object.metadata, + object.payload[i].AsStringView(), + fetch_object_offset_); + fetch_object_offset_ += object.payload[i].length(); + if (fetch_object_offset_ == object.metadata.payload_length) { + fetch_object_offset_ = 0; + break; + } + } break; case MoqtFetchTask::GetNextObjectResult::kError: case MoqtFetchTask::GetNextObjectResult::kEof: @@ -230,29 +246,41 @@ need_object_available_callback_ = true; return kPending; } - if (!payload_.empty()) { - quiche::QuicheMemSlice message_slice(std::move(payload_)); - output.payload = std::move(message_slice); + if (next_object_->payload_length > 0 && payload_.empty()) { + return kPending; + } + while (!payload_.empty()) { + payload_offset_ += payload_.front().length(); + output.payload.push_back(std::move(payload_.front())); + payload_.pop_front(); } output.metadata.location = Location(next_object_->group_id, next_object_->object_id); output.metadata.subgroup = next_object_->subgroup_id; output.metadata.status = next_object_->object_status; output.metadata.publisher_priority = next_object_->publisher_priority; + output.metadata.payload_length = next_object_->payload_length; output.fin_after_this = false; if (output.metadata.location == largest_location_) { // This is the last object. eof_ = true; } - next_object_.reset(); + if (payload_offset_ == next_object_->payload_length) { + next_object_.reset(); + payload_offset_ = 0; + payload_length_ = 0; + } can_read_callback_(); return kSuccess; } void UpstreamFetch::UpstreamFetchTask::NewObject(const MoqtObject& message) { next_object_ = message; - payload_ = quiche::QuicheBuffer(quiche::SimpleBufferAllocator::Get(), - message.payload_length); + while (!payload_.empty()) { + payload_.pop_front(); + }; + payload_offset_ = 0; + payload_length_ = 0; } void UpstreamFetch::UpstreamFetchTask::AppendPayloadToObject( @@ -263,15 +291,11 @@ QUICHE_BUG_IF(quic_bug_AlreadyGotPayload, next_object_->payload_length == 0) << "AppendPayloadToObject called after payload was already full"; // Copy |payload| to the right spot in the buffer. - memcpy(payload_.data() + payload_.size() - next_object_->payload_length, - payload.data(), payload.length()); - next_object_->payload_length -= payload.length(); + payload_length_ += payload.length(); + payload_.push_back(quiche::QuicheMemSlice::Copy(payload)); } void UpstreamFetch::UpstreamFetchTask::NotifyNewObject() { - QUICHE_BUG_IF(quic_bug_NotifyNewObjectCalledEarly, - !next_object_.has_value() || next_object_->payload_length > 0) - << "NotifyNewObject called without a full object in store"; if (need_object_available_callback_ && object_available_callback_) { need_object_available_callback_ = false; object_available_callback_();
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h index 5075b33..2e6f66d 100644 --- a/quiche/quic/moqt/moqt_track.h +++ b/quiche/quic/moqt/moqt_track.h
@@ -22,8 +22,9 @@ #include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/moqt/moqt_session_interface.h" #include "quiche/quic/moqt/moqt_types.h" -#include "quiche/common/quiche_buffer_allocator.h" #include "quiche/common/quiche_callbacks.h" +#include "quiche/common/quiche_circular_deque.h" +#include "quiche/common/quiche_mem_slice.h" #include "quiche/common/quiche_weak_ptr.h" #include "quiche/web_transport/web_transport.h" @@ -165,6 +166,8 @@ bool dynamic_groups_ = kDefaultDynamicGroups; void FetchObjects(); std::unique_ptr<MoqtFetchTask> fetch_task_; + // If nonzero, fetch_task_ is in mid-object. + uint64_t fetch_object_offset_ = 0; std::optional<const uint64_t> track_alias_; SubscribeVisitor* visitor_; @@ -280,7 +283,8 @@ // MoqtSession calls this for a hint if the object has been read. bool HasObject() const { return next_object_.has_value(); } bool NeedsMorePayload() const { - return next_object_.has_value() && next_object_->payload_length > 0; + return next_object_.has_value() && + payload_length_ < next_object_->payload_length; } // MoqtSession calls NotifyNewObject() after NewObject() because it has to // exit the parser loop before the callback possibly causes another read. @@ -294,6 +298,9 @@ std::optional<webtransport::StreamErrorCode> error, absl::string_view reason_phrase); + uint64_t payload_offset() const { return payload_offset_; } + uint64_t payload_length() const { return payload_length_; } + private: Location largest_location_; absl::Status status_ = absl::OkStatus(); @@ -303,9 +310,11 @@ // payload bytes not yet received. The application receives a // PublishedObject that is constructed from next_object_ and payload_. std::optional<MoqtObject> next_object_; - // Store payload separately. Will be converted into QuicheMemSlice only when - // complete, since QuicheMemSlice is immutable. - quiche::QuicheBuffer payload_; + quiche::QuicheCircularDeque<quiche::QuicheMemSlice> payload_; + // The starting point of payload_. Data is deleted as it is delivered. + uint64_t payload_offset_ = 0; + // Total data delivered for this object. + uint64_t payload_length_ = 0; // The task should only call object_available_callback_ when the last result // was kPending. Otherwise, there can be recursive loops of
diff --git a/quiche/quic/moqt/moqt_track_test.cc b/quiche/quic/moqt/moqt_track_test.cc index 134906f..152c95b 100644 --- a/quiche/quic/moqt/moqt_track_test.cc +++ b/quiche/quic/moqt/moqt_track_test.cc
@@ -20,6 +20,7 @@ #include "quiche/quic/moqt/test_tools/moqt_mock_visitor.h" #include "quiche/quic/platform/api/quic_test.h" #include "quiche/common/quiche_mem_slice.h" +#include "quiche/web_transport/web_transport.h" namespace moqt { @@ -27,6 +28,8 @@ namespace { +using ::testing::_; + class AlarmDelegate : public quic::QuicAlarm::DelegateWithoutContext { public: AlarmDelegate(bool* fired) : fired_(fired) {} @@ -80,6 +83,107 @@ EXPECT_FALSE(track_.InWindow(Location(1, 25))); } +TEST_F(SubscribeRemoteTrackTest, JoiningFetchMultiObject) { + auto fetch_task = std::make_unique<MockFetchTask>(); + MockFetchTask* task_ptr = fetch_task.get(); + track_.OnJoiningFetchReady(std::move(fetch_task)); + + PublishedObject o1, o2; + o1.metadata.location = Location(2, 0); + o1.metadata.payload_length = 3; + o1.payload.push_back(quiche::QuicheMemSlice::Copy("abc")); + + o2.metadata.location = Location(2, 1); + o2.metadata.payload_length = 3; + o2.payload.push_back(quiche::QuicheMemSlice::Copy("def")); + + EXPECT_CALL(visitor_, + OnObjectFragment(track_.full_track_name(), _, "abc", 0)); + EXPECT_CALL(visitor_, + OnObjectFragment(track_.full_track_name(), _, "def", 0)); + EXPECT_CALL(*task_ptr, GetNextObject) + .WillOnce([&](PublishedObject& output) { + output = std::move(o1); + return MoqtFetchTask::GetNextObjectResult::kSuccess; + }) + .WillOnce([&](PublishedObject& output) { + output = std::move(o2); + return MoqtFetchTask::GetNextObjectResult::kSuccess; + }) + .WillOnce(testing::Return(MoqtFetchTask::GetNextObjectResult::kPending)); + task_ptr->CallObjectsAvailableCallback(); + EXPECT_NE(SubscribeRemoteTrackPeer::GetFetchTask(&track_), nullptr); + EXPECT_CALL(*task_ptr, GetNextObject) + .WillOnce(testing::Return(MoqtFetchTask::GetNextObjectResult::kEof)); + task_ptr->CallObjectsAvailableCallback(); + EXPECT_EQ(SubscribeRemoteTrackPeer::GetFetchTask(&track_), nullptr); +} + +TEST_F(SubscribeRemoteTrackTest, JoiningFetchFragmented) { + auto fetch_task = std::make_unique<MockFetchTask>(); + MockFetchTask* task_ptr = fetch_task.get(); + track_.OnJoiningFetchReady(std::move(fetch_task)); + + PublishedObject part1, part2; + part1.metadata.location = Location(2, 0); + part1.metadata.payload_length = 6; + part1.payload.push_back(quiche::QuicheMemSlice::Copy("abc")); + + part2.metadata.location = Location(2, 0); + part2.metadata.payload_length = 6; + part2.payload.push_back(quiche::QuicheMemSlice::Copy("def")); + + EXPECT_CALL(visitor_, + OnObjectFragment(track_.full_track_name(), _, "abc", 0)); + EXPECT_CALL(visitor_, + OnObjectFragment(track_.full_track_name(), _, "def", 3)); + EXPECT_CALL(*task_ptr, GetNextObject) + .WillOnce([&](PublishedObject& output) { + output = std::move(part1); + return MoqtFetchTask::GetNextObjectResult::kSuccess; + }) + .WillOnce([&](PublishedObject& output) { + output = std::move(part2); + return MoqtFetchTask::GetNextObjectResult::kSuccess; + }) + .WillOnce(testing::Return(MoqtFetchTask::GetNextObjectResult::kPending)); + task_ptr->CallObjectsAvailableCallback(); +} + +TEST_F(SubscribeRemoteTrackTest, JoiningFetchEmptyPayload) { + auto fetch_task = std::make_unique<MockFetchTask>(); + MockFetchTask* task_ptr = fetch_task.get(); + track_.OnJoiningFetchReady(std::move(fetch_task)); + + PublishedObject o1; + o1.metadata.location = Location(2, 0); + o1.metadata.payload_length = 0; + o1.metadata.status = MoqtObjectStatus::kEndOfGroup; + + // Since object.payload is empty, is called once. + EXPECT_CALL(visitor_, + OnObjectFragment(track_.full_track_name(), o1.metadata, "", 0)); + EXPECT_CALL(*task_ptr, GetNextObject) + .WillOnce([&](PublishedObject& output) { + output = std::move(o1); + return MoqtFetchTask::GetNextObjectResult::kSuccess; + }) + .WillOnce(testing::Return(MoqtFetchTask::GetNextObjectResult::kPending)); + task_ptr->CallObjectsAvailableCallback(); +} + +TEST_F(SubscribeRemoteTrackTest, JoiningFetchError) { + auto fetch_task = std::make_unique<MockFetchTask>(); + MockFetchTask* task_ptr = fetch_task.get(); + track_.OnJoiningFetchReady(std::move(fetch_task)); + + EXPECT_NE(SubscribeRemoteTrackPeer::GetFetchTask(&track_), nullptr); + EXPECT_CALL(*task_ptr, GetNextObject) + .WillOnce(testing::Return(MoqtFetchTask::GetNextObjectResult::kError)); + task_ptr->CallObjectsAvailableCallback(); + EXPECT_EQ(SubscribeRemoteTrackPeer::GetFetchTask(&track_), nullptr); +} + class UpstreamFetchTest : public quic::test::QuicTest { protected: UpstreamFetchTest() @@ -163,7 +267,8 @@ MoqtFetchTask::GetNextObjectResult::kSuccess); EXPECT_EQ(object.metadata.location, Location(3, 0)); EXPECT_EQ(object.metadata.subgroup, 0); - EXPECT_EQ(object.payload.AsStringView(), "foobar"); + EXPECT_EQ(object.payload[0].AsStringView(), "foo"); + EXPECT_EQ(object.payload[1].AsStringView(), "bar"); }); int got_read_callback = 0; fetch_.OnStreamOpened([&]() { ++got_read_callback; }); @@ -187,6 +292,71 @@ EXPECT_TRUE(got_object); } +TEST_F(UpstreamFetchTest, ObjectRetrievalEmptyPayload) { + fetch_.OnFetchResult(Location(3, 50), absl::OkStatus(), nullptr); + MoqtObject moqt_obj = {1, 3, 0, 128, "", MoqtObjectStatus::kEndOfGroup, 0, 0}; + fetch_.task()->NewObject(moqt_obj); + fetch_.task()->NotifyNewObject(); + fetch_.OnStreamOpened([]() {}); + + PublishedObject output; + EXPECT_EQ(fetch_task_->GetNextObject(output), + MoqtFetchTask::GetNextObjectResult::kSuccess); + EXPECT_TRUE(output.payload.empty()); + EXPECT_EQ(output.metadata.status, MoqtObjectStatus::kEndOfGroup); +} + +TEST_F(UpstreamFetchTest, GetNextObjectAfterEof) { + fetch_.OnFetchResult(Location(3, 50), absl::OkStatus(), nullptr); + fetch_.task()->OnStreamAndFetchClosed(std::nullopt, ""); + + PublishedObject object; + EXPECT_EQ(fetch_task_->GetNextObject(object), + MoqtFetchTask::GetNextObjectResult::kEof); + // Subsequent calls should still return EOF. + EXPECT_EQ(fetch_task_->GetNextObject(object), + MoqtFetchTask::GetNextObjectResult::kEof); +} + +TEST_F(UpstreamFetchTest, GetNextObjectEofAtLargestLocation) { + Location largest(3, 50); + fetch_.OnFetchResult(largest, absl::OkStatus(), nullptr); + fetch_.OnStreamOpened([]() {}); + + MoqtObject obj1 = {1, 3, 49, 128, "", MoqtObjectStatus::kNormal, 0, 1}; + fetch_.task()->NewObject(obj1); + fetch_.task()->AppendPayloadToObject("a"); + fetch_.task()->NotifyNewObject(); + + PublishedObject out; + EXPECT_EQ(fetch_task_->GetNextObject(out), + MoqtFetchTask::GetNextObjectResult::kSuccess); + // Not at largest location yet. + EXPECT_EQ(fetch_task_->GetNextObject(out), + MoqtFetchTask::GetNextObjectResult::kPending); + + MoqtObject obj2 = {1, 3, 50, 128, "", MoqtObjectStatus::kNormal, 0, 1}; + fetch_.task()->NewObject(obj2); + fetch_.task()->AppendPayloadToObject("b"); + fetch_.task()->NotifyNewObject(); + + EXPECT_EQ(fetch_task_->GetNextObject(out), + MoqtFetchTask::GetNextObjectResult::kSuccess); + // Reached largest location. EOF should be set. + EXPECT_EQ(fetch_task_->GetNextObject(out), + MoqtFetchTask::GetNextObjectResult::kEof); +} + +TEST_F(UpstreamFetchTest, CloseWithError) { + fetch_.OnFetchResult(Location(3, 50), absl::OkStatus(), nullptr); + fetch_.task()->OnStreamAndFetchClosed( + static_cast<webtransport::StreamErrorCode>(0x123), "reason"); + PublishedObject out; + EXPECT_EQ(fetch_task_->GetNextObject(out), + MoqtFetchTask::GetNextObjectResult::kError); + EXPECT_FALSE(fetch_task_->GetStatus().ok()); +} + TEST_F(UpstreamFetchTest, LocationIsValidOkFirstObjectIdDeclining) { fetch_.OnFetchResult(Location(3, 50), absl::OkStatus(), nullptr); EXPECT_TRUE(
diff --git a/quiche/quic/moqt/test_tools/moqt_mock_visitor.h b/quiche/quic/moqt/test_tools/moqt_mock_visitor.h index de9654a..4b87f5e 100644 --- a/quiche/quic/moqt/test_tools/moqt_mock_visitor.h +++ b/quiche/quic/moqt/test_tools/moqt_mock_visitor.h
@@ -6,6 +6,7 @@ #define QUICHE_QUIC_MOQT_TOOLS_MOQT_MOCK_VISITOR_H_ #include <cstdint> +#include <map> #include <memory> #include <optional> #include <utility> @@ -77,7 +78,8 @@ const FullTrackName& GetTrackName() const override { return track_name_; } MOCK_METHOD(std::optional<PublishedObject>, GetCachedObject, - (uint64_t, std::optional<uint64_t>, uint64_t), (const, override)); + (uint64_t, std::optional<uint64_t>, uint64_t, uint64_t), + (const, override)); MOCK_METHOD(void, AddObjectListener, (MoqtObjectListener * listener), (override)); MOCK_METHOD(void, RemoveObjectListener, (MoqtObjectListener * listener), @@ -105,14 +107,14 @@ : track_name_(std::move(name)) {} const FullTrackName& GetTrackName() const override { return track_name_; } std::optional<PublishedObject> GetCachedObject( - uint64_t group, std::optional<uint64_t> subgroup, - uint64_t object) const override { + uint64_t group, std::optional<uint64_t> subgroup, uint64_t object, + uint64_t /*offset*/) const override { Location location(group, object); auto it = objects_.find(location); if (it == objects_.end()) { return std::nullopt; } - return CachedObjectToPublishedObject(it->second); + return it->second.ToPublishedObject(); } void AddObjectListener(MoqtObjectListener* listener) override { listeners_.insert(listener); @@ -146,16 +148,20 @@ } void AddObject(Location location, uint64_t subgroup, absl::string_view payload, bool fin) { - CachedObject object; - object.metadata.location = location; - object.metadata.subgroup = subgroup; - object.metadata.extensions = ""; - object.metadata.status = MoqtObjectStatus::kNormal; - object.metadata.publisher_priority = 128; - object.payload = std::make_shared<quiche::QuicheMemSlice>( - quiche::QuicheMemSlice::Copy(payload)); - object.fin_after_this = fin; - objects_[location] = std::move(object); + PublishedObjectMetadata metadata; + metadata.location = location; + metadata.subgroup = subgroup; + metadata.extensions = ""; + metadata.status = MoqtObjectStatus::kNormal; + metadata.publisher_priority = 128; + metadata.payload_length = payload.length(); + auto it = objects_.find(location); + if (it != objects_.end()) { + it->second.Append(it->second.payload_received(), payload); + } else { + objects_.try_emplace(location, metadata, + quiche::QuicheMemSlice::Copy(payload), fin); + } if (!largest_location_.has_value() || *largest_location_ < location) { largest_location_ = location; } @@ -172,7 +178,7 @@ private: FullTrackName track_name_; absl::flat_hash_set<MoqtObjectListener*> listeners_; - absl::flat_hash_map<Location, CachedObject> objects_; + std::map<Location, CachedObject> objects_; std::optional<Location> largest_location_; TrackExtensions extensions_; }; @@ -189,7 +195,7 @@ MOCK_METHOD(void, OnObjectFragment, (const FullTrackName& full_track_name, const PublishedObjectMetadata& metadata, - absl::string_view object, bool end_of_message), + absl::string_view object, uint64_t offset), (override)); MOCK_METHOD(void, OnPublishDone, (FullTrackName full_track_name), (override)); MOCK_METHOD(void, OnMalformedTrack, (const FullTrackName& full_track_name),
diff --git a/quiche/quic/moqt/test_tools/moqt_simulator.cc b/quiche/quic/moqt/test_tools/moqt_simulator.cc index 2ed2240..5090baa 100644 --- a/quiche/quic/moqt/test_tools/moqt_simulator.cc +++ b/quiche/quic/moqt/test_tools/moqt_simulator.cc
@@ -30,13 +30,15 @@ #include "quiche/quic/core/quic_time.h" #include "quiche/quic/core/quic_types.h" #include "quiche/quic/moqt/moqt_bitrate_adjuster.h" +#include "quiche/quic/moqt/moqt_error.h" #include "quiche/quic/moqt/moqt_known_track_publisher.h" -#include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_names.h" #include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_outgoing_queue.h" #include "quiche/quic/moqt/moqt_session.h" #include "quiche/quic/moqt/moqt_session_interface.h" #include "quiche/quic/moqt/moqt_trace_recorder.h" +#include "quiche/quic/moqt/moqt_types.h" #include "quiche/quic/moqt/test_tools/moqt_simulator_harness.h" #include "quiche/quic/test_tools/simulator/actor.h" #include "quiche/quic/test_tools/simulator/link.h" @@ -206,13 +208,14 @@ void ObjectReceiver::OnObjectFragment(const FullTrackName& full_track_name, const PublishedObjectMetadata& metadata, absl::string_view object, - bool end_of_message) { + uint64_t offset) { QUICHE_DCHECK(full_track_name == TrackName()); if (metadata.status != MoqtObjectStatus::kNormal) { - QUICHE_DCHECK(end_of_message); + QUICHE_DCHECK(object.empty() && metadata.payload_length == 0 && + offset == 0); return; } - if (!end_of_message) { + if (metadata.payload_length != object.length() || offset != 0) { QUICHE_LOG(DFATAL) << "Partial receiving of objects wasn't enabled"; return; }
diff --git a/quiche/quic/moqt/test_tools/moqt_simulator.h b/quiche/quic/moqt/test_tools/moqt_simulator.h index 8a2e314..bf3b51f 100644 --- a/quiche/quic/moqt/test_tools/moqt_simulator.h +++ b/quiche/quic/moqt/test_tools/moqt_simulator.h
@@ -165,7 +165,7 @@ void OnObjectFragment(const FullTrackName& full_track_name, const PublishedObjectMetadata& metadata, - absl::string_view object, bool end_of_message) override; + absl::string_view object, uint64_t offset) override; void OnPublishDone(FullTrackName /*full_track_name*/) override {} void OnMalformedTrack(const FullTrackName& /*full_track_name*/) override {}
diff --git a/quiche/quic/moqt/tools/chat_client.cc b/quiche/quic/moqt/tools/chat_client.cc index c408333..1c4882e 100644 --- a/quiche/quic/moqt/tools/chat_client.cc +++ b/quiche/quic/moqt/tools/chat_client.cc
@@ -7,6 +7,7 @@ #include <poll.h> #include <unistd.h> +#include <cstdint> #include <iostream> #include <memory> #include <optional> @@ -221,9 +222,10 @@ } void ChatClient::RemoteTrackVisitor::OnObjectFragment( - const FullTrackName& full_track_name, const PublishedObjectMetadata&, - absl::string_view object, bool end_of_message) { - if (!end_of_message) { + const FullTrackName& full_track_name, + const PublishedObjectMetadata& metadata, absl::string_view object, + uint64_t offset) { + if (offset != 0 || object.length() != metadata.payload_length) { std::cerr << "Error: received partial message despite requesting " "buffering\n"; }
diff --git a/quiche/quic/moqt/tools/chat_client.h b/quiche/quic/moqt/tools/chat_client.h index 516ea07..6e229d5 100644 --- a/quiche/quic/moqt/tools/chat_client.h +++ b/quiche/quic/moqt/tools/chat_client.h
@@ -5,6 +5,7 @@ #ifndef QUICHE_QUIC_MOQT_TOOLS_CHAT_CLIENT_H #define QUICHE_QUIC_MOQT_TOOLS_CHAT_CLIENT_H +#include <cstdint> #include <memory> #include <optional> #include <variant> @@ -111,8 +112,7 @@ void OnObjectFragment(const moqt::FullTrackName& full_track_name, const PublishedObjectMetadata& metadata, - absl::string_view object, - bool end_of_message) override; + absl::string_view object, uint64_t offset) override; void OnPublishDone(FullTrackName) override {}
diff --git a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc index b3fd295..74fa035 100644 --- a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc +++ b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
@@ -201,8 +201,8 @@ void OnObjectFragment(const FullTrackName& full_track_name, const PublishedObjectMetadata& metadata, - absl::string_view object, - bool /*end_of_message*/) override { + absl::string_view object, uint64_t offset) override { + QUICHE_CHECK(offset == 0 && metadata.payload_length == object.length()); std::string file_name = absl::StrCat(metadata.location.group, "-", metadata.location.object, ".", full_track_name.track_namespace().tuple().back());
diff --git a/quiche/quic/moqt/tools/moqt_relay.cc b/quiche/quic/moqt/tools/moqt_relay.cc index 74e35fa..6d16802 100644 --- a/quiche/quic/moqt/tools/moqt_relay.cc +++ b/quiche/quic/moqt/tools/moqt_relay.cc
@@ -52,9 +52,11 @@ // TODO(martinduke): Extend MoqtServer so that partial objects can be // received. server_(std::make_unique<MoqtServer>( - std::move(proof_source), [this](absl::string_view path) { + std::move(proof_source), + [this](absl::string_view path) { return IncomingSessionHandler(path); - })) { + }, + MoqtSessionParameters(/*deliver_partial_objects=*/true))) { quiche::QuicheIpAddress bind_ip_address; QUICHE_CHECK(bind_ip_address.FromString(bind_address)); // CreateUDPSocketAndListen() creates the event loop that we will pass to @@ -87,8 +89,9 @@ } else { verifier = quic::CreateDefaultProofVerifier(server_id.host()); } - return std::make_unique<moqt::MoqtClient>(peer_address, server_id, - std::move(verifier), event_loop); + MoqtSessionParameters parameters(/*deliver_partial_objects=*/true); + return std::make_unique<moqt::MoqtClient>( + peer_address, server_id, std::move(verifier), event_loop, parameters); } MoqtSessionCallbacks MoqtRelay::CreateClientCallbacks() {