Implement FETCH object serialization flags. Datagrams now no longer have a subgroup_id, so most Object data structures make the subgroup std::optional. FETCH no longer returns non-normal objects. Due to worsening dependency loops, added moqt_types.h, which will hopefully expand to cover many simple data types. Concludes MOQT draft-16 update except for filed bugs. PiperOrigin-RevId: 879779659
diff --git a/build/source_list.bzl b/build/source_list.bzl index 4c3283f..58ed467 100644 --- a/build/source_list.bzl +++ b/build/source_list.bzl
@@ -1601,6 +1601,7 @@ "quic/moqt/moqt_subscribe_windows.h", "quic/moqt/moqt_trace_recorder.h", "quic/moqt/moqt_track.h", + "quic/moqt/moqt_types.h", "quic/moqt/relay_namespace_tree.h", "quic/moqt/session_namespace_tree.h", "quic/moqt/tools/chat_client.h",
diff --git a/build/source_list.gni b/build/source_list.gni index f4b0dd1..42458e6 100644 --- a/build/source_list.gni +++ b/build/source_list.gni
@@ -1605,6 +1605,7 @@ "src/quiche/quic/moqt/moqt_subscribe_windows.h", "src/quiche/quic/moqt/moqt_trace_recorder.h", "src/quiche/quic/moqt/moqt_track.h", + "src/quiche/quic/moqt/moqt_types.h", "src/quiche/quic/moqt/relay_namespace_tree.h", "src/quiche/quic/moqt/session_namespace_tree.h", "src/quiche/quic/moqt/tools/chat_client.h",
diff --git a/build/source_list.json b/build/source_list.json index 66a8052..86a6e76 100644 --- a/build/source_list.json +++ b/build/source_list.json
@@ -1604,6 +1604,7 @@ "quiche/quic/moqt/moqt_subscribe_windows.h", "quiche/quic/moqt/moqt_trace_recorder.h", "quiche/quic/moqt/moqt_track.h", + "quiche/quic/moqt/moqt_types.h", "quiche/quic/moqt/relay_namespace_tree.h", "quiche/quic/moqt/session_namespace_tree.h", "quiche/quic/moqt/tools/chat_client.h",
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc index bded8ff..740d30f 100644 --- a/quiche/quic/moqt/moqt_framer.cc +++ b/quiche/quic/moqt/moqt_framer.cc
@@ -24,7 +24,9 @@ #include "quiche/quic/moqt/moqt_key_value_pair.h" #include "quiche/quic/moqt/moqt_messages.h" #include "quiche/quic/moqt/moqt_names.h" +#include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_priority.h" +#include "quiche/quic/moqt/moqt_types.h" #include "quiche/common/platform/api/quiche_bug_tracker.h" #include "quiche/common/platform/api/quiche_logging.h" #include "quiche/common/quiche_buffer_allocator.h" @@ -355,72 +357,93 @@ quiche::QuicheBuffer MoqtFramer::SerializeObjectHeader( const MoqtObject& message, MoqtDataStreamType message_type, - std::optional<uint64_t> previous_object_in_stream) { - if (!ValidateObjectMetadata(message, /*is_datagram=*/false)) { + std::optional<PublishedObjectMetadata>& previous_object_in_stream) { + if (!ValidateObjectMetadata(message)) { QUICHE_BUG(QUICHE_BUG_serialize_object_header_01) << "Object metadata is invalid"; return quiche::QuicheBuffer(); } + // Many fields are optional because the stream type or Fetch serialization + // omits them. + std::optional<uint64_t> stream_type; + std::optional<uint64_t> track_id; // Track alias or FETCH ID. + std::optional<uint64_t> group_id; + std::optional<uint64_t> subgroup_id; + std::optional<uint64_t> object_id; + std::optional<uint8_t> publisher_priority; + std::optional<absl::string_view> extension_headers; + uint64_t payload_length = message.payload_length; bool is_first_in_stream = !previous_object_in_stream.has_value(); - // Not all fields will be written to the wire. Keep optional ones in - // std::optional so that they can be excluded. - // Three fields are always optional. - std::optional<uint64_t> stream_type = - is_first_in_stream ? std::optional<uint64_t>(message_type.value()) - : std::nullopt; - std::optional<uint64_t> track_alias = - is_first_in_stream ? std::optional<uint64_t>(message.track_alias) - : std::nullopt; - std::optional<uint64_t> object_status = - (message.payload_length == 0) - ? std::optional<uint64_t>( - static_cast<uint64_t>(message.object_status)) - : std::nullopt; + if (is_first_in_stream) { + stream_type = message_type.value(); + track_id = message.track_alias; + } if (message_type.IsFetch()) { + MoqtFetchSerialization serialization; + if (is_first_in_stream) { + serialization = MoqtFetchSerialization(message); + } else { + serialization = + MoqtFetchSerialization(message, *previous_object_in_stream); + } + if (serialization.has_group_id()) { + group_id = message.group_id; + } + if (serialization.has_subgroup_id()) { + subgroup_id = message.subgroup_id; + } + if (serialization.has_object_id()) { + object_id = message.object_id; + } + if (serialization.has_priority()) { + publisher_priority = message.publisher_priority; + } + if (serialization.has_extensions()) { + extension_headers = message.extension_headers; + } return Serialize( WireOptional<WireVarInt62>(stream_type), - WireOptional<WireVarInt62>(track_alias), WireVarInt62(message.group_id), - WireVarInt62(message.subgroup_id), WireVarInt62(message.object_id), - WireUint8(message.publisher_priority), - WireStringWithVarInt62Length(message.extension_headers), - WireVarInt62(message.payload_length), - WireOptional<WireVarInt62>(object_status)); + WireOptional<WireVarInt62>(track_id), + WireVarInt62(serialization.value()), + WireOptional<WireVarInt62>(group_id), + WireOptional<WireVarInt62>(subgroup_id), + WireOptional<WireVarInt62>(object_id), + WireOptional<WireUint8>(publisher_priority), + WireOptional<WireStringWithVarInt62Length>(extension_headers), + WireVarInt62(payload_length)); } - if (previous_object_in_stream.has_value() && - message.object_id <= *previous_object_in_stream) { + // Subgroup stream. + if (!message.subgroup_id.has_value()) { QUICHE_BUG(QUICHE_BUG_serialize_object_header_02) - << "Object ID is not increasing"; + << "Subgroup ID is missing"; return quiche::QuicheBuffer(); } - // Subgroup headers have more optional fields. - QUICHE_CHECK(message_type.IsSubgroup()); - std::optional<uint64_t> group_id = - previous_object_in_stream.has_value() - ? std::nullopt - : std::optional<uint64_t>(message.group_id); - uint64_t object_id = message.object_id; - if (!is_first_in_stream) { - // The value is actually an object ID delta, not the absolute object ID. - object_id -= (*previous_object_in_stream + 1); + if (is_first_in_stream) { + group_id = message.group_id; + if (message_type.IsSubgroupPresent()) { + subgroup_id = message.subgroup_id; + } + if (!message_type.HasDefaultPriority()) { + publisher_priority = message.publisher_priority; + } } - std::optional<uint64_t> subgroup_id = - (is_first_in_stream && message_type.IsSubgroupPresent()) - ? std::optional<uint64_t>(message.subgroup_id) - : std::nullopt; - std::optional<uint8_t> publisher_priority = - (is_first_in_stream && !message_type.HasDefaultPriority()) - ? std::optional<uint8_t>(message.publisher_priority) - : std::nullopt; - std::optional<absl::string_view> extension_headers = - (message_type.AreExtensionHeadersPresent()) - ? std::optional<absl::string_view>(message.extension_headers) - : std::nullopt; + object_id = message.object_id; + if (!is_first_in_stream) { + *object_id -= (previous_object_in_stream->location.object + 1); + } + if (message_type.AreExtensionHeadersPresent()) { + extension_headers = message.extension_headers; + } + std::optional<uint64_t> object_status; + if (payload_length == 0) { + object_status = static_cast<uint64_t>(message.object_status); + } return Serialize( WireOptional<WireVarInt62>(stream_type), - WireOptional<WireVarInt62>(track_alias), + WireOptional<WireVarInt62>(track_id), WireOptional<WireVarInt62>(group_id), WireOptional<WireVarInt62>(subgroup_id), - WireOptional<WireUint8>(publisher_priority), WireVarInt62(object_id), + WireOptional<WireUint8>(publisher_priority), WireVarInt62(*object_id), WireOptional<WireStringWithVarInt62Length>(extension_headers), WireVarInt62(message.payload_length), WireOptional<WireVarInt62>(object_status)); @@ -429,7 +452,7 @@ quiche::QuicheBuffer MoqtFramer::SerializeObjectDatagram( const MoqtObject& message, absl::string_view payload, MoqtPriority default_priority) { - if (!ValidateObjectMetadata(message, /*is_datagram=*/true)) { + if (!ValidateObjectMetadata(message) || message.subgroup_id.has_value()) { QUICHE_BUG(QUICHE_BUG_serialize_object_datagram_01) << "Object metadata is invalid"; return quiche::QuicheBuffer(); @@ -723,17 +746,10 @@ } // static -bool MoqtFramer::ValidateObjectMetadata(const MoqtObject& object, - bool is_datagram) { - if (object.object_status != MoqtObjectStatus::kNormal && - object.object_status != MoqtObjectStatus::kEndOfGroup && - object.payload_length > 0) { - return false; - } - if (is_datagram && object.subgroup_id != object.object_id) { - return false; - } - return true; +bool MoqtFramer::ValidateObjectMetadata(const MoqtObject& object) { + return (object.object_status == MoqtObjectStatus::kNormal || + object.object_status == MoqtObjectStatus::kEndOfGroup || + object.payload_length == 0); } } // namespace moqt
diff --git a/quiche/quic/moqt/moqt_framer.h b/quiche/quic/moqt/moqt_framer.h index 62ae609..f1e56e0 100644 --- a/quiche/quic/moqt/moqt_framer.h +++ b/quiche/quic/moqt/moqt_framer.h
@@ -5,12 +5,12 @@ #ifndef QUICHE_QUIC_MOQT_MOQT_FRAMER_H_ #define QUICHE_QUIC_MOQT_MOQT_FRAMER_H_ -#include <cstdint> #include <optional> #include "absl/strings/string_view.h" #include "quiche/quic/moqt/moqt_key_value_pair.h" #include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_priority.h" #include "quiche/common/platform/api/quiche_export.h" #include "quiche/common/quiche_buffer_allocator.h" @@ -37,7 +37,7 @@ // one otherwise. quiche::QuicheBuffer SerializeObjectHeader( const MoqtObject& message, MoqtDataStreamType message_type, - std::optional<uint64_t> previous_object_in_stream); + std::optional<PublishedObjectMetadata>& previous_object_in_stream); // Serializes both OBJECT and OBJECT_STATUS datagrams. quiche::QuicheBuffer SerializeObjectDatagram(const MoqtObject& message, absl::string_view payload, @@ -86,8 +86,7 @@ const SetupParameters& parameters, KeyValuePairList& out); // Returns true if the metadata is internally consistent. - static bool ValidateObjectMetadata(const MoqtObject& object, - bool is_datagram); + static bool ValidateObjectMetadata(const MoqtObject& object); const bool using_webtrans_; };
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc index f9aa333..d63e2b6 100644 --- a/quiche/quic/moqt/moqt_framer_test.cc +++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -16,7 +16,8 @@ #include "quiche/quic/moqt/moqt_key_value_pair.h" #include "quiche/quic/moqt/moqt_messages.h" #include "quiche/quic/moqt/moqt_names.h" -#include "quiche/quic/moqt/moqt_priority.h" +#include "quiche/quic/moqt/moqt_object.h" +#include "quiche/quic/moqt/moqt_types.h" #include "quiche/quic/moqt/test_tools/moqt_test_message.h" #include "quiche/quic/platform/api/quic_expect_bug.h" #include "quiche/quic/platform/api/quic_test.h" @@ -91,11 +92,18 @@ MoqtObject adjusted_message = message; adjusted_message.payload_length = payload.size(); QUICHE_DCHECK(message.object_id > change_in_object_id); + std::optional<PublishedObjectMetadata> previous_object; + if (change_in_object_id > 0) { + previous_object.emplace(); + previous_object->location = + Location(message.group_id, message.object_id - change_in_object_id); + previous_object->subgroup = message.subgroup_id; + previous_object->extensions = message.extension_headers; + previous_object->status = message.object_status; + previous_object->publisher_priority = message.publisher_priority; + } quiche::QuicheBuffer header = framer.SerializeObjectHeader( - adjusted_message, stream_type, - change_in_object_id == 0 - ? std::nullopt - : std::optional<uint64_t>(message.object_id - change_in_object_id)); + adjusted_message, stream_type, previous_object); if (header.empty()) { return quiche::QuicheBuffer(); } @@ -269,43 +277,58 @@ } TEST_F(MoqtFramerSimpleTest, FetchMiddler) { - auto header = std::make_unique<StreamHeaderFetchMessage>(); - auto buffer1 = - SerializeObject(framer_, std::get<MoqtObject>(header->structured_data()), - "foo", MoqtDataStreamType::Fetch(), 0); - EXPECT_EQ(buffer1.size(), header->total_message_size()); - EXPECT_EQ(buffer1.AsStringView(), header->PacketSample()); + for (const MoqtFetchSerialization& flags : AllMoqtFetchSerializations()) { + SCOPED_TRACE(testing::Message() << "flags: " << flags.value()); + if (flags.is_datagram() && !flags.zero_subgroup_id()) { + // The framer will not encode these, although they are legal. + continue; + } + auto header = std::make_unique<StreamHeaderFetchMessage>(); + MoqtObject object = std::get<MoqtObject>(header->structured_data()); + std::optional<PublishedObjectMetadata> previous; + auto buffer1 = framer_.SerializeObjectHeader( + object, MoqtDataStreamType::Fetch(), previous); + auto whole_object = + quiche::QuicheBuffer::Copy(quiche::SimpleBufferAllocator::Get(), + absl::StrCat(buffer1.AsStringView(), "foo")); + EXPECT_EQ(whole_object.size(), header->total_message_size()); + EXPECT_EQ(whole_object.AsStringView(), header->PacketSample()); - auto middler = std::make_unique<StreamMiddlerFetchMessage>(); - auto buffer2 = - SerializeObject(framer_, std::get<MoqtObject>(middler->structured_data()), - "bar", MoqtDataStreamType::Fetch(), 3); - EXPECT_EQ(buffer2.size(), middler->total_message_size()); - EXPECT_EQ(buffer2.AsStringView(), middler->PacketSample()); + auto middler = std::make_unique<StreamMiddlerFetchMessage>(flags); + // Populate previous object metadata. + previous.emplace(Location(object.group_id, object.object_id), + object.subgroup_id, object.extension_headers, + object.object_status, object.publisher_priority); + auto buffer2 = framer_.SerializeObjectHeader( + std::get<MoqtObject>(middler->structured_data()), + MoqtDataStreamType::Fetch(), previous); + whole_object = + quiche::QuicheBuffer::Copy(quiche::SimpleBufferAllocator::Get(), + absl::StrCat(buffer2.AsStringView(), "bar")); + EXPECT_EQ(whole_object.size(), middler->total_message_size()); + EXPECT_EQ(whole_object.AsStringView(), middler->PacketSample()); + } } TEST_F(MoqtFramerSimpleTest, BadObjectInput) { MoqtObject object = { - // This is a valid object. + // Invalid: DoesNotExist with non-zero payload length. /*track_alias=*/4, /*group_id=*/5, /*object_id=*/6, /*publisher_priority=*/7, std::string(kDefaultExtensionBlob.data(), kDefaultExtensionBlob.size()), - /*object_status=*/MoqtObjectStatus::kNormal, + /*object_status=*/MoqtObjectStatus::kObjectDoesNotExist, /*subgroup_id=*/8, /*payload_length=*/3, }; quiche::QuicheBuffer buffer; - - // Non-normal status must have no payload. - object.object_status = MoqtObjectStatus::kObjectDoesNotExist; + std::optional<PublishedObjectMetadata> previous; EXPECT_QUIC_BUG( buffer = framer_.SerializeObjectHeader( - object, MoqtDataStreamType::Subgroup(8, 0, false, false), false), + object, MoqtDataStreamType::Subgroup(8, 0, false, false), previous), "Object metadata is invalid"); EXPECT_TRUE(buffer.empty()); - // object.object_status = MoqtObjectStatus::kNormal; } TEST_F(MoqtFramerSimpleTest, BadDatagramInput) { @@ -317,7 +340,7 @@ /*publisher_priority=*/7, std::string(kDefaultExtensionBlob), /*object_status=*/MoqtObjectStatus::kNormal, - /*subgroup_id=*/6, + /*subgroup_id=*/std::nullopt, /*payload_length=*/3, }; quiche::QuicheBuffer buffer; @@ -334,7 +357,7 @@ object, "foo", kDefaultPublisherPriority), "Object metadata is invalid"); EXPECT_TRUE(buffer.empty()); - object.subgroup_id = 6; + object.subgroup_id = std::nullopt; EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectDatagram( object, "foobar", kDefaultPublisherPriority),
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc index 526d54b..6202316 100644 --- a/quiche/quic/moqt/moqt_integration_test.cc +++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -32,6 +32,7 @@ #include "quiche/quic/moqt/moqt_session_callbacks.h" #include "quiche/quic/moqt/moqt_session_interface.h" #include "quiche/quic/moqt/moqt_track.h" +#include "quiche/quic/moqt/moqt_types.h" #include "quiche/quic/moqt/test_tools/moqt_mock_visitor.h" #include "quiche/quic/moqt/test_tools/moqt_session_peer.h" #include "quiche/quic/moqt/test_tools/moqt_simulator_harness.h" @@ -497,18 +498,12 @@ } EXPECT_EQ(result, MoqtFetchTask::GetNextObjectResult::kSuccess); EXPECT_EQ(object.metadata.location, expected); - if (object.metadata.location.object == 1) { - EXPECT_EQ(object.metadata.status, MoqtObjectStatus::kEndOfGroup); - expected.object = 0; - ++expected.group; - } else { - EXPECT_EQ(object.metadata.status, MoqtObjectStatus::kNormal); - EXPECT_EQ(object.payload.AsStringView(), "object"); - ++expected.object; - } + EXPECT_EQ(object.metadata.status, MoqtObjectStatus::kNormal); + EXPECT_EQ(object.payload.AsStringView(), "object"); + ++expected.group; } while (result == MoqtFetchTask::GetNextObjectResult::kSuccess); EXPECT_EQ(result, MoqtFetchTask::GetNextObjectResult::kEof); - EXPECT_EQ(expected, Location(99, 1)); + EXPECT_EQ(expected, Location(100, 0)); } TEST_F(MoqtIntegrationTest, PublishNamespaceFailure) {
diff --git a/quiche/quic/moqt/moqt_key_value_pair.h b/quiche/quic/moqt/moqt_key_value_pair.h index 09dac22..b0fc0f6 100644 --- a/quiche/quic/moqt/moqt_key_value_pair.h +++ b/quiche/quic/moqt/moqt_key_value_pair.h
@@ -17,6 +17,7 @@ #include "quiche/quic/core/quic_time.h" #include "quiche/quic/moqt/moqt_error.h" #include "quiche/quic/moqt/moqt_priority.h" +#include "quiche/quic/moqt/moqt_types.h" #include "quiche/common/platform/api/quiche_export.h" #include "quiche/common/platform/api/quiche_logging.h" #include "quiche/common/quiche_callbacks.h" @@ -54,40 +55,6 @@ absl::btree_multimap<uint64_t, std::variant<uint64_t, std::string>> map_; }; -inline constexpr uint64_t kMaxGroupId = quiche::kVarInt62MaxValue; -inline constexpr uint64_t kMaxObjectId = quiche::kVarInt62MaxValue; -// Location as defined in -// https://moq-wg.github.io/moq-transport/draft-ietf-moq-transport.html#location-structure -struct Location { - uint64_t group = 0; - uint64_t object = 0; - - Location() = default; - Location(uint64_t group, uint64_t object) : group(group), object(object) {} - - // Location order as described in - // https://moq-wg.github.io/moq-transport/draft-ietf-moq-transport.html#location-structure - auto operator<=>(const Location&) const = default; - - Location Next() const { - if (object == kMaxObjectId) { - if (group == kMaxObjectId) { - return Location(0, 0); - } - return Location(group + 1, 0); - } - return Location(group, object + 1); - } - - template <typename H> - friend H AbslHashValue(H h, const Location& m); - - template <typename Sink> - friend void AbslStringify(Sink& sink, const Location& sequence) { - absl::Format(&sink, "(%d; %d)", sequence.group, sequence.object); - } -}; - enum AuthTokenType : uint64_t { kOutOfBand = 0x0,
diff --git a/quiche/quic/moqt/moqt_messages.cc b/quiche/quic/moqt/moqt_messages.cc index 3e75e58..013b1f2 100644 --- a/quiche/quic/moqt/moqt_messages.cc +++ b/quiche/quic/moqt/moqt_messages.cc
@@ -164,6 +164,10 @@ type.has_extension() ? "_EXTENSION" : ""); } +std::string MoqtFetchSerializationToString(MoqtFetchSerialization type) { + return absl::StrCat("FETCH_SERIALIZATION_", type.value()); +} + std::string MoqtForwardingPreferenceToString( MoqtForwardingPreference preference) { switch (preference) {
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h index fdb35e5..c956c8f 100644 --- a/quiche/quic/moqt/moqt_messages.h +++ b/quiche/quic/moqt/moqt_messages.h
@@ -8,6 +8,7 @@ #define QUICHE_QUIC_MOQT_MOQT_MESSAGES_H_ #include <algorithm> +#include <bit> #include <cstddef> #include <cstdint> #include <initializer_list> @@ -17,7 +18,6 @@ #include <variant> #include <vector> -#include "absl/strings/str_format.h" #include "absl/strings/string_view.h" #include "quiche/quic/core/quic_time.h" #include "quiche/quic/core/quic_types.h" @@ -25,9 +25,11 @@ #include "quiche/quic/moqt/moqt_error.h" #include "quiche/quic/moqt/moqt_key_value_pair.h" #include "quiche/quic/moqt/moqt_names.h" +#include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_priority.h" +#include "quiche/quic/moqt/moqt_types.h" #include "quiche/common/platform/api/quiche_export.h" -#include "quiche/common/platform/api/quiche_logging.h" +#include "quiche/common/quiche_endian.h" namespace moqt { @@ -99,6 +101,7 @@ static constexpr uint64_t kFirstObjectId = 0x02; static constexpr uint64_t kSubgroupId = 0x04; + MoqtDataStreamType() : value_(0) {} // Factory functions. static std::optional<MoqtDataStreamType> FromValue(uint64_t value) { MoqtDataStreamType stream_type(value); @@ -169,11 +172,12 @@ } uint64_t value() const { return value_; } + MoqtDataStreamType& operator=(const MoqtDataStreamType& other) = default; bool operator==(const MoqtDataStreamType& other) const = default; private: explicit MoqtDataStreamType(uint64_t value) : value_(value) {} - const uint64_t value_; + uint64_t value_; }; class QUICHE_EXPORT MoqtDatagramType { @@ -301,11 +305,6 @@ auto operator<=>(const SubgroupPriority&) const = default; }; -template <typename H> -H AbslHashValue(H h, const Location& m) { - return H::combine(std::move(h), m.group, m.object); -} - // TODO(martinduke): Collapse both Setup messages into SetupParameters. struct QUICHE_EXPORT MoqtClientSetup { SetupParameters parameters; @@ -321,14 +320,6 @@ kDatagram, }; -enum class QUICHE_EXPORT MoqtObjectStatus : uint64_t { - kNormal = 0x0, - kObjectDoesNotExist = 0x1, - kEndOfGroup = 0x3, - kEndOfTrack = 0x4, - kInvalidObjectStatus = 0x5, -}; - MoqtObjectStatus IntegerToObjectStatus(uint64_t integer); // The data contained in every Object message, although the message type @@ -340,10 +331,114 @@ MoqtPriority publisher_priority; std::string extension_headers; // Raw, unparsed extension headers. MoqtObjectStatus object_status; - uint64_t subgroup_id; + std::optional<uint64_t> subgroup_id; // Only for subgroup objects. uint64_t payload_length; }; +class QUICHE_EXPORT MoqtFetchSerialization { + public: + static constexpr uint64_t kSubgroupIdMask = 0x03; + static constexpr uint64_t kSubgroupIdZero = 0x00; + static constexpr uint64_t kPriorSubgroupId = 0x01; + static constexpr uint64_t kPriorSubgroupIdPlusOne = 0x02; + static constexpr uint64_t kHasSubgroupId = 0x03; + static constexpr uint64_t kHasObjectId = 0x04; + static constexpr uint64_t kHasGroupId = 0x08; + static constexpr uint64_t kHasPriority = 0x10; + static constexpr uint64_t kHasExtensions = 0x20; + static constexpr uint64_t kIsDatagram = 0x40; + + static constexpr uint64_t kMaxFetchSerialization = + kIsDatagram | kHasExtensions | kHasPriority | kHasGroupId | kHasObjectId | + kSubgroupIdMask; + + static constexpr uint64_t kEndOfNonExistentRange = 0x8c; + static constexpr uint64_t kEndOfUnknownRange = 0x10c; + + MoqtFetchSerialization() = default; + // Serialization for the first object in a stream. + MoqtFetchSerialization(const MoqtObject& object) { + if (!object.subgroup_id.has_value()) { + value_ |= kIsDatagram; + } else { + if (*object.subgroup_id == 0) { + value_ |= kSubgroupIdZero; + } else { + value_ |= kHasSubgroupId; + } + } + value_ |= (kHasGroupId | kHasObjectId | kHasPriority); + if (!object.extension_headers.empty()) { + value_ |= kHasExtensions; + } + } + // Serialization for a subsequent object in a stream. + MoqtFetchSerialization(const MoqtObject& object, + const PublishedObjectMetadata& previous_object) { + uint64_t value = 0; + if (!object.subgroup_id.has_value()) { + value |= kIsDatagram; + } else if (object.subgroup_id == 0) { + value |= kSubgroupIdZero; + } else if (!previous_object.subgroup.has_value()) { + value |= kHasSubgroupId; // Can't use previous value. + } else if (object.subgroup_id == previous_object.subgroup) { + value |= kPriorSubgroupId; + } else if (*object.subgroup_id == *previous_object.subgroup + 1) { + value |= kPriorSubgroupIdPlusOne; + } else { + value |= kHasSubgroupId; + } + if (object.object_id != previous_object.location.object + 1) { + value |= kHasObjectId; + } + if (object.group_id != previous_object.location.group) { + value |= kHasGroupId; + } + if (object.publisher_priority != previous_object.publisher_priority) { + value |= kHasPriority; + } + if (!object.extension_headers.empty()) { + value |= kHasExtensions; + } + value_ = value; + } + static std::optional<MoqtFetchSerialization> FromValue(uint64_t value) { + if (value > kMaxFetchSerialization && value != kEndOfUnknownRange && + value != kEndOfNonExistentRange) { + return std::nullopt; + } + return MoqtFetchSerialization(value); + } + bool has_subgroup_id() const { + return ((value_ & kSubgroupIdMask) == kHasSubgroupId) && !is_datagram(); + } + bool zero_subgroup_id() const { + return (value_ & kSubgroupIdMask) == kSubgroupIdZero && !is_datagram(); + } + bool prior_subgroup_id() const { + return (value_ & kSubgroupIdMask) == kPriorSubgroupId && !is_datagram(); + } + bool prior_subgroup_id_plus_one() const { + return (value_ & kSubgroupIdMask) == kPriorSubgroupIdPlusOne && + !is_datagram(); + } + bool has_object_id() const { return value_ & kHasObjectId; } + bool has_group_id() const { return value_ & kHasGroupId; } + bool has_priority() const { return value_ & kHasPriority; } + bool has_extensions() const { return value_ & kHasExtensions; } + bool is_datagram() const { return value_ & kIsDatagram; } + bool end_of_non_existent_range() const { + return value_ == kEndOfNonExistentRange; + } + bool end_of_unknown_range() const { return value_ == kEndOfUnknownRange; } + uint64_t value() const { return value_; } + + private: + MoqtFetchSerialization(uint64_t value) : value_(value) {} + uint64_t value_ = 0; +}; + struct QUICHE_EXPORT MoqtRequestError { uint64_t request_id; RequestErrorCode error_code; @@ -534,6 +629,7 @@ std::string MoqtMessageTypeToString(MoqtMessageType message_type); std::string MoqtDataStreamTypeToString(MoqtDataStreamType type); +std::string MoqtFetchSerializationToString(MoqtFetchSerialization type); std::string MoqtDatagramTypeToString(MoqtDatagramType type); std::string MoqtForwardingPreferenceToString(
diff --git a/quiche/quic/moqt/moqt_object.h b/quiche/quic/moqt/moqt_object.h index e708e46..87469d9 100644 --- a/quiche/quic/moqt/moqt_object.h +++ b/quiche/quic/moqt/moqt_object.h
@@ -7,29 +7,28 @@ #include <cstdint> #include <memory> +#include <optional> #include <string> #include "quiche/quic/core/quic_time.h" -#include "quiche/quic/moqt/moqt_messages.h" #include "quiche/quic/moqt/moqt_priority.h" +#include "quiche/quic/moqt/moqt_types.h" #include "quiche/common/quiche_mem_slice.h" namespace moqt { struct PublishedObjectMetadata { Location location; - uint64_t subgroup; // Equal to object_id for datagrams. + std::optional<uint64_t> subgroup; // nullopt for datagrams. std::string extensions; MoqtObjectStatus status; MoqtPriority publisher_priority; - MoqtForwardingPreference forwarding_preference; quic::QuicTime arrival_time = quic::QuicTime::Zero(); bool IsMalformed(const PublishedObjectMetadata& other) const { // It's OK for arrival_time to be different when checking immutables. return (location != other.location || subgroup != other.subgroup || status != other.status || - publisher_priority != other.publisher_priority || - forwarding_preference != other.forwarding_preference); + publisher_priority != other.publisher_priority); } };
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.cc b/quiche/quic/moqt/moqt_outgoing_queue.cc index d03954b..00cbc1c 100644 --- a/quiche/quic/moqt/moqt_outgoing_queue.cc +++ b/quiche/quic/moqt/moqt_outgoing_queue.cc
@@ -14,12 +14,11 @@ #include "absl/algorithm/container.h" #include "absl/status/status.h" #include "quiche/quic/moqt/moqt_fetch_task.h" -#include "quiche/quic/moqt/moqt_key_value_pair.h" -#include "quiche/quic/moqt/moqt_messages.h" #include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/quic/moqt/moqt_subscribe_windows.h" +#include "quiche/quic/moqt/moqt_types.h" #include "quiche/common/platform/api/quiche_bug_tracker.h" #include "quiche/common/quiche_mem_slice.h" @@ -77,29 +76,20 @@ Location sequence{current_group_id_, queue_.back().size()}; bool fin = status == MoqtObjectStatus::kEndOfGroup; queue_.back().push_back(CachedObject{ - PublishedObjectMetadata{ - sequence, 0, "", status, default_publisher_priority(), - MoqtForwardingPreference::kSubgroup, clock_->ApproximateNow()}, + PublishedObjectMetadata{sequence, 0, "", status, + default_publisher_priority(), + clock_->ApproximateNow()}, std::make_shared<quiche::QuicheMemSlice>(std::move(payload)), fin}); for (MoqtObjectListener* listener : listeners_) { listener->OnNewObjectAvailable(sequence, /*subgroup=*/0, - default_publisher_priority(), - MoqtForwardingPreference::kSubgroup); + default_publisher_priority()); } } std::optional<PublishedObject> MoqtOutgoingQueue::GetCachedObject( - uint64_t group, uint64_t subgroup, uint64_t object) const { - QUICHE_DCHECK_EQ(subgroup, 0u); + uint64_t group, std::optional<uint64_t> subgroup, uint64_t object) const { + QUICHE_DCHECK(subgroup.has_value() && subgroup == 0u); if (group < first_group_in_queue()) { - if (object == 0) { - return PublishedObject{ - PublishedObjectMetadata{ - Location(group, object), /*subgroup=*/0, "", - MoqtObjectStatus::kEndOfGroup, default_publisher_priority(), - MoqtForwardingPreference::kSubgroup, clock_->ApproximateNow()}, - quiche::QuicheMemSlice{}}; - } return std::nullopt; } if (group > current_group_id_) { @@ -211,45 +201,20 @@ MoqtFetchTask::GetNextObjectResult MoqtOutgoingQueue::FetchTask::GetNextObject( PublishedObject& object) { - MoqtFetchTask::GetNextObjectResult result; - do { - result = GetNextObjectInner(object); - // The specification for FETCH requires that all missing objects are simply - // skipped. - } while (result == MoqtFetchTask::GetNextObjectResult::kSuccess && - object.metadata.status == MoqtObjectStatus::kObjectDoesNotExist); - return result; -} - -MoqtFetchTask::GetNextObjectResult -MoqtOutgoingQueue::FetchTask::GetNextObjectInner(PublishedObject& object) { if (!status_.ok()) { return kError; } - if (objects_.empty()) { - return kEof; + while (!objects_.empty()) { + std::optional<PublishedObject> new_object = queue_->GetCachedObject( + objects_.front().group, 0, objects_.front().object); + objects_.pop_front(); + if (new_object.has_value() && + new_object->metadata.status == MoqtObjectStatus::kNormal) { + object = *std::move(new_object); + return kSuccess; + } } - - std::optional<PublishedObject> result = queue_->GetCachedObject( - objects_.front().group, 0, objects_.front().object); - if (!result.has_value()) { - // Create a synthetic object of status kEndOfGroup (if the object ID is - // zero) or kObjectDoesNotExist, which will result in the Fetch response - // skipping it. - object.metadata.location = objects_.front(); - object.metadata.subgroup = 0; - object.metadata.publisher_priority = queue_->default_publisher_priority(); - object.metadata.status = object.metadata.location.object == 0 - ? MoqtObjectStatus::kEndOfGroup - : MoqtObjectStatus::kObjectDoesNotExist; - object.metadata.arrival_time = queue_->clock_->ApproximateNow(); - object.payload = quiche::QuicheMemSlice(); - object.fin_after_this = false; - } else { - object = *std::move(result); - } - objects_.pop_front(); - return kSuccess; + return kEof; } void MoqtOutgoingQueue::Close() {
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.h b/quiche/quic/moqt/moqt_outgoing_queue.h index 7dbc7ee..d1dca0f 100644 --- a/quiche/quic/moqt/moqt_outgoing_queue.h +++ b/quiche/quic/moqt/moqt_outgoing_queue.h
@@ -27,6 +27,7 @@ #include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/moqt/moqt_publisher.h" +#include "quiche/quic/moqt/moqt_types.h" #include "quiche/common/quiche_circular_deque.h" #include "quiche/common/quiche_mem_slice.h" @@ -57,7 +58,8 @@ // MoqtTrackPublisher implementation. const FullTrackName& GetTrackName() const override { return track_; } std::optional<PublishedObject> GetCachedObject( - uint64_t group, uint64_t subgroup, uint64_t min_object) const override; + uint64_t group, std::optional<uint64_t> subgroup, + uint64_t min_object) const override; void AddObjectListener(MoqtObjectListener* listener) override { listeners_.insert(listener); listener->OnSubscribeAccepted();
diff --git a/quiche/quic/moqt/moqt_outgoing_queue_test.cc b/quiche/quic/moqt/moqt_outgoing_queue_test.cc index 333961c..dd79dcb 100644 --- a/quiche/quic/moqt/moqt_outgoing_queue_test.cc +++ b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
@@ -24,6 +24,7 @@ #include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/quic/moqt/moqt_subscribe_windows.h" +#include "quiche/quic/moqt/moqt_types.h" #include "quiche/quic/moqt/test_tools/moqt_mock_visitor.h" #include "quiche/common/platform/api/quiche_expect_bug.h" #include "quiche/common/platform/api/quiche_test.h" @@ -49,11 +50,10 @@ AddObjectListener(this); } - void OnNewObjectAvailable( - Location sequence, uint64_t subgroup, MoqtPriority publisher_priority, - MoqtForwardingPreference forwarding_preference) override { + void OnNewObjectAvailable(Location sequence, std::optional<uint64_t> subgroup, + MoqtPriority publisher_priority) override { // MoqtOutgoingQueue does not create datagrams. - ASSERT_EQ(forwarding_preference, MoqtForwardingPreference::kSubgroup); + ASSERT_THAT(subgroup, testing::Optional(0)); std::optional<PublishedObject> object = GetCachedObject(sequence.group, subgroup, sequence.object); ASSERT_THAT(object, @@ -79,8 +79,7 @@ GetCachedObjectsInRange(Location(0, 0), *largest_location()); for (Location object : objects) { if (window.InWindow(object)) { - OnNewObjectAvailable(object, 0, default_publisher_priority(), - MoqtForwardingPreference::kSubgroup); + OnNewObjectAvailable(object, 0, default_publisher_priority()); } } }
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc index e1c663b..3a874e1 100644 --- a/quiche/quic/moqt/moqt_parser.cc +++ b/quiche/quic/moqt/moqt_parser.cc
@@ -30,6 +30,7 @@ #include "quiche/quic/moqt/moqt_messages.h" #include "quiche/quic/moqt/moqt_names.h" #include "quiche/quic/moqt/moqt_priority.h" +#include "quiche/quic/moqt/moqt_types.h" #include "quiche/common/platform/api/quiche_bug_tracker.h" #include "quiche/common/platform/api/quiche_logging.h" #include "quiche/common/quiche_data_reader.h" @@ -1146,7 +1147,7 @@ } else { object_metadata.object_id = 0; } - object_metadata.subgroup_id = object_metadata.object_id; + object_metadata.subgroup_id = std::nullopt; use_default_priority = datagram_type->has_default_priority(); if (!use_default_priority && !reader.ReadUInt8(&object_metadata.publisher_priority)) { @@ -1214,75 +1215,124 @@ return absl::bit_cast<uint8_t>(buffer[0]); } -void MoqtDataParser::AdvanceParserState() { - if (next_input_ != kStreamType && !type_.has_value()) { - QUICHE_BUG(quic_bug_advance_parser_state_no_type) - << "Advancing parser state without a stream type"; - return; +MoqtDataParser::NextInput MoqtDataParser::AdvanceParserState() { + if (type_.IsFetch()) { + switch (next_input_) { + case kStreamType: + return kRequestId; + case kRequestId: + return kSerializationFlags; + case kSerializationFlags: + if (fetch_serialization_.has_group_id()) { + return kGroupId; + } + [[fallthrough]]; + case kGroupId: + if (fetch_serialization_.is_datagram()) { + metadata_.subgroup_id = std::nullopt; + } else { + if (fetch_serialization_.has_subgroup_id()) { + return kSubgroupId; + } + if (fetch_serialization_.prior_subgroup_id_plus_one()) { + if (!metadata_.subgroup_id.has_value()) { + ParseError("reference to subgroup ID of prior datagram"); + return kFailed; + } + ++(*metadata_.subgroup_id); + } else if (fetch_serialization_.zero_subgroup_id()) { + metadata_.subgroup_id = 0; + } else if (!metadata_.subgroup_id.has_value()) { + QUICHE_DCHECK(fetch_serialization_.prior_subgroup_id()); + ParseError("reference to subgroup ID of prior datagram"); + return kFailed; + } + } + [[fallthrough]]; + case kSubgroupId: + if (fetch_serialization_.has_object_id()) { + return kObjectId; + } + ++metadata_.object_id; + [[fallthrough]]; + case kObjectId: + if (fetch_serialization_.end_of_non_existent_range() || + fetch_serialization_.end_of_unknown_range()) { + return kSerializationFlags; + } + if (fetch_serialization_.has_priority()) { + return kPublisherPriority; + } + [[fallthrough]]; + case kPublisherPriority: + if (fetch_serialization_.has_extensions()) { + return kExtensionSize; + } + metadata_.extension_headers = ""; + return kObjectPayloadLength; + case kExtensionBody: + return kObjectPayloadLength; + case kData: + return kSerializationFlags; + case kTrackAlias: + case kObjectPayloadLength: + case kAwaitingNextByte: + case kStatus: + case kFailed: + case kExtensionSize: + case kPadding: + QUICHE_NOTREACHED(); + return next_input_; + } } switch (next_input_) { // The state table is factored into a separate function (rather than // inlined) in order to separate the order of elements from the way they are // parsed. case kStreamType: - next_input_ = kTrackAlias; - break; + return kTrackAlias; case kTrackAlias: - next_input_ = kGroupId; - break; + return kGroupId; case kGroupId: - QUICHE_CHECK(type_.has_value()); - if (type_->IsFetch() || type_->IsSubgroupPresent()) { - next_input_ = kSubgroupId; - break; + if (type_.IsSubgroupPresent()) { + return kSubgroupId; } - if (type_->SubgroupIsZero()) { + if (type_.SubgroupIsZero()) { metadata_.subgroup_id = 0; } - next_input_ = - type_->HasDefaultPriority() ? kObjectId : kPublisherPriority; - break; + [[fallthrough]]; case kSubgroupId: - QUICHE_CHECK(type_.has_value()); - next_input_ = (type_->IsFetch() || type_->HasDefaultPriority()) - ? kObjectId - : kPublisherPriority; - break; - case kPublisherPriority: - QUICHE_CHECK(type_.has_value()); - next_input_ = type_->IsFetch() ? kExtensionSize : kObjectId; - break; - case kObjectId: - QUICHE_CHECK(type_.has_value()); - if (type_->HasDefaultPriority()) { - metadata_.publisher_priority = default_publisher_priority_; + if (!type_.HasDefaultPriority()) { + return kPublisherPriority; } - if (num_objects_read_ == 0 && type_->SubgroupIsFirstObjectId()) { + metadata_.publisher_priority = default_publisher_priority_; + [[fallthrough]]; + case kPublisherPriority: + return kObjectId; + case kObjectId: + if (num_objects_read_ == 0 && type_.SubgroupIsFirstObjectId()) { metadata_.subgroup_id = metadata_.object_id; } - if (type_->IsFetch()) { - next_input_ = kPublisherPriority; - } else if (type_->AreExtensionHeadersPresent()) { - next_input_ = kExtensionSize; - } else { - next_input_ = kObjectPayloadLength; + if (type_.AreExtensionHeadersPresent()) { + return kExtensionSize; } - break; + [[fallthrough]]; case kExtensionBody: - next_input_ = kObjectPayloadLength; - break; + return kObjectPayloadLength; case kStatus: case kData: case kAwaitingNextByte: - next_input_ = type_->IsFetch() ? kGroupId : kObjectId; - break; - case kExtensionSize: // Either kExtensionBody or - // kObjectPayloadLength. - case kObjectPayloadLength: // Either kStatus or kData depending on length. - case kPadding: // Handled separately. - case kFailed: // Should cause parsing to cease. + return kObjectId; + case kRequestId: + case kSerializationFlags: + case kExtensionSize: + case kObjectPayloadLength: + case kPadding: + case kFailed: + // Other transitions are either Fetch-only or handled in + // ParseNextItemFromStream. QUICHE_NOTREACHED(); - break; + return next_input_; } } @@ -1302,23 +1352,48 @@ ParseError("Invalid stream type supplied"); return; } - type_.emplace(std::move(*type)); - if (type_->IsPadding()) { + type_ = *type; + if (type_.IsPadding()) { next_input_ = kPadding; return; } - if (type_->EndOfGroupInStream()) { + if (type_.EndOfGroupInStream()) { contains_end_of_group_ = true; } - AdvanceParserState(); + next_input_ = AdvanceParserState(); return; } + case kRequestId: case kTrackAlias: { std::optional<uint64_t> value_read = ReadVarInt62NoFin(); if (value_read.has_value()) { metadata_.track_alias = *value_read; - AdvanceParserState(); + next_input_ = AdvanceParserState(); + } + return; + } + + case kSerializationFlags: { + std::optional<uint64_t> value_read = ReadVarInt62NoFin(); + if (value_read.has_value()) { + std::optional<MoqtFetchSerialization> serialization = + MoqtFetchSerialization::FromValue(*value_read); + if (!serialization.has_value()) { + ParseError("Invalid serialization flags"); + return; + } + if (num_objects_read_ == 0 && + (serialization->prior_subgroup_id() || + serialization->prior_subgroup_id_plus_one() || + !serialization->has_object_id() || + !serialization->has_group_id() || + !serialization->has_priority())) { + ParseError("Invalid serialization flags for first object"); + return; + } + fetch_serialization_ = *serialization; + next_input_ = AdvanceParserState(); } return; } @@ -1326,8 +1401,14 @@ case kGroupId: { std::optional<uint64_t> value_read = ReadVarInt62NoFin(); if (value_read.has_value()) { - metadata_.group_id = *value_read; - AdvanceParserState(); + if (type_.IsFetch() || + !fetch_serialization_.end_of_non_existent_range() || + !fetch_serialization_.end_of_unknown_range()) { + // Do not record range indicator group IDs because it will corrupt + // references to the previous object. + metadata_.group_id = *value_read; + } + next_input_ = AdvanceParserState(); } return; } @@ -1336,7 +1417,7 @@ std::optional<uint64_t> value_read = ReadVarInt62NoFin(); if (value_read.has_value()) { metadata_.subgroup_id = *value_read; - AdvanceParserState(); + next_input_ = AdvanceParserState(); } return; } @@ -1345,7 +1426,7 @@ std::optional<uint8_t> value_read = ReadUint8NoFin(); if (value_read.has_value()) { metadata_.publisher_priority = *value_read; - AdvanceParserState(); + next_input_ = AdvanceParserState(); } return; } @@ -1353,15 +1434,22 @@ case kObjectId: { std::optional<uint64_t> value_read = ReadVarInt62NoFin(); if (value_read.has_value()) { - if (type_.has_value() && type_->IsSubgroup() && - last_object_id_.has_value()) { - metadata_.object_id = *value_read + *last_object_id_ + 1; - } else { - metadata_.object_id = *value_read; + if (type_.IsFetch() || + !fetch_serialization_.end_of_non_existent_range() || + !fetch_serialization_.end_of_unknown_range()) { + // Do not record range indicator object IDs because it will corrupt + // references to the previous object. + if (type_.IsSubgroup() && last_object_id_.has_value()) { + metadata_.object_id = *value_read + *last_object_id_ + 1; + } else { + metadata_.object_id = *value_read; + } } last_object_id_ = metadata_.object_id; - AdvanceParserState(); + next_input_ = AdvanceParserState(); } + // TODO(martinduke): Report something if the fetch serialization is an end + // of range indicator. return; } @@ -1409,7 +1497,7 @@ // stream was supposed to conclude with kEndOfGroup and end it with the // encoded status instead. visitor_.OnObjectMessage(metadata_, "", /*end_of_message=*/true); - AdvanceParserState(); + next_input_ = AdvanceParserState(); } if (fin_read) { visitor_.OnFin(); @@ -1454,7 +1542,7 @@ visitor_.OnFin(); } ++num_objects_read_; - AdvanceParserState(); + next_input_ = AdvanceParserState(); } if (stream_.SkipBytes(chunk_size) && !no_more_data_) { // Although there was no FIN, SkipBytes() can return true if the @@ -1474,7 +1562,7 @@ return; } if (done) { - AdvanceParserState(); + next_input_ = AdvanceParserState(); } } } @@ -1500,12 +1588,11 @@ } void MoqtDataParser::ReadStreamType() { - return ReadDataUntil([this]() { return type_.has_value(); }); + return ReadDataUntil([this]() { return next_input_ != kStreamType; }); } void MoqtDataParser::ReadTrackAlias() { - return ReadDataUntil( - [this]() { return type_.has_value() && next_input_ != kTrackAlias; }); + return ReadDataUntil([this]() { return next_input_ > kTrackAlias; }); } void MoqtDataParser::ReadAtMostOneObject() { @@ -1519,17 +1606,17 @@ if (next_input_ == kAwaitingNextByte) { // Data arrived; the last object was not EndOfGroup. visitor_.OnObjectMessage(metadata_, "", /*end_of_message=*/true); - AdvanceParserState(); + next_input_ = AdvanceParserState(); ++num_objects_read_; } return false; } no_more_data_ = true; - const bool valid_state = type_.has_value() && - payload_length_remaining_ == 0 && - ((type_->IsSubgroup() && next_input_ == kObjectId) || - (type_->IsFetch() && next_input_ == kGroupId)); - if (!valid_state || num_objects_read_ == 0) { + const bool valid_state = + payload_length_remaining_ == 0 && + ((type_.IsSubgroup() && next_input_ == kObjectId) || + (type_.IsFetch() && next_input_ == kSerializationFlags)); + if (!valid_state) { ParseError("FIN received at an unexpected point in the stream"); return true; }
diff --git a/quiche/quic/moqt/moqt_parser.h b/quiche/quic/moqt/moqt_parser.h index d0585ea..49248a7 100644 --- a/quiche/quic/moqt/moqt_parser.h +++ b/quiche/quic/moqt/moqt_parser.h
@@ -218,11 +218,17 @@ void ReadAtMostOneObject(); // Returns the type of the unidirectional stream, if already known. - std::optional<MoqtDataStreamType> stream_type() const { return type_; } + std::optional<MoqtDataStreamType> stream_type() const { + if (next_input_ == kStreamType) { + return std::nullopt; + } + return type_; + } // Returns the track alias, if already known. std::optional<uint64_t> track_alias() const { - return (next_input_ == kStreamType || next_input_ == kTrackAlias) + return (next_input_ == kStreamType || next_input_ == kTrackAlias || + next_input_ == kRequestId) ? std::optional<uint64_t>() : metadata_.track_alias; } @@ -237,7 +243,9 @@ // Current state of the parser. enum NextInput { kStreamType, - kTrackAlias, + kTrackAlias, // SUBSCRIBE/PUBLISH only. + kRequestId, // FETCH only. + kSerializationFlags, // FETCH only. kGroupId, kSubgroupId, kPublisherPriority, @@ -273,7 +281,7 @@ std::optional<uint8_t> ReadUint8NoFin(); // Advances the state machine of the parser to the next expected state. - void AdvanceParserState(); + [[nodiscard]] NextInput AdvanceParserState(); // Reads the next available item from the stream. void ParseNextItemFromStream(); // Checks if we have encountered a FIN without data. If so, processes it and @@ -293,7 +301,8 @@ std::string buffered_message_; - std::optional<MoqtDataStreamType> type_ = std::nullopt; + MoqtDataStreamType type_; + MoqtFetchSerialization fetch_serialization_; NextInput next_input_ = kStreamType; MoqtObject metadata_; std::optional<uint64_t> last_object_id_;
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc index bf0aa9d..659006c 100644 --- a/quiche/quic/moqt/moqt_parser_test.cc +++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -21,6 +21,7 @@ #include "quiche/quic/moqt/moqt_error.h" #include "quiche/quic/moqt/moqt_key_value_pair.h" #include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_types.h" #include "quiche/quic/moqt/test_tools/moqt_parser_test_visitor.h" #include "quiche/quic/moqt/test_tools/moqt_test_message.h" #include "quiche/quic/platform/api/quic_test.h" @@ -1512,4 +1513,101 @@ EXPECT_TRUE(visitor_.fin_received_); } +TEST_F(MoqtDataParserStateMachineTest, ReadTypeThenObjectsFetch) { + for (MoqtFetchSerialization serialization : AllMoqtFetchSerializations()) { + SCOPED_TRACE(testing::Message() << "flags: " << serialization.value()); + MoqtParserTestVisitor visitor; + webtransport::test::InMemoryStream stream(/*stream_id=*/0); + MoqtDataParser parser(&stream, &visitor); + StreamHeaderFetchMessage header; + StreamMiddlerFetchMessage middler(serialization); + stream.Receive(header.PacketSample()); + stream.Receive(middler.PacketSample(), /*fin=*/true); + parser.ReadStreamType(); + ASSERT_EQ(visitor.messages_received_, 0); + parser.ReadAtMostOneObject(); + ASSERT_EQ(visitor.messages_received_, 1); + EXPECT_TRUE(header.EqualFieldValues(visitor.last_message_.value())); + EXPECT_EQ(visitor.object_payloads_[0], "foo"); + parser.ReadAtMostOneObject(); + ASSERT_EQ(visitor.messages_received_, 2); + EXPECT_TRUE(middler.EqualFieldValues(visitor.last_message_.value())); + EXPECT_EQ(visitor.object_payloads_[1], "bar"); + EXPECT_EQ(visitor.parsing_error_, std::nullopt); + EXPECT_TRUE(visitor.fin_received_); + } +} + +TEST_F(MoqtDataParserStateMachineTest, StreamHeaderFetchRefersToPrior) { + char data[] = {0x05, 0x01, 0x00}; + // Iterate through the 5 serializations that refer to the prior object. + for (char value : {0x0f, 0x17, 0x1b, 0x1d, 0x1e}) { + data[2] = value; + MoqtParserTestVisitor visitor; + webtransport::test::InMemoryStream stream(/*stream_id=*/0); + MoqtDataParser parser(&stream, &visitor); + stream.Receive(absl::string_view(data, sizeof(data))); + parser.ReadStreamType(); + parser.ReadAtMostOneObject(); + EXPECT_EQ(visitor.parsing_error_, + "Invalid serialization flags for first object"); + EXPECT_EQ(visitor.parsing_error_code_, MoqtError::kProtocolViolation); + } +} + +TEST_F(MoqtDataParserStateMachineTest, DatagramThenPriorSubgroupId) { + char data[] = {0x05, 0x01, 0x40, 0x5c, 0x05, 0x01, // datagram (5, 1) + 0x80, 0x03, 0x61, 0x61, 0x61, // priority, payload + 0xff}; // serialization flag to be overwritten + // Iterate through the 2 serializations that refer to the prior subgroup. + for (char value : {0x01, 0x02}) { + data[11] = value; + MoqtParserTestVisitor visitor; + webtransport::test::InMemoryStream stream(/*stream_id=*/0); + MoqtDataParser parser(&stream, &visitor); + stream.Receive(absl::string_view(data, sizeof(data))); + parser.ReadStreamType(); + parser.ReadAtMostOneObject(); + parser.ReadAtMostOneObject(); + EXPECT_EQ(visitor.parsing_error_, + "reference to subgroup ID of prior datagram"); + EXPECT_EQ(visitor.parsing_error_code_, MoqtError::kProtocolViolation); + } +} + +TEST_F(MoqtDataParserStateMachineTest, InvalidNonexistentRange) { + char data[] = {0x05, 0x01, 0x40, 0x80}; + stream_.Receive(absl::string_view(data, sizeof(data))); + parser_.ReadStreamType(); + parser_.ReadAtMostOneObject(); + EXPECT_EQ(visitor_.parsing_error_, "Invalid serialization flags"); + EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); +} + +TEST_F(MoqtDataParserStateMachineTest, InvalidNonexistentRangeUnknownRange) { + char data[] = {0x05, 0x01, 0x41, 0x8c}; + stream_.Receive(absl::string_view(data, sizeof(data))); + parser_.ReadStreamType(); + parser_.ReadAtMostOneObject(); + EXPECT_EQ(visitor_.parsing_error_, "Invalid serialization flags"); + EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); +} + +TEST_F(MoqtDataParserStateMachineTest, IgnoresEndRangeIndicators) { + // Header, Range Indicator, Middler + stream_.Receive(StreamHeaderFetchMessage().PacketSample()); + char data[] = {0x40, 0x8c, 0x05, 0x07, // non-existent range + 0x41, 0x0c, 0x05, 0x09}; // unknown range + stream_.Receive(absl::string_view(data, sizeof(data))); + std::optional<MoqtFetchSerialization> serialization = + MoqtFetchSerialization::FromValue(0x40); // Datagram + explicit object ID + ASSERT_TRUE(serialization.has_value()); + StreamMiddlerFetchMessage middler(*serialization); + stream_.Receive(middler.PacketSample(), /*fin=*/true); + parser_.ReadAllData(); + EXPECT_EQ(visitor_.messages_received_, 2); + // TODO(martinduke): Once Issue #1506 is resolved, check that the values + // are reported correctly. +} + } // namespace moqt::test
diff --git a/quiche/quic/moqt/moqt_publisher.h b/quiche/quic/moqt/moqt_publisher.h index 66f3d7a..25d75ea 100644 --- a/quiche/quic/moqt/moqt_publisher.h +++ b/quiche/quic/moqt/moqt_publisher.h
@@ -14,10 +14,10 @@ #include "quiche/quic/moqt/moqt_error.h" #include "quiche/quic/moqt/moqt_fetch_task.h" #include "quiche/quic/moqt/moqt_key_value_pair.h" -#include "quiche/quic/moqt/moqt_messages.h" #include "quiche/quic/moqt/moqt_names.h" #include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_priority.h" +#include "quiche/quic/moqt/moqt_types.h" #include "quiche/web_transport/web_transport.h" namespace moqt { @@ -38,10 +38,10 @@ // Notifies that a new object is available on the track. The object payload // itself may be retrieved via GetCachedObject method of the associated track - // publisher. - virtual void OnNewObjectAvailable( - Location sequence, uint64_t subgroup, MoqtPriority publisher_priority, - MoqtForwardingPreference forwarding_preference) = 0; + // publisher. If |subgroup| is nullopt, the object is a datagram. + virtual void OnNewObjectAvailable(Location sequence, + std::optional<uint64_t> subgroup, + MoqtPriority publisher_priority) = 0; // Notifies that a pure FIN has arrived following |sequence|. Should not be // called unless all objects have already been delivered. If not delivered, // instead set the fin_after_this flag in the PublishedObject. @@ -82,13 +82,11 @@ // whenever they are sent. Once an object is not available via the cache, it // can no longer be sent; this ensures that objects are not buffered forever. // - // This method returns nullopt if the object is not currently available, but - // might become available in the future. If the object is gone forever, - // kEndOfGroup/kObjectDoesNotExist has to be returned instead; - // otherwise, the corresponding QUIC streams will be stuck waiting for objects - // that will never arrive. + // This method returns nullopt if the object is not currently available. + // If |subgroup| is nullopt, the object is a datagram. virtual std::optional<PublishedObject> GetCachedObject( - uint64_t group, uint64_t subgroup, uint64_t min_object) const = 0; + uint64_t group, std::optional<uint64_t> subgroup, + uint64_t min_object) const = 0; // Registers a listener with the track. The listener will be notified of all // newly arriving objects. The pointer to the listener must be valid until
diff --git a/quiche/quic/moqt/moqt_relay_track_publisher.cc b/quiche/quic/moqt/moqt_relay_track_publisher.cc index 1b44c8d..6d76d65 100644 --- a/quiche/quic/moqt/moqt_relay_track_publisher.cc +++ b/quiche/quic/moqt/moqt_relay_track_publisher.cc
@@ -20,6 +20,7 @@ #include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/quic/moqt/moqt_session_interface.h" +#include "quiche/quic/moqt/moqt_types.h" #include "quiche/common/platform/api/quiche_bug_tracker.h" #include "quiche/common/platform/api/quiche_logging.h" #include "quiche/common/quiche_callbacks.h" @@ -124,57 +125,72 @@ return; } } - auto subgroup_it = group.subgroups.try_emplace(metadata.subgroup); - auto& subgroup = subgroup_it.first->second; - if (!subgroup.empty()) { // Check if the new object is valid - CachedObject& last_object = subgroup.rbegin()->second; - if (last_object.metadata.publisher_priority != - metadata.publisher_priority) { - QUICHE_DLOG(INFO) << "Publisher priority changing in a subgroup"; - OnMalformedTrack(full_track_name); - return; + CachedObject* duplicate_object = nullptr; + if (!metadata.subgroup.has_value()) { // It's a datagram. + std::shared_ptr<quiche::QuicheMemSlice> slice; + if (!object.empty()) { + slice = std::make_shared<quiche::QuicheMemSlice>( + quiche::QuicheMemSlice::Copy(object)); } - if (last_object.fin_after_this) { - QUICHE_DLOG(INFO) << "Skipping object because it is after the end of the " - << "subgroup"; - OnMalformedTrack(full_track_name); - return; + auto [it, inserted] = group.datagrams.try_emplace( + metadata.location.object, CachedObject{metadata, slice, false}); + if (!inserted) { + duplicate_object = &it->second; } - // If last_object has stream-ending status, it should have been caught by - // the fin_after_this check above. - QUICHE_DCHECK( - last_object.metadata.status != MoqtObjectStatus::kEndOfGroup && - last_object.metadata.status != MoqtObjectStatus::kEndOfTrack); - if (last_object.metadata.location.object > metadata.location.object) { - QUICHE_DLOG(INFO) << "Skipping object because it decreases the " - << "object ID in the subgroup."; - return; + } else { + auto subgroup_it = group.subgroups.try_emplace(*metadata.subgroup); + auto& subgroup = subgroup_it.first->second; + if (!subgroup.empty()) { // Check if the new object is valid + CachedObject& last_object = subgroup.rbegin()->second; + if (last_object.metadata.publisher_priority != + metadata.publisher_priority) { + QUICHE_DLOG(INFO) << "Publisher priority changing in a subgroup"; + OnMalformedTrack(full_track_name); + return; + } + if (last_object.fin_after_this) { + QUICHE_DLOG(INFO) << "Skipping object because it is after the end of " + << "the subgroup"; + OnMalformedTrack(full_track_name); + return; + } + // If last_object has stream-ending status, it should have been caught by + // the fin_after_this check above. + QUICHE_DCHECK( + last_object.metadata.status != MoqtObjectStatus::kEndOfGroup && + last_object.metadata.status != MoqtObjectStatus::kEndOfTrack); + if (last_object.metadata.location.object > metadata.location.object) { + QUICHE_DLOG(INFO) << "Skipping object because it decreases the " + << "object ID in the subgroup."; + return; + } + } + if (metadata.status == MoqtObjectStatus::kEndOfGroup || + metadata.status == MoqtObjectStatus::kEndOfTrack) { + // Anticipate stream FIN. + last_object_in_stream = true; + } + std::shared_ptr<quiche::QuicheMemSlice> slice; + if (!object.empty()) { + slice = std::make_shared<quiche::QuicheMemSlice>( + quiche::QuicheMemSlice::Copy(object)); + } + auto [it, inserted] = subgroup.try_emplace( + metadata.location.object, + CachedObject{metadata, slice, last_object_in_stream}); + if (!inserted) { + duplicate_object = &it->second; } } - if (metadata.status == MoqtObjectStatus::kEndOfGroup || - metadata.status == MoqtObjectStatus::kEndOfTrack) { - // Anticipate stream FIN. - last_object_in_stream = true; - } - std::shared_ptr<quiche::QuicheMemSlice> slice; - if (!object.empty()) { - slice = std::make_shared<quiche::QuicheMemSlice>( - quiche::QuicheMemSlice::Copy(object)); - } - auto [it, inserted] = subgroup.try_emplace( - metadata.location.object, - CachedObject{metadata, slice, last_object_in_stream}); - if (!inserted) { - // It's a duplicate object. - CachedObject& old_object = it->second; - if (metadata.IsMalformed(old_object.metadata)) { + if (duplicate_object != nullptr) { + if (metadata.IsMalformed(duplicate_object->metadata)) { // Something besides the arrival time and extension headers changed. OnMalformedTrack(full_track_name); return; } // TODO(b/467718801): Fix this when the class supports partial object // delivery. When objects are complete, we can simply compare payloads. - if (old_object.payload->AsStringView() != object) { + if (duplicate_object->payload->AsStringView() != object) { OnMalformedTrack(full_track_name); } // No need to update state. @@ -199,10 +215,9 @@ } for (MoqtObjectListener* listener : listeners_) { listener->OnNewObjectAvailable(metadata.location, metadata.subgroup, - metadata.publisher_priority, - metadata.forwarding_preference); + metadata.publisher_priority); if (last_object_in_stream) { - listener->OnNewFinAvailable(metadata.location, metadata.subgroup); + listener->OnNewFinAvailable(metadata.location, *(metadata.subgroup)); } } } @@ -270,14 +285,23 @@ } std::optional<PublishedObject> MoqtRelayTrackPublisher::GetCachedObject( - uint64_t group_id, uint64_t subgroup_id, uint64_t min_object_id) const { + uint64_t group_id, std::optional<uint64_t> subgroup_id, + uint64_t min_object_id) const { auto group_it = queue_.find(group_id); if (group_it == queue_.end()) { // Group does not exist. return std::nullopt; } const Group& group = group_it->second; - auto subgroup_it = group.subgroups.find(subgroup_id); + if (!subgroup_id.has_value()) { + auto object_it = group.datagrams.lower_bound(min_object_id); + if (object_it == group.datagrams.end()) { + // No object after the last one received. + return std::nullopt; + } + return CachedObjectToPublishedObject(object_it->second); + } + auto subgroup_it = group.subgroups.find(*subgroup_id); if (subgroup_it == group.subgroups.end()) { // Subgroup does not exist. return std::nullopt;
diff --git a/quiche/quic/moqt/moqt_relay_track_publisher.h b/quiche/quic/moqt/moqt_relay_track_publisher.h index 57738c0..e10e5ab 100644 --- a/quiche/quic/moqt/moqt_relay_track_publisher.h +++ b/quiche/quic/moqt/moqt_relay_track_publisher.h
@@ -28,6 +28,7 @@ #include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/quic/moqt/moqt_session_interface.h" +#include "quiche/quic/moqt/moqt_types.h" #include "quiche/common/quiche_callbacks.h" #include "quiche/common/quiche_weak_ptr.h" @@ -85,7 +86,7 @@ // MoqtTrackPublisher implementation. const FullTrackName& GetTrackName() const override { return track_; } std::optional<PublishedObject> GetCachedObject( - uint64_t group_id, uint64_t subgroup_id, + uint64_t group_id, std::optional<uint64_t> subgroup_id, uint64_t min_object) const override; void AddObjectListener(MoqtObjectListener* listener) override; void RemoveObjectListener(MoqtObjectListener* listener) override; @@ -127,6 +128,7 @@ uint64_t next_object = 0; bool complete = false; // If true, kEndOfGroup has been received. absl::btree_map<uint64_t, Subgroup> subgroups; // Ordered by subgroup id. + absl::btree_map<uint64_t, CachedObject> datagrams; }; bool is_closing_ = false;
diff --git a/quiche/quic/moqt/moqt_relay_track_publisher_test.cc b/quiche/quic/moqt/moqt_relay_track_publisher_test.cc index fc1cb5a..8861e7a 100644 --- a/quiche/quic/moqt/moqt_relay_track_publisher_test.cc +++ b/quiche/quic/moqt/moqt_relay_track_publisher_test.cc
@@ -18,6 +18,7 @@ #include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/quic/moqt/moqt_session_interface.h" +#include "quiche/quic/moqt/moqt_types.h" #include "quiche/quic/moqt/test_tools/mock_moqt_session.h" #include "quiche/common/platform/api/quiche_test.h" #include "quiche/web_transport/web_transport.h" @@ -26,6 +27,8 @@ namespace { +using ::testing::Optional; + const FullTrackName kTrackName = {"test", "track"}; class MockMoqtObjectListener : public MoqtObjectListener { @@ -34,9 +37,8 @@ MOCK_METHOD(void, OnSubscribeRejected, (MoqtRequestErrorInfo reason), (override)); MOCK_METHOD(void, OnNewObjectAvailable, - (Location sequence, uint64_t subgroup, - MoqtPriority publisher_priority, - MoqtForwardingPreference forwarding_preference), + (Location sequence, std::optional<uint64_t> subgroup, + MoqtPriority publisher_priority), (override)); MOCK_METHOD(void, OnNewFinAvailable, (Location final_object_in_subgroup, uint64_t subgroup_id), @@ -72,8 +74,7 @@ MoqtObjectStatus status, absl::string_view payload, bool fin_after_this = false) { EXPECT_CALL(listener_, - OnNewObjectAvailable(location, subgroup, 128, - MoqtForwardingPreference::kSubgroup)); + OnNewObjectAvailable(location, Optional(subgroup), 128)); if (fin_after_this || status == MoqtObjectStatus::kEndOfTrack || status == MoqtObjectStatus::kEndOfGroup) { EXPECT_CALL(listener_, OnNewFinAvailable(location, subgroup)); @@ -162,8 +163,7 @@ EXPECT_CALL(listener_, OnGroupAbandoned(group - 3)); } EXPECT_CALL(listener_, - OnNewObjectAvailable(Location(group, 0), 0, 128, - MoqtForwardingPreference::kSubgroup)); + OnNewObjectAvailable(Location(group, 0), Optional(0), 128)); publisher_.OnObjectFragment( kTrackName, PublishedObjectMetadata{Location(group, 0), 0, "", @@ -355,14 +355,12 @@ EXPECT_CALL(*session_, Subscribe).WillOnce(testing::Return(true)); publisher_.AddObjectListener(&listener_); Location location = kLargestLocation.Next(); - EXPECT_CALL(listener_, - OnNewObjectAvailable(location, /*subgroup=*/0, - /*publisher_priority=*/128, - MoqtForwardingPreference::kSubgroup)); + EXPECT_CALL(listener_, OnNewObjectAvailable(location, Optional(0), + /*publisher_priority=*/128)); publisher_.OnObjectFragment( kTrackName, PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal, - 128, MoqtForwardingPreference::kSubgroup}, + 128}, "object", /*end_of_message=*/true); // Exact duplicate is ignored. It doesn't matter that the arrival time // changed. @@ -372,8 +370,7 @@ publisher_.OnObjectFragment( kTrackName, PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal, - 128, MoqtForwardingPreference::kSubgroup, - quic::QuicTime::Infinite()}, + 128, quic::QuicTime::Infinite()}, "object", /*end_of_message=*/true); } @@ -381,22 +378,20 @@ EXPECT_CALL(*session_, Subscribe).WillOnce(testing::Return(true)); publisher_.AddObjectListener(&listener_); Location location = kLargestLocation.Next(); - EXPECT_CALL(listener_, - OnNewObjectAvailable(location, /*subgroup=*/0, - /*publisher_priority=*/128, - MoqtForwardingPreference::kSubgroup)); + EXPECT_CALL(listener_, OnNewObjectAvailable(location, Optional(0), + /*publisher_priority=*/128)); publisher_.OnObjectFragment( kTrackName, PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal, - 128, MoqtForwardingPreference::kSubgroup}, + 128}, "object", /*end_of_message=*/true); // Priority change; malformed track. EXPECT_CALL(listener_, OnNewObjectAvailable).Times(0); EXPECT_CALL(listener_, OnTrackPublisherGone); publisher_.OnObjectFragment( kTrackName, - PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal, 64, - MoqtForwardingPreference::kSubgroup}, + PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal, + 64}, "object", /*end_of_message=*/true); EXPECT_TRUE(track_deleted_); } @@ -405,14 +400,12 @@ EXPECT_CALL(*session_, Subscribe).WillOnce(testing::Return(true)); publisher_.AddObjectListener(&listener_); Location location = kLargestLocation.Next(); - EXPECT_CALL(listener_, - OnNewObjectAvailable(location, /*subgroup=*/0, - /*publisher_priority=*/128, - MoqtForwardingPreference::kSubgroup)); + EXPECT_CALL(listener_, OnNewObjectAvailable(location, Optional(0), + /*publisher_priority=*/128)); publisher_.OnObjectFragment( kTrackName, PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal, - 128, MoqtForwardingPreference::kSubgroup}, + 128}, "payload", /*end_of_message=*/true); // Payload change; malformed track. EXPECT_CALL(listener_, OnNewObjectAvailable).Times(0); @@ -420,7 +413,7 @@ publisher_.OnObjectFragment( kTrackName, PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal, - 128, MoqtForwardingPreference::kSubgroup}, + 128}, "foobar", /*end_of_message=*/true); EXPECT_TRUE(track_deleted_); } @@ -462,19 +455,16 @@ SubscribeAndOk(); Location location = kLargestLocation.Next(); EXPECT_CALL(listener_, - OnNewObjectAvailable(location, /*subgroup=*/location.object, - /*publisher_priority=*/128, - MoqtForwardingPreference::kDatagram)); + OnNewObjectAvailable(location, testing::Eq(std::nullopt), + /*publisher_priority=*/128)); publisher_.OnObjectFragment( kTrackName, - PublishedObjectMetadata{location, location.object, "", - MoqtObjectStatus::kNormal, 128, - MoqtForwardingPreference::kDatagram}, + PublishedObjectMetadata{location, std::nullopt, "", + MoqtObjectStatus::kNormal, 128}, "object", /*end_of_message=*/true); std::optional<PublishedObject> object = - publisher_.GetCachedObject(location.group, location.object, 0); - EXPECT_TRUE(object.has_value() && object->metadata.forwarding_preference == - MoqtForwardingPreference::kDatagram); + publisher_.GetCachedObject(location.group, std::nullopt, 0); + EXPECT_TRUE(object.has_value() && !object->metadata.subgroup.has_value()); } } // namespace
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc index 09354b2..aeb4e1f 100644 --- a/quiche/quic/moqt/moqt_session.cc +++ b/quiche/quic/moqt/moqt_session.cc
@@ -45,6 +45,7 @@ #include "quiche/quic/moqt/moqt_session_interface.h" #include "quiche/quic/moqt/moqt_subscribe_windows.h" #include "quiche/quic/moqt/moqt_track.h" +#include "quiche/quic/moqt/moqt_types.h" #include "quiche/quic/platform/api/quic_logging.h" #include "quiche/common/platform/api/quiche_bug_tracker.h" #include "quiche/common/platform/api/quiche_logging.h" @@ -218,10 +219,9 @@ // TODO(martinduke): Handle extension headers. PublishedObjectMetadata metadata; metadata.location = Location(message.group_id, message.object_id); - metadata.subgroup = message.object_id; + metadata.subgroup = std::nullopt; metadata.status = message.object_status; metadata.publisher_priority = message.publisher_priority; - metadata.forwarding_preference = MoqtForwardingPreference::kDatagram; metadata.arrival_time = callbacks_.clock->Now(); visitor->OnObjectFragment(track->full_track_name(), metadata, *payload, true); @@ -660,19 +660,17 @@ switch (result) { case MoqtFetchTask::GetNextObjectResult::kSuccess: // Skip ObjectDoesNotExist in FETCH. - if (object.metadata.status == MoqtObjectStatus::kObjectDoesNotExist) { + if (object.metadata.status != MoqtObjectStatus::kNormal) { QUIC_BUG(quic_bug_got_doesnotexist_in_fetch) - << "Got ObjectDoesNotExist in FETCH"; + << "Got Non-normal object in FETCH"; continue; } if (fetch->session_->WriteObjectToStream( stream_, fetch->request_id(), object.metadata, std::move(object.payload), MoqtDataStreamType::Fetch(), // last Object ID doesn't matter for FETCH, just use zero. - stream_header_written_ ? std::optional<uint64_t>(0) - : std::nullopt, - /*fin=*/false)) { - stream_header_written_ = true; + last_object_, /*fin=*/false)) { + last_object_ = object.metadata; } break; case MoqtFetchTask::GetNextObjectResult::kPending: @@ -1617,9 +1615,6 @@ << " priority " << message.publisher_priority << " length " << payload.size() << " length " << message.payload_length << (end_of_message ? "F" : ""); - if (!index_.has_value()) { - index_ = DataStreamIndex(message.group_id, message.subgroup_id); - } if (!session_->parameters_.deliver_partial_objects) { if (!end_of_message) { // Buffer partial object. if (partial_object_.empty()) { @@ -1665,6 +1660,14 @@ return; } if (!track->is_fetch()) { + if (!index_.has_value()) { + if (!message.subgroup_id.has_value()) { + QUICHE_BUG(quiche_bug_moqt_subgroup_id_missing) + << "Missing subgroup ID on SUBSCRIBE stream"; + return; + } + index_ = DataStreamIndex(message.group_id, *message.subgroup_id); + } if (no_more_objects_) { // Already got a stream-ending object. While the lower layer won't // deliver data after the FIN, there could have been an EndOfGroup or @@ -1689,7 +1692,6 @@ metadata.extensions = message.extension_headers; metadata.status = message.object_status; metadata.publisher_priority = message.publisher_priority; - metadata.forwarding_preference = MoqtForwardingPreference::kSubgroup; metadata.arrival_time = session_->callbacks_.clock->Now(); subscribe->visitor()->OnObjectFragment(track->full_track_name(), metadata, payload, end_of_message); @@ -1965,8 +1967,8 @@ } void MoqtSession::PublishedSubscription::OnNewObjectAvailable( - Location location, uint64_t subgroup, MoqtPriority publisher_priority, - MoqtForwardingPreference forwarding_preference) { + Location location, std::optional<uint64_t> subgroup, + MoqtPriority publisher_priority) { if (!InWindow(location)) { return; } @@ -1987,13 +1989,20 @@ } } + // TODO(vasilvv): This currently sends UINT64_MAX for datagram subgroups. + // Maybe do something more satisfactory? session_->trace_recorder_.RecordNewObjectAvaliable( - track_alias_, *track_publisher_, location, subgroup, publisher_priority); + track_alias_, *track_publisher_, location, subgroup.value_or(UINT64_MAX), + publisher_priority); - DataStreamIndex index(location.group, subgroup); - if (reset_subgroups_.contains(index)) { - // This subgroup has already been reset, ignore. - return; + std::optional<webtransport::StreamId> stream_id; + if (subgroup.has_value()) { + DataStreamIndex index(location.group, *subgroup); + if (reset_subgroups_.contains(index)) { + // This subgroup has already been reset, ignore. + return; + } + stream_id = stream_map().GetStreamFor(index); } if (session_->alternate_delivery_timeout_ && !delivery_timeout().IsInfinite() && largest_sent_.has_value() && @@ -2001,10 +2010,10 @@ // Start the delivery timeout timer on all previous groups. for (uint64_t group = first_active_group_; group < location.group; ++group) { - for (webtransport::StreamId stream_id : + for (webtransport::StreamId stream_to_update : stream_map().GetStreamsForGroup(group)) { webtransport::Stream* raw_stream = - session_->session_->GetStreamById(stream_id); + session_->session_->GetStreamById(stream_to_update); if (raw_stream == nullptr) { continue; } @@ -2016,21 +2025,18 @@ } } QUICHE_DCHECK_GE(location.group, first_active_group_); - - if (forwarding_preference == MoqtForwardingPreference::kDatagram) { + if (!subgroup.has_value()) { SendDatagram(location); return; } - std::optional<webtransport::StreamId> stream_id = - stream_map().GetStreamFor(index); webtransport::Stream* raw_stream = nullptr; if (stream_id.has_value()) { raw_stream = session_->session_->GetStreamById(*stream_id); } else { raw_stream = session_->OpenOrQueueDataStream( request_id_, - NewStreamParameters(location.group, subgroup, location.object, + NewStreamParameters(location.group, *subgroup, location.object, (publisher_priority == default_publisher_priority_) ? std::nullopt : std::make_optional(publisher_priority))); @@ -2341,16 +2347,17 @@ stream_->ResetWithUserCode(kResetCodeDeliveryTimeout); return; } - if (!session_->WriteObjectToStream( - stream_, subscription.track_alias(), object->metadata, - std::move(object->payload), stream_type_, last_object_id_, - object->fin_after_this)) { + if (!session_->WriteObjectToStream(stream_, subscription.track_alias(), + object->metadata, + std::move(object->payload), stream_type_, + last_object_, object->fin_after_this)) { // WriteObjectToStream() closes the connection on error, meaning that // there is no need to process the stream any further. return; } - last_object_id_ = object->metadata.location.object; - next_object_ = *last_object_id_ + 1; + last_object_ = object->metadata; + + next_object_ = last_object_->location.object + 1; subscription.OnObjectSent(object->metadata.location); if (object->fin_after_this && !delivery_timeout.IsInfinite() && @@ -2381,12 +2388,11 @@ } } -bool MoqtSession::WriteObjectToStream(webtransport::Stream* stream, uint64_t id, - const PublishedObjectMetadata& metadata, - quiche::QuicheMemSlice payload, - MoqtDataStreamType type, - std::optional<uint64_t> last_id, - bool fin) { +bool MoqtSession::WriteObjectToStream( + webtransport::Stream* stream, uint64_t id, + const PublishedObjectMetadata& metadata, quiche::QuicheMemSlice payload, + MoqtDataStreamType type, std::optional<PublishedObjectMetadata> last_object, + bool fin) { QUICHE_DCHECK(stream->CanWrite()); MoqtObject header; header.track_alias = id; @@ -2399,7 +2405,7 @@ header.payload_length = payload.length(); quiche::QuicheBuffer serialized_header = - framer_.SerializeObjectHeader(header, type, last_id); + framer_.SerializeObjectHeader(header, type, last_object); // TODO(vasilvv): add a version of WebTransport write API that accepts // memslices so that we can avoid a copy here. std::array write_vector = { @@ -2490,7 +2496,7 @@ void MoqtSession::PublishedSubscription::SendDatagram(Location sequence) { std::optional<PublishedObject> object = track_publisher_->GetCachedObject( - sequence.group, sequence.object, sequence.object); + sequence.group, std::nullopt, sequence.object); if (!object.has_value()) { QUICHE_BUG(PublishedSubscription_SendDatagram_object_not_in_cache) << "Got notification about an object that is not in the cache"; @@ -2503,7 +2509,7 @@ header.publisher_priority = object->metadata.publisher_priority; header.extension_headers = object->metadata.extensions; header.object_status = object->metadata.status; - header.subgroup_id = header.object_id; + header.subgroup_id = std::nullopt; header.payload_length = object->payload.length(); quiche::QuicheBuffer datagram = session_->framer_.SerializeObjectDatagram( header, object->payload.AsStringView(),
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h index d7289dd..8bb9060 100644 --- a/quiche/quic/moqt/moqt_session.h +++ b/quiche/quic/moqt/moqt_session.h
@@ -30,6 +30,7 @@ #include "quiche/quic/moqt/moqt_key_value_pair.h" #include "quiche/quic/moqt/moqt_messages.h" #include "quiche/quic/moqt/moqt_names.h" +#include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_parser.h" #include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/moqt/moqt_publisher.h" @@ -38,6 +39,7 @@ #include "quiche/quic/moqt/moqt_subscribe_windows.h" #include "quiche/quic/moqt/moqt_trace_recorder.h" #include "quiche/quic/moqt/moqt_track.h" +#include "quiche/quic/moqt/moqt_types.h" #include "quiche/quic/moqt/session_namespace_tree.h" #include "quiche/common/platform/api/quiche_export.h" #include "quiche/common/platform/api/quiche_logging.h" @@ -384,9 +386,9 @@ void OnSubscribeAccepted() override; void OnSubscribeRejected(MoqtRequestErrorInfo info) override; // This is only called for objects that have just arrived. - void OnNewObjectAvailable( - Location location, uint64_t subgroup, MoqtPriority publisher_priority, - MoqtForwardingPreference forwarding_preference) override; + void OnNewObjectAvailable(Location location, + std::optional<uint64_t> subgroup, + MoqtPriority publisher_priority) override; void OnTrackPublisherGone() override; void OnNewFinAvailable(Location location, uint64_t subgroup) override; void OnSubgroupAbandoned(uint64_t group, uint64_t subgroup, @@ -576,7 +578,7 @@ uint64_t next_object_; // Used in subgroup streams to compute the object ID diff. If nullopt, the // stream header has not been written yet. - std::optional<uint64_t> last_object_id_; + std::optional<PublishedObjectMetadata> last_object_; // If this data stream is for SUBSCRIBE, reset it if an object has been // excessively delayed per Section 7.1.1.2. std::unique_ptr<quic::QuicAlarm> delivery_timeout_alarm_; @@ -615,8 +617,8 @@ private: std::weak_ptr<PublishedFetch> fetch_; + std::optional<PublishedObjectMetadata> last_object_; webtransport::Stream* stream_; - bool stream_header_written_ = false; }; MoqtFetchTask* fetch_task() { return fetch_.get(); } @@ -667,8 +669,8 @@ // No class access below this line! } - void OnNewObjectAvailable(Location, uint64_t /*subgroup*/, MoqtPriority, - MoqtForwardingPreference) override {} + void OnNewObjectAvailable(Location, std::optional<uint64_t> /*subgroup*/, + MoqtPriority) override {} void OnNewFinAvailable(Location /*location*/, uint64_t /*subgroup*/) override {} void OnSubgroupAbandoned( @@ -749,7 +751,8 @@ const PublishedObjectMetadata& metadata, quiche::QuicheMemSlice payload, MoqtDataStreamType type, - std::optional<uint64_t> last_id, bool fin); + std::optional<PublishedObjectMetadata> last_object, + bool fin); void CancelFetch(uint64_t request_id);
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc index cb65ccd..22b8d38 100644 --- a/quiche/quic/moqt/moqt_session_test.cc +++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -9,6 +9,7 @@ #include <cstring> #include <memory> #include <optional> +#include <queue> #include <string> #include <utility> #include <variant> @@ -37,6 +38,7 @@ #include "quiche/quic/moqt/moqt_session_callbacks.h" #include "quiche/quic/moqt/moqt_session_interface.h" #include "quiche/quic/moqt/moqt_track.h" +#include "quiche/quic/moqt/moqt_types.h" #include "quiche/quic/moqt/session_namespace_tree.h" #include "quiche/quic/moqt/test_tools/moqt_framer_utils.h" #include "quiche/quic/moqt/test_tools/moqt_mock_visitor.h" @@ -59,6 +61,7 @@ using ::quic::test::MemSliceFromString; using ::testing::_; +using ::testing::Optional; using ::testing::Return; using ::testing::StrictMock; @@ -206,12 +209,16 @@ std::unique_ptr<webtransport::StreamVisitor>& visitor, MockSubscribeRemoteTrackVisitor* track_visitor) { MoqtFramer framer(true); + std::optional<PublishedObjectMetadata> previous_object; + if (visitor != nullptr) { + previous_object = PublishedObjectMetadata(); + previous_object->location.object = object.object_id - 1; + } quiche::QuicheBuffer buffer = framer.SerializeObjectHeader( object, - MoqtDataStreamType::Subgroup(object.subgroup_id, object.object_id, + MoqtDataStreamType::Subgroup(*object.subgroup_id, object.object_id, false, false), - (visitor == nullptr) ? std::nullopt - : std::optional<uint64_t>(object.object_id - 1)); + previous_object); size_t data_read = 0; if (visitor == nullptr) { // It's the first object in the stream EXPECT_CALL(session, AcceptIncomingUnidirectionalStream()) @@ -598,8 +605,7 @@ // forward=false, so incoming objects are ignored. EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream()) .Times(0); - listener->OnNewObjectAvailable(Location(0, 0), 0, kDefaultPublisherPriority, - MoqtForwardingPreference::kSubgroup); + listener->OnNewObjectAvailable(Location(0, 0), 0, kDefaultPublisherPriority); } TEST_F(MoqtSessionTest, SubscribeAbsoluteStartNoDataYet) { @@ -613,8 +619,7 @@ // Window was not set to (0, 0) by SUBSCRIBE acceptance. EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream()) .Times(0); - listener->OnNewObjectAvailable(Location(0, 0), 0, kDefaultPublisherPriority, - MoqtForwardingPreference::kSubgroup); + listener->OnNewObjectAvailable(Location(0, 0), 0, kDefaultPublisherPriority); } TEST_F(MoqtSessionTest, SubscribeNextGroup) { @@ -630,13 +635,12 @@ // Later objects in group 10 ignored. EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream()) .Times(0); - listener->OnNewObjectAvailable(Location(10, 21), 0, kDefaultPublisherPriority, - MoqtForwardingPreference::kSubgroup); + listener->OnNewObjectAvailable(Location(10, 21), 0, + kDefaultPublisherPriority); // Group 11 is sent. EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream()) .WillOnce(Return(false)); - listener->OnNewObjectAvailable(Location(11, 0), 0, kDefaultPublisherPriority, - MoqtForwardingPreference::kSubgroup); + listener->OnNewObjectAvailable(Location(11, 0), 0, kDefaultPublisherPriority); } TEST_F(MoqtSessionTest, TwoSubscribesForTrack) { @@ -1159,8 +1163,6 @@ EXPECT_EQ(metadata.extensions, "foo"); EXPECT_EQ(metadata.status, MoqtObjectStatus::kNormal); EXPECT_EQ(metadata.publisher_priority, 0); - EXPECT_EQ(metadata.forwarding_preference, - MoqtForwardingPreference::kSubgroup); EXPECT_EQ(payload, received_payload); EXPECT_TRUE(end_of_message); }); @@ -1325,20 +1327,17 @@ fin |= options.send_fin(); return absl::OkStatus(); }); - EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([&] { + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([&] { return PublishedObject{ PublishedObjectMetadata{Location(5, 0), 0, "extensions", MoqtObjectStatus::kNormal, 127, - MoqtForwardingPreference::kSubgroup, MoqtSessionPeer::Now(&session_)}, MemSliceFromString("deadbeef"), false}; }); - EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([] { + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] { return std::optional<PublishedObject>(); }); - subscription->OnNewObjectAvailable(Location(5, 0), 0, - kDefaultPublisherPriority, - MoqtForwardingPreference::kSubgroup); + subscription->OnNewObjectAvailable(Location(5, 0), 0, 127); EXPECT_TRUE(correct_message); EXPECT_FALSE(fin); EXPECT_EQ(MoqtSessionPeer::LargestSentForSubscription(&session_, 0), @@ -1382,19 +1381,17 @@ fin = options.send_fin(); return absl::OkStatus(); }); - EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([&] { + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([&] { return PublishedObject{PublishedObjectMetadata{ Location(5, 0), 0, "", MoqtObjectStatus::kNormal, - 127, MoqtForwardingPreference::kSubgroup, - MoqtSessionPeer::Now(&session_)}, + 127, MoqtSessionPeer::Now(&session_)}, MemSliceFromString("deadbeef"), true}; }); - EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([] { + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] { return std::optional<PublishedObject>(); }); subscription->OnNewObjectAvailable(Location(5, 0), 0, - kDefaultPublisherPriority, - MoqtForwardingPreference::kSubgroup); + kDefaultPublisherPriority); EXPECT_TRUE(correct_message); EXPECT_TRUE(fin); } @@ -1436,19 +1433,17 @@ fin |= options.send_fin(); return absl::OkStatus(); }); - EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([&] { + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([&] { return PublishedObject{PublishedObjectMetadata{ Location(5, 0), 0, "", MoqtObjectStatus::kNormal, - 127, MoqtForwardingPreference::kSubgroup, - MoqtSessionPeer::Now(&session_)}, + 127, MoqtSessionPeer::Now(&session_)}, MemSliceFromString("deadbeef"), true}; }); - EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([] { + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] { return std::optional<PublishedObject>(); }); subscription->OnNewObjectAvailable(Location(5, 0), 0, - kDefaultPublisherPriority, - MoqtForwardingPreference::kSubgroup); + kDefaultPublisherPriority); EXPECT_TRUE(correct_message); EXPECT_TRUE(fin); @@ -1504,19 +1499,17 @@ fin |= options.send_fin(); return absl::OkStatus(); }); - EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([&] { + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([&] { return PublishedObject{PublishedObjectMetadata{ Location(5, 0), 0, "", MoqtObjectStatus::kNormal, - 127, MoqtForwardingPreference::kSubgroup, - MoqtSessionPeer::Now(&session_)}, + 127, MoqtSessionPeer::Now(&session_)}, MemSliceFromString("deadbeef"), true}; }); - EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([] { + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] { return std::optional<PublishedObject>(); }); subscription->OnNewObjectAvailable(Location(5, 0), 0, - kDefaultPublisherPriority, - MoqtForwardingPreference::kSubgroup); + kDefaultPublisherPriority); EXPECT_TRUE(correct_message); EXPECT_TRUE(fin); @@ -1574,19 +1567,17 @@ fin |= options.send_fin(); return absl::OkStatus(); }); - EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([&] { + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([&] { return PublishedObject{PublishedObjectMetadata{ Location(5, 0), 0, "", MoqtObjectStatus::kNormal, - 127, MoqtForwardingPreference::kSubgroup, - MoqtSessionPeer::Now(&session_)}, + 127, MoqtSessionPeer::Now(&session_)}, MemSliceFromString("deadbeef"), true}; }); - EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([] { + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] { return std::optional<PublishedObject>(); }); subscription->OnNewObjectAvailable(Location(5, 0), 0, - kDefaultPublisherPriority, - MoqtForwardingPreference::kSubgroup); + kDefaultPublisherPriority); EXPECT_TRUE(correct_message); EXPECT_TRUE(fin); EXPECT_CALL(mock_stream_, ResetWithUserCode(kResetCodeDeliveryTimeout)); @@ -1630,19 +1621,17 @@ fin = options.send_fin(); return absl::OkStatus(); }); - EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([&] { + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([&] { return PublishedObject{PublishedObjectMetadata{ Location(5, 0), 0, "", MoqtObjectStatus::kNormal, - 127, MoqtForwardingPreference::kSubgroup, - MoqtSessionPeer::Now(&session_)}, + 127, MoqtSessionPeer::Now(&session_)}, MemSliceFromString("deadbeef"), false}; }); - EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([] { + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] { return std::optional<PublishedObject>(); }); subscription->OnNewObjectAvailable(Location(5, 0), 0, - kDefaultPublisherPriority, - MoqtForwardingPreference::kSubgroup); + kDefaultPublisherPriority); EXPECT_TRUE(correct_message); EXPECT_FALSE(fin); fin = false; @@ -1693,27 +1682,24 @@ fin = options.send_fin(); return absl::OkStatus(); }); - EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([&] { + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([&] { return PublishedObject{PublishedObjectMetadata{ Location(5, 0), 0, "", MoqtObjectStatus::kNormal, - 127, MoqtForwardingPreference::kSubgroup, - MoqtSessionPeer::Now(&session_)}, + 127, MoqtSessionPeer::Now(&session_)}, MemSliceFromString("deadbeef"), false}; }); - EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([] { + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] { return std::optional<PublishedObject>(); }); subscription->OnNewObjectAvailable(Location(5, 0), 0, - kDefaultPublisherPriority, - MoqtForwardingPreference::kSubgroup); + kDefaultPublisherPriority); EXPECT_FALSE(fin); // Try to deliver (5,1), but fail. EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return false; }); EXPECT_CALL(*track, GetCachedObject).Times(0); EXPECT_CALL(mock_stream_, Writev).Times(0); subscription->OnNewObjectAvailable(Location(5, 1), 0, - kDefaultPublisherPriority, - MoqtForwardingPreference::kSubgroup); + kDefaultPublisherPriority); // Notify that FIN arrived, but do nothing with it because (5, 1) isn't sent. EXPECT_CALL(mock_stream_, Writev).Times(0); subscription->OnNewFinAvailable(Location(5, 1), 0); @@ -1723,15 +1709,14 @@ // object id, extensions, payload length, status. const std::string kExpectedMessage2 = {0x00, 0x00, 0x00, 0x03}; EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return true; }); - EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([&] { + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([&] { return PublishedObject{ PublishedObjectMetadata{Location(5, 1), 0, "", MoqtObjectStatus::kEndOfGroup, 127, - MoqtForwardingPreference::kSubgroup, MoqtSessionPeer::Now(&session_)}, MemSliceFromString(""), true}; }); - EXPECT_CALL(*track, GetCachedObject(5, 0, 2)).WillRepeatedly([] { + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 2)).WillRepeatedly([] { return std::optional<PublishedObject>(); }); EXPECT_CALL(mock_stream_, Writev(_, _)) @@ -1784,19 +1769,17 @@ fin = options.send_fin(); return absl::OkStatus(); }); - EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([&] { + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([&] { return PublishedObject{PublishedObjectMetadata{ Location(5, 0), 0, "", MoqtObjectStatus::kNormal, - 127, MoqtForwardingPreference::kSubgroup, - MoqtSessionPeer::Now(&session_)}, + 127, MoqtSessionPeer::Now(&session_)}, MemSliceFromString("deadbeef"), false}; }); - EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([] { + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] { return std::optional<PublishedObject>(); }); subscription->OnNewObjectAvailable(Location(5, 0), 0, - kDefaultPublisherPriority, - MoqtForwardingPreference::kSubgroup); + kDefaultPublisherPriority); // Abandon the subgroup. EXPECT_CALL(mock_stream_, ResetWithUserCode(0x1)).Times(1); @@ -1816,8 +1799,7 @@ EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream()) .WillOnce(Return(false)); subscription->OnNewObjectAvailable(Location(5, 0), 0, - kDefaultPublisherPriority, - MoqtForwardingPreference::kSubgroup); + kDefaultPublisherPriority); // Unblock the session, and cause the queued stream to be sent. EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream()) @@ -1839,13 +1821,13 @@ EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId)) .WillRepeatedly(Return(&mock_stream_)); EXPECT_CALL(mock_stream_, Writev(_, _)).WillOnce(Return(absl::OkStatus())); - EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([] { + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([] { return PublishedObject{ PublishedObjectMetadata{Location(5, 0), 0, "", MoqtObjectStatus::kNormal, 128}, MemSliceFromString("deadbeef")}; }); - EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([] { + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] { return std::optional<PublishedObject>(); }); session_.OnCanCreateNewOutgoingUnidirectionalStream(); @@ -1862,11 +1844,9 @@ EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream()) .WillRepeatedly(Return(false)); subscription->OnNewObjectAvailable(Location(5, 0), 0, - kDefaultPublisherPriority, - MoqtForwardingPreference::kSubgroup); + kDefaultPublisherPriority); subscription->OnNewObjectAvailable(Location(6, 0), 0, - kDefaultPublisherPriority, - MoqtForwardingPreference::kSubgroup); + kDefaultPublisherPriority); subscription->OnGroupAbandoned(5); // Unblock the session, and cause the queued stream to be sent. There should @@ -1891,13 +1871,13 @@ EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId)) .WillRepeatedly(Return(&mock_stream_)); EXPECT_CALL(mock_stream_, Writev(_, _)).WillOnce(Return(absl::OkStatus())); - EXPECT_CALL(*track, GetCachedObject(6, 0, 0)).WillRepeatedly([] { + EXPECT_CALL(*track, GetCachedObject(6, Optional(0), 0)).WillRepeatedly([] { return PublishedObject{ PublishedObjectMetadata{Location(6, 0), 0, "", MoqtObjectStatus::kNormal, 128}, MemSliceFromString("deadbeef")}; }); - EXPECT_CALL(*track, GetCachedObject(6, 0, 1)).WillRepeatedly([] { + EXPECT_CALL(*track, GetCachedObject(6, Optional(0), 1)).WillRepeatedly([] { return std::optional<PublishedObject>(); }); session_.OnCanCreateNewOutgoingUnidirectionalStream(); @@ -1930,26 +1910,24 @@ .WillRepeatedly(Return(&mock_stream_)); EXPECT_CALL(mock_stream_, Writev(_, _)).WillOnce(Return(absl::OkStatus())); - EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([] { + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([] { return PublishedObject{ PublishedObjectMetadata{Location(5, 0), 0, "", MoqtObjectStatus::kNormal, 128}, MemSliceFromString("deadbeef")}; }); - EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillOnce([] { + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillOnce([] { return std::optional<PublishedObject>(); }); subscription->OnNewObjectAvailable(Location(5, 0), 0, - kDefaultPublisherPriority, - MoqtForwardingPreference::kSubgroup); + kDefaultPublisherPriority); // Now that the stream exists and is recorded within subscription, make it // disappear by returning nullptr. EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId)) .WillRepeatedly(Return(nullptr)); - EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).Times(0); + EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).Times(0); subscription->OnNewObjectAvailable(Location(5, 1), 0, - kDefaultPublisherPriority, - MoqtForwardingPreference::kSubgroup); + kDefaultPublisherPriority); } TEST_F(MoqtSessionTest, ReceiveUnsubscribe) { @@ -1979,7 +1957,7 @@ 0x05, 0x02, 0x05, 0x20, 0x03, 0x65, 0x78, 0x74, 0x64, 0x65, 0x61, 0x64, 0x62, 0x65, 0x65, 0x66, // "deadbeef" }; - EXPECT_CALL(mock_session_, SendOrQueueDatagram(_)) + EXPECT_CALL(mock_session_, SendOrQueueDatagram) .WillOnce([&](absl::string_view datagram) { if (datagram.size() == sizeof(kExpectedMessage)) { correct_message = (0 == memcmp(datagram.data(), kExpectedMessage, @@ -1988,14 +1966,15 @@ return webtransport::DatagramStatus( webtransport::DatagramStatusCode::kSuccess, ""); }); - EXPECT_CALL(*track_publisher, GetCachedObject(5, 0, 0)).WillRepeatedly([] { - return PublishedObject{ - PublishedObjectMetadata{Location{5, 0}, 0, "ext", - MoqtObjectStatus::kNormal, 32}, - quiche::QuicheMemSlice::Copy("deadbeef")}; - }); - listener->OnNewObjectAvailable(Location(5, 0), 0, kDefaultPublisherPriority, - MoqtForwardingPreference::kDatagram); + EXPECT_CALL(*track_publisher, + GetCachedObject(5, std::optional<uint64_t>(), 0)) + .WillRepeatedly([] { + return PublishedObject{ + PublishedObjectMetadata{Location{5, 0}, std::nullopt, "ext", + MoqtObjectStatus::kNormal, 32}, + quiche::QuicheMemSlice::Copy("deadbeef")}; + }); + listener->OnNewObjectAvailable(Location(5, 0), std::nullopt, 32); EXPECT_TRUE(correct_message); } @@ -2011,7 +1990,7 @@ /*publisher_priority=*/0, /*extension_headers=*/"", /*object_status=*/MoqtObjectStatus::kNormal, - /*subgroup_id=*/0, + /*subgroup_id=*/std::nullopt, /*payload_length=*/8, }; char datagram[] = {0x00, 0x02, 0x00, 0x00, 0x00, 0x64, 0x65, @@ -2023,10 +2002,9 @@ EXPECT_EQ(track_name, ftn); EXPECT_EQ(metadata.location, Location(object.group_id, object.object_id)); + EXPECT_EQ(metadata.subgroup, object.subgroup_id); EXPECT_EQ(metadata.publisher_priority, object.publisher_priority); EXPECT_EQ(metadata.status, object.object_status); - EXPECT_EQ(metadata.forwarding_preference, - MoqtForwardingPreference::kDatagram); EXPECT_EQ(payload, received_payload); EXPECT_TRUE(fin); }); @@ -2122,13 +2100,12 @@ MoqtDataStreamType::kDefaultPriority); return absl::OkStatus(); }); - listener->OnNewObjectAvailable(Location(0, 0), 0, kLocalDefaultPriority, - MoqtForwardingPreference::kSubgroup); + listener->OnNewObjectAvailable(Location(0, 0), 0, kLocalDefaultPriority); // Send a datagram with the default priority. EXPECT_CALL(*track, GetCachedObject) .WillOnce(Return( - PublishedObject{PublishedObjectMetadata{Location(0, 1), 0, "", - MoqtObjectStatus::kNormal, + PublishedObject{PublishedObjectMetadata{Location(0, 1), std::nullopt, + "", MoqtObjectStatus::kNormal, kLocalDefaultPriority}, MemSliceFromString("deadbeef")})); EXPECT_CALL(mock_session_, SendOrQueueDatagram) @@ -2138,13 +2115,13 @@ return webtransport::DatagramStatus{ webtransport::DatagramStatusCode::kSuccess, ""}; }); - listener->OnNewObjectAvailable(Location(0, 1), 0, kLocalDefaultPriority, - MoqtForwardingPreference::kDatagram); + listener->OnNewObjectAvailable(Location(0, 1), std::nullopt, + kLocalDefaultPriority); // Non-default priority EXPECT_CALL(*track, GetCachedObject) .WillOnce(Return( - PublishedObject{PublishedObjectMetadata{Location(0, 2), 0, "", - MoqtObjectStatus::kNormal, + PublishedObject{PublishedObjectMetadata{Location(0, 2), std::nullopt, + "", MoqtObjectStatus::kNormal, kLocalDefaultPriority + 1}, MemSliceFromString("deadbeef")})); EXPECT_CALL(mock_session_, SendOrQueueDatagram) @@ -2154,8 +2131,8 @@ return webtransport::DatagramStatus{ webtransport::DatagramStatusCode::kSuccess, ""}; }); - listener->OnNewObjectAvailable(Location(0, 2), 0, kLocalDefaultPriority + 1, - MoqtForwardingPreference::kDatagram); + listener->OnNewObjectAvailable(Location(0, 2), std::nullopt, + kLocalDefaultPriority + 1); } TEST_F(MoqtSessionTest, StreamObjectOutOfWindow) { @@ -2204,14 +2181,11 @@ .WillOnce(Return(false)) .WillOnce(Return(false)); subscription->OnNewObjectAvailable(Location(1, 0), 0, - kDefaultPublisherPriority, - MoqtForwardingPreference::kSubgroup); + kDefaultPublisherPriority); subscription->OnNewObjectAvailable(Location(0, 0), 0, - kDefaultPublisherPriority, - MoqtForwardingPreference::kSubgroup); + kDefaultPublisherPriority); subscription->OnNewObjectAvailable(Location(2, 0), 0, - kDefaultPublisherPriority, - MoqtForwardingPreference::kSubgroup); + kDefaultPublisherPriority); // These should be opened in the sequence (0, 0), (1, 0), (2, 0). EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream()) .WillRepeatedly(Return(true)); @@ -2245,24 +2219,27 @@ EXPECT_CALL(mock_stream2, visitor()).WillOnce([&]() { return stream_visitor[2].get(); }); - EXPECT_CALL(*track, GetCachedObject(0, 0, 0)) + EXPECT_CALL(*track, GetCachedObject(0, Optional(0), 0)) .WillOnce(Return(PublishedObject{ PublishedObjectMetadata{Location(0, 0), 0, "", MoqtObjectStatus::kNormal, 127}, MemSliceFromString("deadbeef")})); - EXPECT_CALL(*track, GetCachedObject(0, 0, 1)).WillOnce(Return(std::nullopt)); - EXPECT_CALL(*track, GetCachedObject(1, 0, 0)) + EXPECT_CALL(*track, GetCachedObject(0, Optional(0), 1)) + .WillOnce(Return(std::nullopt)); + EXPECT_CALL(*track, GetCachedObject(1, Optional(0), 0)) .WillOnce(Return(PublishedObject{ PublishedObjectMetadata{Location(1, 0), 0, "", MoqtObjectStatus::kNormal, 127}, MemSliceFromString("deadbeef")})); - EXPECT_CALL(*track, GetCachedObject(1, 0, 1)).WillOnce(Return(std::nullopt)); - EXPECT_CALL(*track, GetCachedObject(2, 0, 0)) + EXPECT_CALL(*track, GetCachedObject(1, Optional(0), 1)) + .WillOnce(Return(std::nullopt)); + EXPECT_CALL(*track, GetCachedObject(2, Optional(0), 0)) .WillOnce(Return(PublishedObject{ PublishedObjectMetadata{Location(2, 0), 0, "", MoqtObjectStatus::kNormal, 127}, MemSliceFromString("deadbeef")})); - EXPECT_CALL(*track, GetCachedObject(2, 0, 1)).WillOnce(Return(std::nullopt)); + EXPECT_CALL(*track, GetCachedObject(2, Optional(0), 1)) + .WillOnce(Return(std::nullopt)); EXPECT_CALL(mock_stream0, CanWrite()).WillRepeatedly(Return(true)); EXPECT_CALL(mock_stream1, CanWrite()).WillRepeatedly(Return(true)); EXPECT_CALL(mock_stream2, CanWrite()).WillRepeatedly(Return(true)); @@ -2299,8 +2276,7 @@ EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream()) .WillOnce(Return(false)); subscription->OnNewObjectAvailable(Location(0, 0), 0, - kDefaultPublisherPriority, - MoqtForwardingPreference::kSubgroup); + kDefaultPublisherPriority); // Delete the subscription, then grant stream credit. MoqtSessionPeer::DeleteSubscription(&session_, 0); @@ -2331,17 +2307,13 @@ .WillOnce(Return(false)) .WillOnce(Return(false)); subscription0->OnNewObjectAvailable(Location(0, 0), 0, - kDefaultPublisherPriority, - MoqtForwardingPreference::kSubgroup); + kDefaultPublisherPriority); subscription1->OnNewObjectAvailable(Location(0, 0), 0, - kDefaultPublisherPriority, - MoqtForwardingPreference::kSubgroup); + kDefaultPublisherPriority); subscription0->OnNewObjectAvailable(Location(1, 0), 0, - kDefaultPublisherPriority, - MoqtForwardingPreference::kSubgroup); + kDefaultPublisherPriority); subscription1->OnNewObjectAvailable(Location(1, 0), 0, - kDefaultPublisherPriority, - MoqtForwardingPreference::kSubgroup); + kDefaultPublisherPriority); // Allow one stream to be opened. It will be group 0, subscription 0. EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream()) @@ -2359,12 +2331,13 @@ EXPECT_CALL(mock_stream0, visitor()).WillOnce([&]() { return stream_visitor0.get(); }); - EXPECT_CALL(*track1, GetCachedObject(0, 0, 0)) + EXPECT_CALL(*track1, GetCachedObject(0, Optional(0), 0)) .WillOnce(Return(PublishedObject{ PublishedObjectMetadata{Location(0, 0), 0, "", MoqtObjectStatus::kNormal, 127}, MemSliceFromString("foobar")})); - EXPECT_CALL(*track1, GetCachedObject(0, 0, 1)).WillOnce(Return(std::nullopt)); + EXPECT_CALL(*track1, GetCachedObject(0, Optional(0), 1)) + .WillOnce(Return(std::nullopt)); EXPECT_CALL(mock_stream0, CanWrite()).WillRepeatedly(Return(true)); EXPECT_CALL(mock_stream0, Writev) .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, @@ -2395,12 +2368,13 @@ EXPECT_CALL(mock_stream1, visitor()).WillOnce([&]() { return stream_visitor1.get(); }); - EXPECT_CALL(*track2, GetCachedObject(0, 0, 0)) + EXPECT_CALL(*track2, GetCachedObject(0, Optional(0), 0)) .WillOnce(Return(PublishedObject{ PublishedObjectMetadata{Location(0, 0), 0, "", MoqtObjectStatus::kNormal, 127}, MemSliceFromString("deadbeef")})); - EXPECT_CALL(*track2, GetCachedObject(0, 0, 1)).WillOnce(Return(std::nullopt)); + EXPECT_CALL(*track2, GetCachedObject(0, Optional(0), 1)) + .WillOnce(Return(std::nullopt)); EXPECT_CALL(mock_stream1, CanWrite()).WillRepeatedly(Return(true)); EXPECT_CALL(mock_stream1, Writev(_, _)) .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, @@ -2812,9 +2786,9 @@ /*payload_length=*/3, }; MoqtFramer framer(true); + std::optional<PublishedObjectMetadata> metadata; quiche::QuicheBuffer header = framer.SerializeObjectHeader( - object, MoqtDataStreamType::Fetch(), std::nullopt); - + object, MoqtDataStreamType::Fetch(), metadata); // Open stream, deliver two objects before FETCH_OK. Neither should be read. webtransport::test::InMemoryStream data_stream(kIncomingUniStreamId); data_stream.SetVisitor( @@ -3038,11 +3012,13 @@ /*payload_length=*/3, }; MoqtFramer framer_(true); + std::optional<PublishedObjectMetadata> metadata; for (int i = 0; i < 4; ++i) { object.object_id = i; headers.push(framer_.SerializeObjectHeader( - object, MoqtDataStreamType::Fetch(), - i == 0 ? std::nullopt : std::optional<uint64_t>(i - 1))); + object, MoqtDataStreamType::Fetch(), metadata)); + metadata = PublishedObjectMetadata(); + metadata->location.object = i; // only object ID matters. payloads.push("foo"); } @@ -3109,11 +3085,13 @@ /*payload_length=*/3, }; MoqtFramer framer_(true); + std::optional<PublishedObjectMetadata> metadata; for (int i = 0; i < 4; ++i) { object.object_id = i; headers.push(framer_.SerializeObjectHeader( - object, MoqtDataStreamType::Fetch(), - i == 0 ? std::nullopt : std::optional<uint64_t>(i - 1))); + object, MoqtDataStreamType::Fetch(), metadata)); + metadata = PublishedObjectMetadata(); + metadata->location.object = i; // only object ID matters. payloads.push("foo"); } @@ -3200,8 +3178,9 @@ /*payload_length=*/6, }; MoqtFramer framer_(true); + std::optional<PublishedObjectMetadata> metadata; quiche::QuicheBuffer header = framer_.SerializeObjectHeader( - object, MoqtDataStreamType::Fetch(), std::nullopt); + object, MoqtDataStreamType::Fetch(), metadata); stream.Receive(header.AsStringView(), false); EXPECT_FALSE(task->HasObject()); EXPECT_FALSE(object_ready); @@ -3264,7 +3243,6 @@ "", MoqtObjectStatus::kObjectDoesNotExist, 0, - MoqtForwardingPreference::kSubgroup, MoqtSessionPeer::Now(&session_) - quic::QuicTimeDelta::FromSeconds(1), }, @@ -3277,8 +3255,7 @@ ON_CALL(*track_publisher, largest_location) .WillByDefault(Return(Location(0, 0))); subscription->OnNewObjectAvailable(Location(0, 0), 0, - kDefaultPublisherPriority, - MoqtForwardingPreference::kSubgroup); + kDefaultPublisherPriority); // Subsequent objects for that subgroup are ignored. EXPECT_CALL(*track_publisher, GetCachedObject).Times(0); EXPECT_CALL(mock_session_, GetStreamById(_)).Times(0); @@ -3287,8 +3264,7 @@ ON_CALL(*track_publisher, largest_location) .WillByDefault(Return(Location(0, 1))); subscription->OnNewObjectAvailable(Location(0, 1), 0, - kDefaultPublisherPriority, - MoqtForwardingPreference::kSubgroup); + kDefaultPublisherPriority); // Check that reset_subgroups_ is pruned. EXPECT_TRUE(MoqtSessionPeer::SubgroupHasBeenReset(subscription, DataStreamIndex(0, 0))); @@ -3326,7 +3302,6 @@ .WillOnce(Return(PublishedObject{ PublishedObjectMetadata{Location(0, 0), 0, "", MoqtObjectStatus::kObjectDoesNotExist, 0, - MoqtForwardingPreference::kSubgroup, MoqtSessionPeer::Now(&session_)}, quiche::QuicheMemSlice(), true})) .WillOnce(Return(std::nullopt)); @@ -3335,8 +3310,7 @@ ON_CALL(*track_publisher, largest_location) .WillByDefault(Return(Location(0, 0))); subscription->OnNewObjectAvailable(Location(0, 0), 0, - kDefaultPublisherPriority, - MoqtForwardingPreference::kSubgroup); + kDefaultPublisherPriority); auto* delivery_alarm = absl::down_cast<quic::test::MockAlarmFactory::TestAlarm*>( MoqtSessionPeer::GetAlarm(stream_visitor.get())); @@ -3378,7 +3352,6 @@ .WillOnce(Return(PublishedObject{ PublishedObjectMetadata{Location(0, 0), 0, "", MoqtObjectStatus::kObjectDoesNotExist, 0, - MoqtForwardingPreference::kSubgroup, MoqtSessionPeer::Now(&session_)}, quiche::QuicheMemSlice(), false})) .WillOnce(Return(std::nullopt)); @@ -3386,8 +3359,7 @@ ON_CALL(*track_publisher, largest_location()) .WillByDefault(Return(Location(0, 0))); subscription->OnNewObjectAvailable(Location(0, 0), 0, - kDefaultPublisherPriority, - MoqtForwardingPreference::kSubgroup); + kDefaultPublisherPriority); EXPECT_CALL(data_mock, Writev(_, _)).WillOnce(Return(absl::OkStatus())); subscription->OnNewFinAvailable(Location(0, 0), 0); @@ -3433,7 +3405,6 @@ .WillOnce(Return(PublishedObject{ PublishedObjectMetadata{Location(0, 0), 0, "", MoqtObjectStatus::kObjectDoesNotExist, 0, - MoqtForwardingPreference::kSubgroup, MoqtSessionPeer::Now(&session_)}, quiche::QuicheMemSlice(), false})) .WillOnce(Return(std::nullopt)); @@ -3441,8 +3412,7 @@ ON_CALL(*track_publisher, largest_location) .WillByDefault(Return(Location(0, 0))); subscription->OnNewObjectAvailable(Location(0, 0), 0, - kDefaultPublisherPriority, - MoqtForwardingPreference::kSubgroup); + kDefaultPublisherPriority); webtransport::test::MockStream data_mock2; EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream()) @@ -3464,7 +3434,6 @@ .WillOnce(Return(PublishedObject{ PublishedObjectMetadata{Location(1, 0), 0, "", MoqtObjectStatus::kObjectDoesNotExist, 0, - MoqtForwardingPreference::kSubgroup, MoqtSessionPeer::Now(&session_)}, quiche::QuicheMemSlice(), false})) .WillOnce(Return(std::nullopt)); @@ -3472,8 +3441,7 @@ ON_CALL(*track_publisher, largest_location) .WillByDefault(Return(Location(1, 0))); subscription->OnNewObjectAvailable(Location(1, 0), 0, - kDefaultPublisherPriority, - MoqtForwardingPreference::kSubgroup); + kDefaultPublisherPriority); // Group 1 should start the timer on the Group 0 stream. auto* delivery_alarm = @@ -4197,20 +4165,19 @@ /*start_object=*/0); // Send a datagram in window. - EXPECT_CALL(*mock_publisher, GetCachedObject(8, 0, 0)).WillOnce([&] { - return PublishedObject{ - PublishedObjectMetadata{Location(8, 0), 0, "extensions", - MoqtObjectStatus::kNormal, 128, - MoqtForwardingPreference::kDatagram, - MoqtSessionPeer::Now(&session_)}, - quiche::QuicheMemSlice::Copy("deadbeef"), false}; - }); + EXPECT_CALL(*mock_publisher, GetCachedObject(8, std::optional<uint64_t>(), 0)) + .WillOnce([&] { + return PublishedObject{ + PublishedObjectMetadata{Location(8, 0), std::nullopt, "extensions", + MoqtObjectStatus::kNormal, 128, + MoqtSessionPeer::Now(&session_)}, + quiche::QuicheMemSlice::Copy("deadbeef"), false}; + }); EXPECT_CALL(mock_session_, SendOrQueueDatagram) .WillOnce(Return(webtransport::DatagramStatus( webtransport::DatagramStatusCode::kSuccess, ""))); - listener->OnNewObjectAvailable(Location(8, 0), 0, 0x80, - MoqtForwardingPreference::kDatagram); + listener->OnNewObjectAvailable(Location(8, 0), std::nullopt, 0x80); std::unique_ptr<MoqtControlParserVisitor> control_stream = MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_); @@ -4223,8 +4190,7 @@ control_stream->OnRequestUpdateMessage(MoqtRequestUpdate{3, 1, parameters}); EXPECT_CALL(*mock_publisher, GetCachedObject).Times(0); EXPECT_CALL(mock_session_, SendOrQueueDatagram).Times(0); - listener->OnNewObjectAvailable(Location(8, 1), 0, 0x80, - MoqtForwardingPreference::kDatagram); + listener->OnNewObjectAvailable(Location(8, 1), 0, 0x80); } } // namespace test
diff --git a/quiche/quic/moqt/moqt_types.h b/quiche/quic/moqt/moqt_types.h new file mode 100644 index 0000000..4698a3c --- /dev/null +++ b/quiche/quic/moqt/moqt_types.h
@@ -0,0 +1,65 @@ +// Copyright 2026 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef QUICHE_QUIC_MOQT_MOQT_TYPES_H_ +#define QUICHE_QUIC_MOQT_MOQT_TYPES_H_ + +#include <cstdint> + +#include "absl/strings/str_format.h" +#include "quiche/common/platform/api/quiche_export.h" +#include "quiche/common/quiche_data_writer.h" + +namespace moqt { + +inline constexpr uint64_t kMaxGroupId = quiche::kVarInt62MaxValue; +inline constexpr uint64_t kMaxObjectId = quiche::kVarInt62MaxValue; +// Location as defined in +// https://moq-wg.github.io/moq-transport/draft-ietf-moq-transport.html#location-structure +struct Location { + uint64_t group = 0; + uint64_t object = 0; + + Location() = default; + Location(uint64_t group, uint64_t object) : group(group), object(object) {} + + // Location order as described in + // https://moq-wg.github.io/moq-transport/draft-ietf-moq-transport.html#location-structure + auto operator<=>(const Location&) const = default; + + Location Next() const { + if (object == kMaxObjectId) { + if (group == kMaxObjectId) { + return Location(0, 0); + } + return Location(group + 1, 0); + } + return Location(group, object + 1); + } + + template <typename H> + friend H AbslHashValue(H h, const Location& m); + + template <typename Sink> + friend void AbslStringify(Sink& sink, const Location& sequence) { + absl::Format(&sink, "(%d; %d)", sequence.group, sequence.object); + } +}; + +template <typename H> +H AbslHashValue(H h, const Location& m) { + return H::combine(std::move(h), m.group, m.object); +} + +enum class QUICHE_EXPORT MoqtObjectStatus : uint64_t { + kNormal = 0x0, + kObjectDoesNotExist = 0x1, + kEndOfGroup = 0x3, + kEndOfTrack = 0x4, + kInvalidObjectStatus = 0x5, +}; + +} // namespace moqt + +#endif // QUICHE_QUIC_MOQT_MOQT_TYPES_H_
diff --git a/quiche/quic/moqt/test_tools/moqt_mock_visitor.h b/quiche/quic/moqt/test_tools/moqt_mock_visitor.h index 810a107..de9654a 100644 --- a/quiche/quic/moqt/test_tools/moqt_mock_visitor.h +++ b/quiche/quic/moqt/test_tools/moqt_mock_visitor.h
@@ -28,6 +28,7 @@ #include "quiche/quic/moqt/moqt_session.h" #include "quiche/quic/moqt/moqt_session_callbacks.h" #include "quiche/quic/moqt/moqt_session_interface.h" +#include "quiche/quic/moqt/moqt_types.h" #include "quiche/common/platform/api/quiche_test.h" #include "quiche/common/quiche_mem_slice.h" #include "quiche/common/quiche_weak_ptr.h" @@ -76,7 +77,7 @@ const FullTrackName& GetTrackName() const override { return track_name_; } MOCK_METHOD(std::optional<PublishedObject>, GetCachedObject, - (uint64_t, uint64_t, uint64_t), (const, override)); + (uint64_t, std::optional<uint64_t>, uint64_t), (const, override)); MOCK_METHOD(void, AddObjectListener, (MoqtObjectListener * listener), (override)); MOCK_METHOD(void, RemoveObjectListener, (MoqtObjectListener * listener), @@ -104,7 +105,8 @@ : track_name_(std::move(name)) {} const FullTrackName& GetTrackName() const override { return track_name_; } std::optional<PublishedObject> GetCachedObject( - uint64_t group, uint64_t subgroup, uint64_t object) const override { + uint64_t group, std::optional<uint64_t> subgroup, + uint64_t object) const override { Location location(group, object); auto it = objects_.find(location); if (it == objects_.end()) { @@ -158,8 +160,7 @@ largest_location_ = location; } for (MoqtObjectListener* listener : listeners_) { - listener->OnNewObjectAvailable(location, subgroup, 128, - MoqtForwardingPreference::kSubgroup); + listener->OnNewObjectAvailable(location, subgroup, 128); } } void RemoveAllSubscriptions() { @@ -304,8 +305,7 @@ MOCK_METHOD(void, OnSubscribeAccepted, (), (override)); MOCK_METHOD(void, OnSubscribeRejected, (MoqtRequestErrorInfo), (override)); MOCK_METHOD(void, OnNewObjectAvailable, - (Location, uint64_t, MoqtPriority, MoqtForwardingPreference), - (override)); + (Location, std::optional<uint64_t>, MoqtPriority), (override)); MOCK_METHOD(void, OnNewFinAvailable, (Location, uint64_t), (override)); MOCK_METHOD(void, OnSubgroupAbandoned, (uint64_t, uint64_t, webtransport::StreamErrorCode), (override));
diff --git a/quiche/quic/moqt/test_tools/moqt_session_peer.h b/quiche/quic/moqt/test_tools/moqt_session_peer.h index 6b0e117..6059293 100644 --- a/quiche/quic/moqt/test_tools/moqt_session_peer.h +++ b/quiche/quic/moqt/test_tools/moqt_session_peer.h
@@ -24,6 +24,7 @@ #include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/quic/moqt/moqt_session.h" #include "quiche/quic/moqt/moqt_track.h" +#include "quiche/quic/moqt/moqt_types.h" #include "quiche/web_transport/test_tools/mock_web_transport.h" #include "quiche/web_transport/web_transport.h" @@ -32,7 +33,8 @@ class MoqtDataParserPeer { public: static void SetType(MoqtDataParser* parser, MoqtDataStreamType type) { - parser->type_.emplace(std::move(type)); + parser->type_ = type; + parser->next_input_ = MoqtDataParser::NextInput::kTrackAlias; } };
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h index ccf040e..813e969 100644 --- a/quiche/quic/moqt/test_tools/moqt_test_message.h +++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -24,6 +24,7 @@ #include "quiche/quic/moqt/moqt_messages.h" #include "quiche/quic/moqt/moqt_names.h" #include "quiche/quic/moqt/moqt_priority.h" +#include "quiche/quic/moqt/moqt_types.h" #include "quiche/quic/platform/api/quic_logging.h" #include "quiche/quic/platform/api/quic_test.h" #include "quiche/common/platform/api/quiche_export.h" @@ -51,6 +52,20 @@ return types; } +inline std::vector<MoqtFetchSerialization> AllMoqtFetchSerializations() { + std::vector<MoqtFetchSerialization> serializations; + for (uint64_t i = 0; i < 128; ++i) { + std::optional<MoqtFetchSerialization> value = + MoqtFetchSerialization::FromValue(i); + if (value.has_value()) { + serializations.push_back(*value); + } else { + break; + } + } + return serializations; +} + inline std::vector<MoqtDataStreamType> AllMoqtDataStreamTypes() { std::vector<MoqtDataStreamType> types; types.push_back(MoqtDataStreamType::Fetch()); @@ -136,7 +151,9 @@ } // Objects might need a different status if at the end of the stream. - virtual void MakeObjectEndOfStream() {} + virtual void MakeObjectEndOfStream() { + QUIC_LOG(INFO) << "MakeObjectEndOfStream not implemented"; + } protected: void SetWireImage(uint8_t* wire_image, size_t wire_image_size) { @@ -284,7 +301,7 @@ object_.extension_headers = datagram_type.has_extension() ? std::string(kDefaultExtensionBlob) : ""; object_.object_id = datagram_type.has_object_id() ? 6 : 0; - object_.subgroup_id = object_.object_id; + object_.subgroup_id = std::nullopt; quic::QuicDataWriter writer(sizeof(raw_packet_), reinterpret_cast<char*>(raw_packet_)); EXPECT_TRUE(writer.WriteVarInt62(datagram_type.value())); @@ -429,7 +446,7 @@ // Used only for tests that process multiple objects on one stream. class QUICHE_NO_EXPORT StreamMiddlerSubgroupMessage : public ObjectMessage { public: - StreamMiddlerSubgroupMessage(MoqtDataStreamType type) + StreamMiddlerSubgroupMessage(const MoqtDataStreamType type) : ObjectMessage(), type_(type) { SetWireImage(reinterpret_cast<uint8_t*>(raw_packet_), sizeof(raw_packet_)); if (type.SubgroupIsZero()) { @@ -473,7 +490,7 @@ } void ExpandVarints() override { - ExpandVarintsImpl("vvvvv-v-------v---", false); + ExpandVarintsImpl("vvvvvv-v-------v---", false); } bool SetPayloadLength(uint8_t payload_length) { @@ -488,10 +505,10 @@ } private: - uint8_t raw_packet_[18] = { + uint8_t raw_packet_[19] = { 0x05, // type field - 0x04, // subscribe ID - // object middler: + 0x04, // request ID + 0x3f, // object serialization flag 0x05, 0x08, 0x06, // sequence 0x07, 0x07, // publisher priority, 7B extensions 0x00, 0x0c, 0x01, 0x03, 0x66, 0x6f, 0x6f, // extensions @@ -502,22 +519,82 @@ // Used only for tests that process multiple objects on one stream. class QUICHE_NO_EXPORT StreamMiddlerFetchMessage : public ObjectMessage { public: - StreamMiddlerFetchMessage() : ObjectMessage() { - SetWireImage(raw_packet_, sizeof(raw_packet_)); - object_.subgroup_id = 8; - object_.object_id = 9; + StreamMiddlerFetchMessage(MoqtFetchSerialization serialization) + : ObjectMessage(), serialization_(serialization) { + size_t length = 0; + if (serialization.is_datagram()) { // Two byte varint. + raw_packet_[length++] = 0x40; + } + raw_packet_[length++] = static_cast<uint8_t>(serialization.value()); + if (serialization.has_group_id()) { + raw_packet_[length++] = 0x06; // group ID + object_.group_id = 6; + } + if (serialization.zero_subgroup_id()) { + object_.subgroup_id = 0; + } else if (serialization.has_subgroup_id()) { + raw_packet_[length++] = 0x0a; + object_.subgroup_id = 10; + } else if (serialization.prior_subgroup_id_plus_one()) { + if (!object_.subgroup_id.has_value()) { + QUICHE_BUG(quiche_bug_moqt_prior_subgroup_id_without_previous_subgroup) + << "prior_subgroup_id_plus_one without previous subgroup ID"; + return; + } + ++(*object_.subgroup_id); + } else if (serialization.is_datagram()) { + object_.subgroup_id = std::nullopt; + } // If prior_subgroup_id, subgroup_id is already set properly. + if (serialization.has_object_id()) { + raw_packet_[length++] = 0x0a; + object_.object_id = 10; + } else { + ++object_.object_id; + } + if (serialization.has_priority()) { + raw_packet_[length++] = 0x09; + object_.publisher_priority = MoqtPriority(0x09); + } + if (serialization.has_extensions()) { + memcpy(&raw_packet_[length], kRawExtensions.data(), + kRawExtensions.length()); + length += kRawExtensions.length(); + } else { + object_.extension_headers = ""; + } + memcpy(&raw_packet_[length], kRawPayload.data(), kRawPayload.length()); + length += kRawPayload.length(); + + SetWireImage(raw_packet_, length); } void ExpandVarints() override { - ExpandVarintsImpl("vvv-v-------v---", false); + std::string varints = "v"; + if (serialization_.has_group_id()) { + varints += "v"; + } + if (serialization_.has_subgroup_id()) { + varints += "v"; + } + if (serialization_.has_object_id()) { + varints += "v"; + } + if (serialization_.has_priority()) { + varints += "-"; + } + if (serialization_.has_extensions()) { + varints += "v-------"; + } + varints += "v---"; + ExpandVarintsImpl(varints, false); } private: - uint8_t raw_packet_[16] = { - 0x05, 0x08, 0x09, 0x07, // Object metadata - 0x07, 0x00, 0x0c, 0x01, 0x03, 0x66, 0x6f, 0x6f, // extensions - 0x03, 0x62, 0x61, 0x72, // Payload = "bar" - }; + MoqtFetchSerialization serialization_; + uint8_t raw_packet_[17]; + static constexpr absl::string_view kRawExtensions{ + "\x07\x00\x0c\x01\x03\x66\x6f\x6f", 8}; // see kDefaultExtensionBlob + static constexpr absl::string_view kRawPayload = "\x03\x62\x61\x72"; }; class QUICHE_NO_EXPORT ClientSetupMessage : public TestMessageBase { @@ -1834,6 +1911,11 @@ return std::make_unique<StreamHeaderSubgroupMessage>(type); } +static inline std::unique_ptr<TestMessageBase> CreateTestFetch( + MoqtFetchSerialization type) { + return std::make_unique<StreamMiddlerFetchMessage>(type); +} + static inline std::unique_ptr<TestMessageBase> CreateTestDatagram( MoqtDatagramType type) { return std::make_unique<ObjectDatagramMessage>(type);