Implement MoQT Peeps and Object message changes for draft-06. This is the minimum for interoperability; this code always sends subgroup_id = 0 and ignores the incoming subgroup_id. Also fixed bugs uncovered by fixing a typo in MoqtIntegrationTest. PiperOrigin-RevId: 680684190
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc index 6974ca6..e7042e7 100644 --- a/quiche/quic/moqt/moqt_framer.cc +++ b/quiche/quic/moqt/moqt_framer.cc
@@ -280,43 +280,37 @@ quiche::QuicheBuffer MoqtFramer::SerializeObjectHeader( const MoqtObject& message, bool is_first_in_stream) { - if (!message.payload_length.has_value() && - !(message.forwarding_preference == MoqtForwardingPreference::kObject || - message.forwarding_preference == MoqtForwardingPreference::kDatagram)) { - QUIC_BUG(quic_bug_serialize_object_input_01) - << "Track or Group forwarding preference requires knowing the object " - "length in advance"; + if (!ValidateObjectMetadata(message)) { + QUIC_BUG(quic_bug_serialize_object_header_01) + << "Object metadata is invalid"; return quiche::QuicheBuffer(); } - if (message.object_status != MoqtObjectStatus::kNormal && - message.payload_length.has_value() && *message.payload_length > 0) { - QUIC_BUG(quic_bug_serialize_object_input_03) - << "Object status must be kNormal if payload is non-empty"; + if (message.forwarding_preference == MoqtForwardingPreference::kDatagram) { + QUIC_BUG(quic_bug_serialize_object_header_02) + << "Datagrams use SerializeObjectDatagram()"; return quiche::QuicheBuffer(); } if (!is_first_in_stream) { switch (message.forwarding_preference) { case MoqtForwardingPreference::kTrack: - return (*message.payload_length == 0) + return (message.payload_length == 0) ? Serialize(WireVarInt62(message.group_id), WireVarInt62(message.object_id), - WireVarInt62(*message.payload_length), + WireVarInt62(message.payload_length), WireVarInt62(message.object_status)) : Serialize(WireVarInt62(message.group_id), WireVarInt62(message.object_id), - WireVarInt62(*message.payload_length)); - case MoqtForwardingPreference::kGroup: - return (*message.payload_length == 0) + WireVarInt62(message.payload_length)); + case MoqtForwardingPreference::kSubgroup: + return (message.payload_length == 0) ? Serialize(WireVarInt62(message.object_id), - WireVarInt62(*message.payload_length), + WireVarInt62(message.payload_length), WireVarInt62(static_cast<uint64_t>( message.object_status))) : Serialize(WireVarInt62(message.object_id), - WireVarInt62(*message.payload_length)); + WireVarInt62(message.payload_length)); default: - QUIC_BUG(quic_bug_serialize_object_input_02) - << "Object or Datagram forwarding_preference must be first in " - "stream"; + QUICHE_NOTREACHED(); return quiche::QuicheBuffer(); } } @@ -324,14 +318,14 @@ GetMessageTypeForForwardingPreference(message.forwarding_preference); switch (message.forwarding_preference) { case MoqtForwardingPreference::kTrack: - return (*message.payload_length == 0) + return (message.payload_length == 0) ? Serialize(WireVarInt62(message_type), WireVarInt62(message.subscribe_id), WireVarInt62(message.track_alias), WireUint8(message.publisher_priority), WireVarInt62(message.group_id), WireVarInt62(message.object_id), - WireVarInt62(*message.payload_length), + WireVarInt62(message.payload_length), WireVarInt62(message.object_status)) : Serialize(WireVarInt62(message_type), WireVarInt62(message.subscribe_id), @@ -339,49 +333,64 @@ WireUint8(message.publisher_priority), WireVarInt62(message.group_id), WireVarInt62(message.object_id), - WireVarInt62(*message.payload_length)); - case MoqtForwardingPreference::kGroup: - return (*message.payload_length == 0) + WireVarInt62(message.payload_length)); + case MoqtForwardingPreference::kSubgroup: + return (message.payload_length == 0) ? Serialize(WireVarInt62(message_type), WireVarInt62(message.subscribe_id), WireVarInt62(message.track_alias), WireVarInt62(message.group_id), + WireVarInt62(*message.subgroup_id), WireUint8(message.publisher_priority), WireVarInt62(message.object_id), - WireVarInt62(*message.payload_length), + WireVarInt62(message.payload_length), WireVarInt62(message.object_status)) : Serialize(WireVarInt62(message_type), WireVarInt62(message.subscribe_id), WireVarInt62(message.track_alias), WireVarInt62(message.group_id), + WireVarInt62(*message.subgroup_id), WireUint8(message.publisher_priority), WireVarInt62(message.object_id), - WireVarInt62(*message.payload_length)); - case MoqtForwardingPreference::kObject: + WireVarInt62(message.payload_length)); case MoqtForwardingPreference::kDatagram: - return Serialize( - WireVarInt62(message_type), WireVarInt62(message.subscribe_id), - WireVarInt62(message.track_alias), WireVarInt62(message.group_id), - WireVarInt62(message.object_id), - WireUint8(message.publisher_priority), - WireVarInt62(message.object_status)); + QUICHE_NOTREACHED(); + return quiche::QuicheBuffer(); } } quiche::QuicheBuffer MoqtFramer::SerializeObjectDatagram( const MoqtObject& message, absl::string_view payload) { - if (message.object_status != MoqtObjectStatus::kNormal && !payload.empty()) { + if (!ValidateObjectMetadata(message)) { QUIC_BUG(quic_bug_serialize_object_datagram_01) - << "Object status must be kNormal if payload is non-empty"; + << "Object metadata is invalid"; return quiche::QuicheBuffer(); } + if (message.forwarding_preference != MoqtForwardingPreference::kDatagram) { + QUIC_BUG(quic_bug_serialize_object_datagram_02) + << "Only datagrams use SerializeObjectDatagram()"; + return quiche::QuicheBuffer(); + } + if (message.payload_length != payload.length()) { + QUIC_BUG(quic_bug_serialize_object_datagram_03) + << "Payload length does not match payload"; + return quiche::QuicheBuffer(); + } + if (message.payload_length == 0) { + return Serialize( + WireVarInt62(MoqtDataStreamType::kObjectDatagram), + WireVarInt62(message.subscribe_id), WireVarInt62(message.track_alias), + WireVarInt62(message.group_id), WireVarInt62(message.object_id), + WireUint8(message.publisher_priority), + WireVarInt62(message.payload_length), + WireVarInt62(message.object_status)); + } return Serialize( WireVarInt62(MoqtDataStreamType::kObjectDatagram), WireVarInt62(message.subscribe_id), WireVarInt62(message.track_alias), WireVarInt62(message.group_id), WireVarInt62(message.object_id), WireUint8(message.publisher_priority), - WireVarInt62(static_cast<uint64_t>(message.object_status)), - WireBytes(payload)); + WireVarInt62(message.payload_length), WireBytes(payload)); } quiche::QuicheBuffer MoqtFramer::SerializeClientSetup( @@ -629,4 +638,17 @@ message.delta_from_deadline.ToMicroseconds()))); } +// static +bool MoqtFramer::ValidateObjectMetadata(const MoqtObject& object) { + if (object.object_status != MoqtObjectStatus::kNormal && + object.payload_length > 0) { + return false; + } + if ((object.forwarding_preference == MoqtForwardingPreference::kSubgroup) != + object.subgroup_id.has_value()) { + return false; + } + return true; +} + } // namespace moqt
diff --git a/quiche/quic/moqt/moqt_framer.h b/quiche/quic/moqt/moqt_framer.h index 40b4dbf..b18a77c 100644 --- a/quiche/quic/moqt/moqt_framer.h +++ b/quiche/quic/moqt/moqt_framer.h
@@ -60,6 +60,9 @@ quiche::QuicheBuffer SerializeObjectAck(const MoqtObjectAck& message); private: + // Returns true if the metadata is internally consistent. + static bool ValidateObjectMetadata(const MoqtObject& object); + quiche::QuicheBufferAllocator* allocator_; bool using_webtrans_; };
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc index 510dd17..be6ae32 100644 --- a/quiche/quic/moqt/moqt_framer_test.cc +++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -4,7 +4,6 @@ #include "quiche/quic/moqt/moqt_framer.h" -#include <cerrno> #include <cstddef> #include <cstdint> #include <memory> @@ -19,7 +18,6 @@ #include "quiche/quic/platform/api/quic_expect_bug.h" #include "quiche/quic/platform/api/quic_test.h" #include "quiche/common/quiche_buffer_allocator.h" -#include "quiche/common/quiche_text_utils.h" #include "quiche/common/simple_buffer_allocator.h" #include "quiche/common/test_tools/quiche_test_utils.h" @@ -81,7 +79,9 @@ MoqtObject adjusted_message = message; adjusted_message.payload_length = payload.size(); quiche::QuicheBuffer header = - framer.SerializeObjectHeader(adjusted_message, is_first_in_stream); + (message.forwarding_preference == MoqtForwardingPreference::kDatagram) + ? framer.SerializeObjectDatagram(adjusted_message, payload) + : framer.SerializeObjectHeader(adjusted_message, is_first_in_stream); if (header.empty()) { return quiche::QuicheBuffer(); } @@ -217,13 +217,13 @@ }; TEST_F(MoqtFramerSimpleTest, GroupMiddler) { - auto header = std::make_unique<StreamHeaderGroupMessage>(); + auto header = std::make_unique<StreamHeaderSubgroupMessage>(); auto buffer1 = SerializeObject( framer_, std::get<MoqtObject>(header->structured_data()), "foo", true); EXPECT_EQ(buffer1.size(), header->total_message_size()); EXPECT_EQ(buffer1.AsStringView(), header->PacketSample()); - auto middler = std::make_unique<StreamMiddlerGroupMessage>(); + auto middler = std::make_unique<StreamMiddlerSubgroupMessage>(); auto buffer2 = SerializeObject( framer_, std::get<MoqtObject>(middler->structured_data()), "bar", false); EXPECT_EQ(buffer2.size(), middler->total_message_size()); @@ -246,28 +246,80 @@ TEST_F(MoqtFramerSimpleTest, BadObjectInput) { MoqtObject object = { + // This is a valid object. /*subscribe_id=*/3, /*track_alias=*/4, /*group_id=*/5, /*object_id=*/6, /*publisher_priority=*/7, /*object_status=*/MoqtObjectStatus::kNormal, - /*forwarding_preference=*/MoqtForwardingPreference::kObject, - /*payload_length=*/std::nullopt, + /*forwarding_preference=*/MoqtForwardingPreference::kSubgroup, + /*subgroup_id=*/8, + /*payload_length=*/3, }; quiche::QuicheBuffer buffer; - object.forwarding_preference = MoqtForwardingPreference::kDatagram; - EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader(object, false), - "must be first"); + + // SerializeObjectDatagram() only accepts kDatagram. + EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectDatagram(object, "foo"), + "Only datagrams use SerializeObjectDatagram()"); EXPECT_TRUE(buffer.empty()); - object.forwarding_preference = MoqtForwardingPreference::kGroup; + + // kSubgroup must have a subgroup_id. + object.subgroup_id = std::nullopt; EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader(object, false), - "requires knowing the object length"); + "Object metadata is invalid"); EXPECT_TRUE(buffer.empty()); - object.payload_length = 5; + object.subgroup_id = 8; + + // kTrack must not have a subgroup_id. + object.forwarding_preference = MoqtForwardingPreference::kTrack; + EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader(object, false), + "Object metadata is invalid"); + EXPECT_TRUE(buffer.empty()); + object.forwarding_preference = MoqtForwardingPreference::kSubgroup; + + // Non-normal status must have no payload. object.object_status = MoqtObjectStatus::kEndOfGroup; EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader(object, false), - "Object status must be kNormal if payload is non-empty"); + "Object metadata is invalid"); + EXPECT_TRUE(buffer.empty()); + // object.object_status = MoqtObjectStatus::kNormal; +} + +TEST_F(MoqtFramerSimpleTest, BadDatagramInput) { + MoqtObject object = { + // This is a valid datagram. + /*subscribe_id=*/3, + /*track_alias=*/4, + /*group_id=*/5, + /*object_id=*/6, + /*publisher_priority=*/7, + /*object_status=*/MoqtObjectStatus::kNormal, + /*forwarding_preference=*/MoqtForwardingPreference::kDatagram, + /*subgroup_id=*/std::nullopt, + /*payload_length=*/3, + }; + quiche::QuicheBuffer buffer; + + // No datagrams to SerializeObjectHeader(). + EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader(object, false), + "Datagrams use SerializeObjectDatagram()") + EXPECT_TRUE(buffer.empty()); + + object.object_status = MoqtObjectStatus::kEndOfGroup; + EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectDatagram(object, "foo"), + "Object metadata is invalid"); + EXPECT_TRUE(buffer.empty()); + object.object_status = MoqtObjectStatus::kNormal; + + object.subgroup_id = 8; + EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectDatagram(object, "foo"), + "Object metadata is invalid"); + EXPECT_TRUE(buffer.empty()); + object.subgroup_id = std::nullopt; + + EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectDatagram(object, "foobar"), + "Payload length does not match payload"); EXPECT_TRUE(buffer.empty()); } @@ -280,8 +332,9 @@ /*object_id=*/6, /*publisher_priority=*/7, /*object_status=*/MoqtObjectStatus::kNormal, - /*forwarding_preference=*/MoqtForwardingPreference::kObject, - /*payload_length=*/std::nullopt, + /*forwarding_preference=*/MoqtForwardingPreference::kDatagram, + /*subgroup_id=*/std::nullopt, + /*payload_length=*/3, }; std::string payload = "foo"; quiche::QuicheBuffer buffer;
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc index 11f6be4..d398331 100644 --- a/quiche/quic/moqt/moqt_integration_test.cc +++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -190,7 +190,7 @@ }); auto queue = std::make_shared<MoqtOutgoingQueue>( - FullTrackName{"test", "data"}, MoqtForwardingPreference::kGroup); + FullTrackName{"test", "data"}, MoqtForwardingPreference::kSubgroup); MoqtKnownTrackPublisher known_track_publisher; known_track_publisher.Add(queue); client_->session()->set_publisher(&known_track_publisher); @@ -215,7 +215,7 @@ EXPECT_EQ(group_sequence, 0u); EXPECT_EQ(object_sequence, 0u); EXPECT_EQ(status, MoqtObjectStatus::kNormal); - EXPECT_EQ(forwarding_preference, MoqtForwardingPreference::kGroup); + EXPECT_EQ(forwarding_preference, MoqtForwardingPreference::kSubgroup); EXPECT_EQ(object, "object data"); EXPECT_TRUE(end_of_message); received_object = true; @@ -232,15 +232,14 @@ server_->session()->set_publisher(&publisher); for (MoqtForwardingPreference forwarding_preference : - {MoqtForwardingPreference::kTrack, MoqtForwardingPreference::kGroup, - MoqtForwardingPreference::kObject, + {MoqtForwardingPreference::kTrack, MoqtForwardingPreference::kSubgroup, MoqtForwardingPreference::kDatagram}) { SCOPED_TRACE(MoqtForwardingPreferenceToString(forwarding_preference)); MockRemoteTrackVisitor client_visitor; std::string name = absl::StrCat("pref_", static_cast<int>(forwarding_preference)); auto queue = std::make_shared<MoqtOutgoingQueue>( - FullTrackName{"test", name}, MoqtForwardingPreference::kObject); + FullTrackName{"test", name}, MoqtForwardingPreference::kSubgroup); publisher.Add(queue); queue->AddObject(MemSliceFromString("object 1"), /*key=*/true); queue->AddObject(MemSliceFromString("object 2"), /*key=*/false); @@ -294,15 +293,14 @@ server_->session()->set_publisher(&publisher); for (MoqtForwardingPreference forwarding_preference : - {MoqtForwardingPreference::kTrack, MoqtForwardingPreference::kGroup, - MoqtForwardingPreference::kObject, + {MoqtForwardingPreference::kTrack, MoqtForwardingPreference::kSubgroup, MoqtForwardingPreference::kDatagram}) { SCOPED_TRACE(MoqtForwardingPreferenceToString(forwarding_preference)); MockRemoteTrackVisitor client_visitor; std::string name = absl::StrCat("pref_", static_cast<int>(forwarding_preference)); auto queue = std::make_shared<MoqtOutgoingQueue>( - FullTrackName{"test", name}, MoqtForwardingPreference::kObject); + FullTrackName{"test", name}, forwarding_preference); publisher.Add(queue); for (int i = 0; i < 100; ++i) { queue->AddObject(MemSliceFromString("object"), /*key=*/true);
diff --git a/quiche/quic/moqt/moqt_live_relay_queue.cc b/quiche/quic/moqt/moqt_live_relay_queue.cc index 5fdcf65..6cee208 100644 --- a/quiche/quic/moqt/moqt_live_relay_queue.cc +++ b/quiche/quic/moqt/moqt_live_relay_queue.cc
@@ -23,6 +23,9 @@ namespace moqt { +// TODO(martinduke): Accept subgroup ID. +// TODO(martinduke): Accept publisher priority. +// TODO(martinduke): Unless Track Forwarding preference goes away, support it. bool MoqtLiveRelayQueue::AddObject(uint64_t group_id, uint64_t object_id, MoqtObjectStatus status, absl::string_view object) {
diff --git a/quiche/quic/moqt/moqt_live_relay_queue.h b/quiche/quic/moqt/moqt_live_relay_queue.h index 8c3fbe3..ae2a58e 100644 --- a/quiche/quic/moqt/moqt_live_relay_queue.h +++ b/quiche/quic/moqt/moqt_live_relay_queue.h
@@ -38,7 +38,8 @@ explicit MoqtLiveRelayQueue(FullTrackName track, MoqtForwardingPreference forwarding_preference) : track_(std::move(track)), - forwarding_preference_(forwarding_preference) {} + forwarding_preference_(forwarding_preference), + next_sequence_(0, 0) {} MoqtLiveRelayQueue(const MoqtLiveRelayQueue&) = delete; MoqtLiveRelayQueue(MoqtLiveRelayQueue&&) = default;
diff --git a/quiche/quic/moqt/moqt_live_relay_queue_test.cc b/quiche/quic/moqt/moqt_live_relay_queue_test.cc index 6d4e765..3734e48 100644 --- a/quiche/quic/moqt/moqt_live_relay_queue_test.cc +++ b/quiche/quic/moqt/moqt_live_relay_queue_test.cc
@@ -24,7 +24,7 @@ public: TestMoqtLiveRelayQueue() : MoqtLiveRelayQueue(FullTrackName{"test", "track"}, - MoqtForwardingPreference::kGroup) { + MoqtForwardingPreference::kSubgroup) { AddObjectListener(this); }
diff --git a/quiche/quic/moqt/moqt_messages.cc b/quiche/quic/moqt/moqt_messages.cc index 5c2e503..95d8413 100644 --- a/quiche/quic/moqt/moqt_messages.cc +++ b/quiche/quic/moqt/moqt_messages.cc
@@ -4,6 +4,7 @@ #include "quiche/quic/moqt/moqt_messages.h" +#include <cstdint> #include <string> #include <vector> @@ -19,7 +20,8 @@ namespace moqt { MoqtObjectStatus IntegerToObjectStatus(uint64_t integer) { - if (integer >= 0x5) { + if (integer >= + static_cast<uint64_t>(MoqtObjectStatus::kInvalidObjectStatus)) { return MoqtObjectStatus::kInvalidObjectStatus; } return static_cast<MoqtObjectStatus>(integer); @@ -113,14 +115,12 @@ std::string MoqtDataStreamTypeToString(MoqtDataStreamType type) { switch (type) { - case MoqtDataStreamType::kObjectStream: - return "OBJECT_STREAM"; case MoqtDataStreamType::kObjectDatagram: return "OBJECT_PREFER_DATAGRAM"; case MoqtDataStreamType::kStreamHeaderTrack: return "STREAM_HEADER_TRACK"; - case MoqtDataStreamType::kStreamHeaderGroup: - return "STREAM_HEADER_GROUP"; + case MoqtDataStreamType::kStreamHeaderSubgroup: + return "STREAM_HEADER_SUBGROUP"; case MoqtDataStreamType::kPadding: return "PADDING"; } @@ -130,14 +130,12 @@ std::string MoqtForwardingPreferenceToString( MoqtForwardingPreference preference) { switch (preference) { - case MoqtForwardingPreference::kObject: - return "OBJECT"; case MoqtForwardingPreference::kDatagram: return "DATAGRAM"; case MoqtForwardingPreference::kTrack: return "TRACK"; - case MoqtForwardingPreference::kGroup: - return "GROUP"; + case MoqtForwardingPreference::kSubgroup: + return "SUBGROUP"; } QUIC_BUG(quic_bug_bad_moqt_message_type_01) << "Unknown preference " << std::to_string(static_cast<int>(preference)); @@ -146,37 +144,33 @@ MoqtForwardingPreference GetForwardingPreference(MoqtDataStreamType type) { switch (type) { - case MoqtDataStreamType::kObjectStream: - return MoqtForwardingPreference::kObject; case MoqtDataStreamType::kObjectDatagram: return MoqtForwardingPreference::kDatagram; case MoqtDataStreamType::kStreamHeaderTrack: return MoqtForwardingPreference::kTrack; - case MoqtDataStreamType::kStreamHeaderGroup: - return MoqtForwardingPreference::kGroup; + case MoqtDataStreamType::kStreamHeaderSubgroup: + return MoqtForwardingPreference::kSubgroup; default: break; } QUIC_BUG(quic_bug_bad_moqt_message_type_02) << "Message type does not indicate forwarding preference"; - return MoqtForwardingPreference::kObject; + return MoqtForwardingPreference::kSubgroup; }; MoqtDataStreamType GetMessageTypeForForwardingPreference( MoqtForwardingPreference preference) { switch (preference) { - case MoqtForwardingPreference::kObject: - return MoqtDataStreamType::kObjectStream; case MoqtForwardingPreference::kDatagram: return MoqtDataStreamType::kObjectDatagram; case MoqtForwardingPreference::kTrack: return MoqtDataStreamType::kStreamHeaderTrack; - case MoqtForwardingPreference::kGroup: - return MoqtDataStreamType::kStreamHeaderGroup; + case MoqtForwardingPreference::kSubgroup: + return MoqtDataStreamType::kStreamHeaderSubgroup; } QUIC_BUG(quic_bug_bad_moqt_message_type_03) << "Forwarding preference does not indicate message type"; - return MoqtDataStreamType::kObjectStream; + return MoqtDataStreamType::kStreamHeaderSubgroup; } std::string FullTrackName::ToString() const {
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h index 4a08832..57c26ab 100644 --- a/quiche/quic/moqt/moqt_messages.h +++ b/quiche/quic/moqt/moqt_messages.h
@@ -65,10 +65,9 @@ inline constexpr size_t kMaxMessageHeaderSize = 2048; enum class QUICHE_EXPORT MoqtDataStreamType : uint64_t { - kObjectStream = 0x00, kObjectDatagram = 0x01, - kStreamHeaderTrack = 0x50, - kStreamHeaderGroup = 0x51, + kStreamHeaderTrack = 0x02, + kStreamHeaderSubgroup = 0x04, // Currently QUICHE-specific. All data on a kPadding stream is ignored. kPadding = 0x26d3, @@ -218,11 +217,20 @@ // These are absolute sequence numbers. struct FullSequence { - uint64_t group = 0; - uint64_t object = 0; + uint64_t group; + uint64_t subgroup; + uint64_t object; + FullSequence() : FullSequence(0, 0) {} + // There is a lot of code from before subgroups. Assume there's one subgroup + // with ID 0 per group. + FullSequence(uint64_t group, uint64_t object) + : FullSequence(group, 0, object) {} + FullSequence(uint64_t group, uint64_t subgroup, uint64_t object) + : group(group), subgroup(subgroup), object(object) {} bool operator==(const FullSequence& other) const { return group == other.group && object == other.object; } + // These are temporal ordering comparisons, so subgroup ID doesn't matter. bool operator<(const FullSequence& other) const { return group < other.group || (group == other.group && object < other.object); @@ -237,7 +245,9 @@ object = other.object; return *this; } - FullSequence next() const { return FullSequence{group, object + 1}; } + FullSequence next() const { + return FullSequence{group, subgroup, object + 1}; + } template <typename H> friend H AbslHashValue(H h, const FullSequence& m); @@ -247,6 +257,11 @@ } }; +struct SubgroupPriority { + uint8_t publisher_priority = 0xf0; + uint64_t subgroup_id = 0; +}; + template <typename H> H AbslHashValue(H h, const FullSequence& m) { return H::combine(std::move(h), m.group, m.object); @@ -268,11 +283,10 @@ }; // These codes do not appear on the wire. -enum class QUICHE_EXPORT MoqtForwardingPreference : uint8_t { - kTrack = 0x0, - kGroup = 0x1, - kObject = 0x2, - kDatagram = 0x3, +enum class QUICHE_EXPORT MoqtForwardingPreference { + kTrack, + kSubgroup, + kDatagram, }; enum class QUICHE_EXPORT MoqtObjectStatus : uint64_t { @@ -281,14 +295,14 @@ kGroupDoesNotExist = 0x2, kEndOfGroup = 0x3, kEndOfTrack = 0x4, - kInvalidObjectStatus = 0x5, + kEndOfSubgroup = 0x5, + kInvalidObjectStatus = 0x6, }; MoqtObjectStatus IntegerToObjectStatus(uint64_t integer); // The data contained in every Object message, although the message type -// implies some of the values. |payload_length| has no value if the length -// is unknown (because it runs to the end of the stream.) +// implies some of the values. struct QUICHE_EXPORT MoqtObject { uint64_t subscribe_id; uint64_t track_alias; @@ -297,7 +311,8 @@ MoqtPriority publisher_priority; MoqtObjectStatus object_status; MoqtForwardingPreference forwarding_preference; - std::optional<uint64_t> payload_length; + std::optional<uint64_t> subgroup_id; + uint64_t payload_length; }; enum class QUICHE_EXPORT MoqtFilterType : uint64_t {
diff --git a/quiche/quic/moqt/moqt_outgoing_queue_test.cc b/quiche/quic/moqt/moqt_outgoing_queue_test.cc index d7a9785..f7cf5e6 100644 --- a/quiche/quic/moqt/moqt_outgoing_queue_test.cc +++ b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
@@ -28,7 +28,7 @@ public: TestMoqtOutgoingQueue() : MoqtOutgoingQueue(FullTrackName{"test", "track"}, - MoqtForwardingPreference::kGroup) { + MoqtForwardingPreference::kSubgroup) { AddObjectListener(this); }
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc index 541c550..51658d4 100644 --- a/quiche/quic/moqt/moqt_parser.cc +++ b/quiche/quic/moqt/moqt_parser.cc
@@ -52,7 +52,7 @@ bool IsAllowedStreamType(uint64_t value) { constexpr std::array kAllowedStreamTypes = { - MoqtDataStreamType::kObjectStream, MoqtDataStreamType::kStreamHeaderGroup, + MoqtDataStreamType::kStreamHeaderSubgroup, MoqtDataStreamType::kStreamHeaderTrack, MoqtDataStreamType::kPadding}; for (MoqtDataStreamType type : kAllowedStreamTypes) { if (static_cast<uint64_t>(type) == value) { @@ -72,18 +72,24 @@ !reader.ReadVarInt62(&object.group_id)) { return 0; } - if (type != MoqtDataStreamType::kStreamHeaderTrack && - type != MoqtDataStreamType::kStreamHeaderGroup && + if (type == MoqtDataStreamType::kStreamHeaderSubgroup) { + uint64_t subgroup_id; + if (!reader.ReadVarInt62(&subgroup_id)) { + return 0; + } + object.subgroup_id = subgroup_id; + } + if (type == MoqtDataStreamType::kObjectDatagram && !reader.ReadVarInt62(&object.object_id)) { return 0; } if (!reader.ReadUInt8(&object.publisher_priority)) { return 0; } - uint64_t status = 0; - if ((type == MoqtDataStreamType::kObjectStream || - type == MoqtDataStreamType::kObjectDatagram) && - !reader.ReadVarInt62(&status)) { + uint64_t status = static_cast<uint64_t>(MoqtObjectStatus::kNormal); + if (type == MoqtDataStreamType::kObjectDatagram && + (!reader.ReadVarInt62(&object.payload_length) || + (object.payload_length == 0 && !reader.ReadVarInt62(&status)))) { return 0; } object.object_status = IntegerToObjectStatus(status); @@ -100,15 +106,13 @@ } [[fallthrough]]; - case MoqtDataStreamType::kStreamHeaderGroup: { - uint64_t length; + case MoqtDataStreamType::kStreamHeaderSubgroup: { if (!reader.ReadVarInt62(&object.object_id) || - !reader.ReadVarInt62(&length)) { + !reader.ReadVarInt62(&object.payload_length)) { return 0; } - object.payload_length = length; - uint64_t status = 0; - if (length == 0 && !reader.ReadVarInt62(&status)) { + uint64_t status = static_cast<uint64_t>(MoqtObjectStatus::kNormal); + if (object.payload_length == 0 && !reader.ReadVarInt62(&status)) { return 0; } object.object_status = IntegerToObjectStatus(status); @@ -902,11 +906,6 @@ return; } - // Annoying path (going away soon): handle kObjectStream receiving a FIN. - if (data.empty() && fin && type_ == MoqtDataStreamType::kObjectStream) { - visitor_.OnObjectMessage(*metadata_, "", true); - } - // Sad path: there is already data buffered. Attempt to transfer a small // chunk from `data` into the buffer, in hope that it will make the contents // of the buffer parsable without any leftover data. This is a reasonable @@ -915,8 +914,7 @@ while (!buffered_message_.empty() && !data.empty()) { absl::string_view chunk = data.substr(0, chunk_size_); absl::StrAppend(&buffered_message_, chunk); - absl::string_view unprocessed = - ProcessDataInner(buffered_message_, fin && data.size() == chunk.size()); + absl::string_view unprocessed = ProcessDataInner(buffered_message_); if (unprocessed.size() >= chunk.size()) { // chunk didn't allow any processing at all. data.remove_prefix(chunk.size()); @@ -928,7 +926,7 @@ // Happy path: there is no buffered data. if (buffered_message_.empty() && !data.empty()) { - buffered_message_.assign(ProcessDataInner(data, fin)); + buffered_message_.assign(ProcessDataInner(data)); } if (fin) { @@ -941,8 +939,7 @@ } } -absl::string_view MoqtDataParser::ProcessDataInner(absl::string_view data, - bool fin) { +absl::string_view MoqtDataParser::ProcessDataInner(absl::string_view data) { quic::QuicDataReader reader(data); while (!reader.IsDoneReading()) { absl::string_view remainder = reader.PeekRemainingPayload(); @@ -966,11 +963,6 @@ if (bytes_read == 0) { return remainder; } - if (type_ == MoqtDataStreamType::kObjectStream && - header.object_status == MoqtObjectStatus::kInvalidObjectStatus) { - ParseError("Invalid object status"); - return ""; - } metadata_ = header; continue; } @@ -985,22 +977,14 @@ ParseError("Invalid object status provided"); return ""; } - payload_length_remaining_ = *metadata_->payload_length; + payload_length_remaining_ = metadata_->payload_length; + if (payload_length_remaining_ == 0) { + visitor_.OnObjectMessage(*metadata_, "", true); + } continue; } case kData: { - if (payload_length_remaining_ == 0) { - // Special case: kObject, which does not have explicit length. - if (metadata_->object_status != MoqtObjectStatus::kNormal) { - ParseError("Object with non-normal status has payload"); - return ""; - } - visitor_.OnObjectMessage(*metadata_, reader.PeekRemainingPayload(), - fin); - return ""; - } - absl::string_view payload = reader.ReadAtMost(payload_length_remaining_); visitor_.OnObjectMessage(*metadata_, payload,
diff --git a/quiche/quic/moqt/moqt_parser.h b/quiche/quic/moqt/moqt_parser.h index 918573a..876b27a 100644 --- a/quiche/quic/moqt/moqt_parser.h +++ b/quiche/quic/moqt/moqt_parser.h
@@ -206,8 +206,7 @@ if (!metadata_.has_value()) { return kHeader; } - if (payload_length_remaining_ > 0 || - *type_ == MoqtDataStreamType::kObjectStream) { + if (payload_length_remaining_ > 0) { return kData; } return kSubheader; @@ -215,8 +214,7 @@ // Processes all that can be entirely processed, and returns the view for the // data that needs to be buffered. - // TODO: remove the `fin` argument once kObjectStream is gone. - absl::string_view ProcessDataInner(absl::string_view data, bool fin); + absl::string_view ProcessDataInner(absl::string_view data); void ParseError(absl::string_view reason);
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc index 633f83f..6c34083 100644 --- a/quiche/quic/moqt/moqt_parser_test.cc +++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -41,9 +41,9 @@ MoqtMessageType::kServerSetup, MoqtMessageType::kGoAway, MoqtMessageType::kMaxSubscribeId, MoqtMessageType::kObjectAck, }; -constexpr std::array kDataStreamTypes{MoqtDataStreamType::kObjectStream, - MoqtDataStreamType::kStreamHeaderTrack, - MoqtDataStreamType::kStreamHeaderGroup}; +constexpr std::array kDataStreamTypes{ + MoqtDataStreamType::kStreamHeaderTrack, + MoqtDataStreamType::kStreamHeaderSubgroup}; using GeneralizedMessageType = absl::variant<MoqtMessageType, MoqtDataStreamType>; @@ -325,10 +325,6 @@ } TEST_P(MoqtParserTest, EarlyFin) { - if (message_type_ == - GeneralizedMessageType(MoqtDataStreamType::kObjectStream)) { - return; - } std::unique_ptr<TestMessageBase> message = MakeMessage(); size_t first_data_size = message->total_message_size() - 1; ProcessData(message->PacketSample().substr(0, first_data_size), true); @@ -339,10 +335,6 @@ } TEST_P(MoqtParserTest, SeparateEarlyFin) { - if (message_type_ == - GeneralizedMessageType(MoqtDataStreamType::kObjectStream)) { - return; - } std::unique_ptr<TestMessageBase> message = MakeMessage(); size_t first_data_size = message->total_message_size() - 1; ProcessData(message->PacketSample().substr(0, first_data_size), false); @@ -366,29 +358,12 @@ static constexpr bool kRawQuic = false; }; -TEST_F(MoqtMessageSpecificTest, ObjectStreamSeparateFin) { - // OBJECT can return on an unknown-length message even without receiving a - // FIN. - MoqtDataParser parser(&visitor_); - auto message = std::make_unique<ObjectStreamMessage>(); - parser.ProcessData(message->PacketSample(), false); - EXPECT_EQ(visitor_.messages_received_, 0); - EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_)); - EXPECT_EQ(visitor_.object_payload(), "foo"); - EXPECT_FALSE(visitor_.end_of_message_); - - parser.ProcessData(absl::string_view(), true); // send the FIN - EXPECT_EQ(visitor_.messages_received_, 1); - EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_)); - EXPECT_TRUE(visitor_.end_of_message_); - EXPECT_FALSE(visitor_.parsing_error_.has_value()); -} - // Send the header + some payload, pure payload, then pure payload to end the // message. TEST_F(MoqtMessageSpecificTest, ThreePartObject) { MoqtDataParser parser(&visitor_); - auto message = std::make_unique<ObjectStreamMessage>(); + auto message = std::make_unique<StreamHeaderSubgroupMessage>(); + EXPECT_TRUE(message->SetPayloadLength(14)); parser.ProcessData(message->PacketSample(), false); EXPECT_EQ(visitor_.messages_received_, 0); EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_)); @@ -414,23 +389,24 @@ // Send the part of header, rest of header + payload, plus payload. TEST_F(MoqtMessageSpecificTest, ThreePartObjectFirstIncomplete) { MoqtDataParser parser(&visitor_); - auto message = std::make_unique<ObjectStreamMessage>(); + auto message = std::make_unique<StreamHeaderSubgroupMessage>(); + EXPECT_TRUE(message->SetPayloadLength(50)); // first part parser.ProcessData(message->PacketSample().substr(0, 4), false); EXPECT_EQ(visitor_.messages_received_, 0); // second part. Add padding to it. - message->set_wire_image_size(100); + message->set_wire_image_size(55); parser.ProcessData( message->PacketSample().substr(4, message->total_message_size() - 4), false); EXPECT_EQ(visitor_.messages_received_, 0); EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_)); EXPECT_FALSE(visitor_.end_of_message_); - // The value "93" is the overall wire image size of 100 minus the non-payload + // The value "47" is the overall wire image size of 55 minus the non-payload // part of the message. - EXPECT_EQ(visitor_.object_payload().length(), 93); + EXPECT_EQ(visitor_.object_payload().length(), 47); // third part includes FIN parser.ProcessData("bar", true); @@ -441,10 +417,10 @@ EXPECT_FALSE(visitor_.parsing_error_.has_value()); } -TEST_F(MoqtMessageSpecificTest, StreamHeaderGroupFollowOn) { +TEST_F(MoqtMessageSpecificTest, StreamHeaderSubgroupFollowOn) { MoqtDataParser parser(&visitor_); // first part - auto message1 = std::make_unique<StreamHeaderGroupMessage>(); + auto message1 = std::make_unique<StreamHeaderSubgroupMessage>(); parser.ProcessData(message1->PacketSample(), false); EXPECT_EQ(visitor_.messages_received_, 1); EXPECT_TRUE(message1->EqualFieldValues(*visitor_.last_message_)); @@ -453,7 +429,7 @@ EXPECT_FALSE(visitor_.parsing_error_.has_value()); // second part visitor_.object_payloads_.clear(); - auto message2 = std::make_unique<StreamMiddlerGroupMessage>(); + auto message2 = std::make_unique<StreamMiddlerSubgroupMessage>(); parser.ProcessData(message2->PacketSample(), false); EXPECT_EQ(visitor_.messages_received_, 2); EXPECT_TRUE(message2->EqualFieldValues(*visitor_.last_message_)); @@ -832,7 +808,7 @@ TEST_F(MoqtMessageSpecificTest, FinMidPayload) { MoqtDataParser parser(&visitor_); - auto message = std::make_unique<StreamHeaderGroupMessage>(); + auto message = std::make_unique<StreamHeaderSubgroupMessage>(); parser.ProcessData( message->PacketSample().substr(0, message->total_message_size() - 1), true); @@ -863,28 +839,18 @@ EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); } -TEST_F(MoqtMessageSpecificTest, NonNormalObjectHasPayload) { - MoqtDataParser parser(&visitor_); - char object_stream[] = { - 0x00, 0x03, 0x04, 0x05, 0x06, 0x07, 0x02, // varints - 0x66, 0x6f, 0x6f, // payload = "foo" - }; - parser.ProcessData(absl::string_view(object_stream, sizeof(object_stream)), - false); - EXPECT_EQ(visitor_.parsing_error_, - "Object with non-normal status has payload"); - EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); -} - TEST_F(MoqtMessageSpecificTest, InvalidObjectStatus) { MoqtDataParser parser(&visitor_); - char object_stream[] = { - 0x00, 0x03, 0x04, 0x05, 0x06, 0x07, 0x06, // varints - 0x66, 0x6f, 0x6f, // payload = "foo" + char stream_header_subgroup[] = { + 0x04, // type field + 0x03, 0x04, 0x05, 0x08, // varints + 0x07, // publisher priority + 0x06, 0x00, 0x0f, // object middler; status = 0x0f }; - parser.ProcessData(absl::string_view(object_stream, sizeof(object_stream)), - false); - EXPECT_EQ(visitor_.parsing_error_, "Invalid object status"); + parser.ProcessData( + absl::string_view(stream_header_subgroup, sizeof(stream_header_subgroup)), + false); + EXPECT_EQ(visitor_.parsing_error_, "Invalid object status provided"); EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); } @@ -1232,7 +1198,7 @@ } TEST_F(MoqtMessageSpecificTest, WrongMessageInDatagram) { - ObjectStreamMessage message; + StreamHeaderSubgroupMessage message; MoqtObject object; absl::string_view payload = ParseDatagram(message.PacketSample(), object); EXPECT_TRUE(payload.empty());
diff --git a/quiche/quic/moqt/moqt_priority.cc b/quiche/quic/moqt/moqt_priority.cc index 73e0171..22a8f7a 100644 --- a/quiche/quic/moqt/moqt_priority.cc +++ b/quiche/quic/moqt/moqt_priority.cc
@@ -51,17 +51,17 @@ webtransport::SendOrder SendOrderForStream(MoqtPriority subscriber_priority, MoqtPriority publisher_priority, uint64_t group_id, - uint64_t object_id, + uint64_t subgroup_id, MoqtDeliveryOrder delivery_order) { const int64_t track_bits = (Flip<8>(subscriber_priority) << 54) | (Flip<8>(publisher_priority) << 46); group_id = OnlyLowestNBits<26>(group_id); - object_id = OnlyLowestNBits<20>(object_id); + subgroup_id = OnlyLowestNBits<20>(subgroup_id); if (delivery_order == MoqtDeliveryOrder::kAscending) { group_id = Flip<26>(group_id); } - object_id = Flip<20>(object_id); // Object ID is always ascending. - return track_bits | (group_id << 20) | object_id; + subgroup_id = Flip<20>(subgroup_id); + return track_bits | (group_id << 20) | subgroup_id; } webtransport::SendOrder UpdateSendOrderForSubscriberPriority(
diff --git a/quiche/quic/moqt/moqt_priority.h b/quiche/quic/moqt/moqt_priority.h index 0dfd490..15f8a55 100644 --- a/quiche/quic/moqt/moqt_priority.h +++ b/quiche/quic/moqt/moqt_priority.h
@@ -30,7 +30,7 @@ uint64_t group_id, MoqtDeliveryOrder delivery_order); QUICHE_EXPORT webtransport::SendOrder SendOrderForStream( MoqtPriority subscriber_priority, MoqtPriority publisher_priority, - uint64_t group_id, uint64_t object_id, MoqtDeliveryOrder delivery_order); + uint64_t group_id, uint64_t subgroup_id, MoqtDeliveryOrder delivery_order); // Returns |send_order| updated with the new |subscriber_priority|. QUICHE_EXPORT webtransport::SendOrder UpdateSendOrderForSubscriberPriority(
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc index 723162c..656b8d0 100644 --- a/quiche/quic/moqt/moqt_session.cc +++ b/quiche/quic/moqt/moqt_session.cc
@@ -191,10 +191,6 @@ void MoqtSession::OnDatagramReceived(absl::string_view datagram) { MoqtObject message; absl::string_view payload = ParseDatagram(datagram, message); - if (payload.empty()) { - Error(MoqtError::kProtocolViolation, "Malformed datagram"); - return; - } QUICHE_DLOG(INFO) << ENDPOINT << "Received OBJECT message in datagram for subscribe_id " << message.subscribe_id << " for track alias " @@ -892,23 +888,23 @@ void MoqtSession::IncomingDataStream::OnObjectMessage(const MoqtObject& message, absl::string_view payload, bool end_of_message) { - QUICHE_DVLOG(1) - << ENDPOINT << "Received OBJECT message on stream " - << stream_->GetStreamId() << " for subscribe_id " << message.subscribe_id - << " for track alias " << message.track_alias << " with sequence " - << message.group_id << ":" << message.object_id << " priority " - << message.publisher_priority << " forwarding_preference " - << MoqtForwardingPreferenceToString(message.forwarding_preference) - << " length " << payload.size() << " explicit length " - << (message.payload_length.has_value() ? (int)*message.payload_length - : -1) - << (end_of_message ? "F" : ""); + QUICHE_DVLOG(1) << ENDPOINT << "Received OBJECT message on stream " + << stream_->GetStreamId() << " for subscribe_id " + << message.subscribe_id << " for track alias " + << message.track_alias << " with sequence " + << message.group_id << ":" << message.object_id + << " priority " << message.publisher_priority + << " forwarding_preference " + << MoqtForwardingPreferenceToString( + message.forwarding_preference) + << " length " << payload.size() << " length " + << message.payload_length << (end_of_message ? "F" : ""); if (!session_->parameters_.deliver_partial_objects) { if (!end_of_message) { // Buffer partial object. - if (partial_object_.empty() && message.payload_length.has_value()) { + if (partial_object_.empty()) { // Avoid redundant allocations by reserving the appropriate amount of // memory if known. - partial_object_.reserve(*message.payload_length); + partial_object_.reserve(message.payload_length); } absl::StrAppend(&partial_object_, payload); return; @@ -1027,14 +1023,6 @@ if (stream_id.has_value()) { raw_stream = session_->session_->GetStreamById(*stream_id); } else { - if (!window_.IsStreamProvokingObject(sequence, forwarding_preference)) { - QUIC_DLOG(INFO) << ENDPOINT << "Received object " << sequence - << ", but there is no stream that it can be mapped to"; - // It is possible that the we are getting notified of objects out of - // order, but we still have to send objects in a manner consistent with - // the forwarding preference used. - return; - } raw_stream = session_->OpenOrQueueDataStream(subscription_id_, sequence); } if (raw_stream == nullptr) { @@ -1089,13 +1077,9 @@ return SendOrderForStream(subscriber_priority_, publisher_priority, /*group_id=*/0, delivery_order); break; - case MoqtForwardingPreference::kGroup: + case MoqtForwardingPreference::kSubgroup: return SendOrderForStream(subscriber_priority_, publisher_priority, - sequence.group, delivery_order); - break; - case MoqtForwardingPreference::kObject: - return SendOrderForStream(subscriber_priority_, publisher_priority, - sequence.group, sequence.object, + sequence.group, sequence.subgroup, delivery_order); break; case MoqtForwardingPreference::kDatagram: @@ -1270,6 +1254,11 @@ header.publisher_priority = publisher.GetPublisherPriority(); header.object_status = object.status; header.forwarding_preference = forwarding_preference; + // TODO(martinduke): send values other than 0. + header.subgroup_id = + (forwarding_preference == MoqtForwardingPreference::kSubgroup) + ? 0 + : std::optional<uint64_t>(); header.payload_length = object.payload.length(); quiche::QuicheBuffer serialized_header = @@ -1288,19 +1277,15 @@ !subscription.InWindow(next_object_); break; - case MoqtForwardingPreference::kGroup: + case MoqtForwardingPreference::kSubgroup: ++next_object_.object; fin = object.status == MoqtObjectStatus::kEndOfTrack || object.status == MoqtObjectStatus::kEndOfGroup || + object.status == MoqtObjectStatus::kEndOfSubgroup || object.status == MoqtObjectStatus::kGroupDoesNotExist || !subscription.InWindow(next_object_); break; - case MoqtForwardingPreference::kObject: - QUICHE_DCHECK(!stream_header_written_); - fin = true; - break; - case MoqtForwardingPreference::kDatagram: QUICHE_NOTREACHED(); break; @@ -1347,6 +1332,8 @@ header.publisher_priority = track_publisher_->GetPublisherPriority(); header.object_status = object->status; header.forwarding_preference = MoqtForwardingPreference::kDatagram; + header.subgroup_id = std::nullopt; + header.payload_length = object->payload.length(); quiche::QuicheBuffer datagram = session_->framer_.SerializeObjectDatagram( header, object->payload.AsStringView()); session_->session_->SendOrQueueDatagram(datagram.AsStringView());
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc index 994e85a..106c172 100644 --- a/quiche/quic/moqt/moqt_session_test.cc +++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -717,7 +717,8 @@ /*object_sequence=*/0, /*publisher_priority=*/0, /*object_status=*/MoqtObjectStatus::kNormal, - /*forwarding_preference=*/MoqtForwardingPreference::kGroup, + /*forwarding_preference=*/MoqtForwardingPreference::kSubgroup, + /*subgroup_id=*/0, /*payload_length=*/8, }; webtransport::test::MockStream mock_stream; @@ -742,7 +743,8 @@ /*object_sequence=*/0, /*publisher_priority=*/0, /*object_status=*/MoqtObjectStatus::kNormal, - /*forwarding_preference=*/MoqtForwardingPreference::kGroup, + /*forwarding_preference=*/MoqtForwardingPreference::kSubgroup, + /*subgroup_id=*/0, /*payload_length=*/16, }; webtransport::test::MockStream mock_stream; @@ -772,7 +774,8 @@ /*object_sequence=*/0, /*publisher_priority=*/0, /*object_status=*/MoqtObjectStatus::kNormal, - /*forwarding_preference=*/MoqtForwardingPreference::kGroup, + /*forwarding_preference=*/MoqtForwardingPreference::kSubgroup, + /*subgroup_id=*/0, /*payload_length=*/16, }; webtransport::test::MockStream mock_stream; @@ -809,7 +812,8 @@ /*object_sequence=*/0, /*publisher_priority=*/0, /*object_status=*/MoqtObjectStatus::kNormal, - /*forwarding_preference=*/MoqtForwardingPreference::kGroup, + /*forwarding_preference=*/MoqtForwardingPreference::kSubgroup, + /*subgroup_id=*/0, /*payload_length=*/8, }; webtransport::test::MockStream mock_stream; @@ -867,7 +871,8 @@ /*object_sequence=*/0, /*publisher_priority=*/0, /*object_status=*/MoqtObjectStatus::kNormal, - /*forwarding_preference=*/MoqtForwardingPreference::kGroup, + /*forwarding_preference=*/MoqtForwardingPreference::kSubgroup, + /*subgroup_id=*/0, /*payload_length=*/8, }; webtransport::test::MockStream mock_stream; @@ -928,7 +933,8 @@ /*object_sequence=*/0, /*publisher_priority=*/0, /*object_status=*/MoqtObjectStatus::kNormal, - /*forwarding_preference=*/MoqtForwardingPreference::kGroup, + /*forwarding_preference=*/MoqtForwardingPreference::kSubgroup, + /*subgroup_id=*/0, /*payload_length=*/8, }; webtransport::test::MockStream mock_stream; @@ -948,7 +954,7 @@ EXPECT_CALL(mock_stream, GetStreamId()) .WillRepeatedly(Return(kIncomingUniStreamId)); object_stream->OnObjectMessage(object, payload, true); - object.forwarding_preference = MoqtForwardingPreference::kObject; + object.forwarding_preference = MoqtForwardingPreference::kTrack; ++object.object_id; EXPECT_CALL(mock_session_, CloseSession(static_cast<uint64_t>(MoqtError::kProtocolViolation), @@ -980,7 +986,8 @@ /*object_sequence=*/0, /*publisher_priority=*/0, /*object_status=*/MoqtObjectStatus::kNormal, - /*forwarding_preference=*/MoqtForwardingPreference::kGroup, + /*forwarding_preference=*/MoqtForwardingPreference::kSubgroup, + /*subgroup_id=*/0, /*payload_length=*/8, }; webtransport::test::MockStream mock_stream; @@ -1004,7 +1011,7 @@ MoqtSessionPeer::CreateRemoteTrack(&session_, ftn, &visitor, 2); // The track already exists, and has a different forwarding preference. MoqtSessionPeer::remote_track(&session_, 2) - .CheckForwardingPreference(MoqtForwardingPreference::kObject); + .CheckForwardingPreference(MoqtForwardingPreference::kTrack); // SUBSCRIBE_OK arrives MoqtSubscribeOk ok = { @@ -1025,7 +1032,7 @@ TEST_F(MoqtSessionTest, CreateIncomingDataStreamAndSend) { FullTrackName ftn("foo", "bar"); - auto track = SetupPublisher(ftn, MoqtForwardingPreference::kObject, + auto track = SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, FullSequence(4, 2)); MoqtObjectListener* subscription = MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0); @@ -1052,7 +1059,7 @@ // Verify first six message fields are sent correctly bool correct_message = false; - const std::string kExpectedMessage = {0x00, 0x00, 0x02, 0x05, 0x00, 0x00}; + const std::string kExpectedMessage = {0x04, 0x00, 0x02, 0x05, 0x00, 0x00}; EXPECT_CALL(mock_stream, Writev(_, _)) .WillOnce([&](absl::Span<const absl::string_view> data, const quiche::StreamWriteOptions& options) { @@ -1075,8 +1082,8 @@ TEST_F(MoqtSessionTest, UnidirectionalStreamCannotBeOpened) { FullTrackName ftn("foo", "bar"); - auto track = - SetupPublisher(ftn, MoqtForwardingPreference::kGroup, FullSequence(4, 2)); + auto track = SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, + FullSequence(4, 2)); MoqtObjectListener* subscription = MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0); @@ -1118,8 +1125,8 @@ TEST_F(MoqtSessionTest, OutgoingStreamDisappears) { FullTrackName ftn("foo", "bar"); - auto track = - SetupPublisher(ftn, MoqtForwardingPreference::kGroup, FullSequence(4, 2)); + auto track = SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, + FullSequence(4, 2)); MoqtObjectListener* subscription = MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0); @@ -1277,7 +1284,7 @@ // Publish in window. bool correct_message = false; uint8_t kExpectedMessage[] = { - 0x01, 0x00, 0x02, 0x05, 0x00, 0x00, 0x00, 0x64, + 0x01, 0x00, 0x02, 0x05, 0x00, 0x00, 0x08, 0x64, 0x65, 0x61, 0x64, 0x62, 0x65, 0x65, 0x66, }; EXPECT_CALL(mock_session_, SendOrQueueDatagram(_)) @@ -1311,9 +1318,10 @@ /*publisher_priority=*/0, /*object_status=*/MoqtObjectStatus::kNormal, /*forwarding_preference=*/MoqtForwardingPreference::kDatagram, + /*subgroup_id=*/std::nullopt, /*payload_length=*/8, }; - char datagram[] = {0x01, 0x01, 0x02, 0x00, 0x00, 0x00, 0x00, 0x64, + char datagram[] = {0x01, 0x01, 0x02, 0x00, 0x00, 0x00, 0x08, 0x64, 0x65, 0x61, 0x64, 0x62, 0x65, 0x65, 0x66}; EXPECT_CALL(visitor_, OnObjectFragment(ftn, object.group_id, object.object_id, @@ -1335,7 +1343,8 @@ /*object_sequence=*/0, /*publisher_priority=*/0, /*object_status=*/MoqtObjectStatus::kNormal, - /*forwarding_preference=*/MoqtForwardingPreference::kGroup, + /*forwarding_preference=*/MoqtForwardingPreference::kSubgroup, + /*subgroup_id=*/0, /*payload_length=*/8, }; webtransport::test::MockStream mock_stream; @@ -1410,8 +1419,8 @@ TEST_F(MoqtSessionTest, QueuedStreamsOpenedInOrder) { FullTrackName ftn("foo", "bar"); - auto track = - SetupPublisher(ftn, MoqtForwardingPreference::kGroup, FullSequence(0, 0)); + auto track = SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, + FullSequence(0, 0)); EXPECT_CALL(*track, GetTrackStatus()) .WillRepeatedly(Return(MoqtTrackStatusCode::kNotYetBegun)); MoqtObjectListener* subscription = @@ -1482,22 +1491,22 @@ EXPECT_CALL(mock_stream0, Writev(_, _)) .WillOnce([&](absl::Span<const absl::string_view> data, const quiche::StreamWriteOptions& options) { - // The Group ID is the 5th byte of the stream. + // The Group ID is the 4th byte of the stream. EXPECT_EQ(static_cast<const uint8_t>(data[0][4]), 0); return absl::OkStatus(); }); EXPECT_CALL(mock_stream1, Writev(_, _)) .WillOnce([&](absl::Span<const absl::string_view> data, const quiche::StreamWriteOptions& options) { - // The Group ID is the 5th byte of the stream. - EXPECT_EQ(static_cast<const uint8_t>(data[0][4]), 1); + // The Group ID is the 4th byte of the stream. + EXPECT_EQ(static_cast<const uint8_t>(data[0][3]), 1); return absl::OkStatus(); }); EXPECT_CALL(mock_stream2, Writev(_, _)) .WillOnce([&](absl::Span<const absl::string_view> data, const quiche::StreamWriteOptions& options) { - // The Group ID is the 5th byte of the stream. - EXPECT_EQ(static_cast<const uint8_t>(data[0][4]), 2); + // The Group ID is the 4th byte of the stream. + EXPECT_EQ(static_cast<const uint8_t>(data[0][3]), 2); return absl::OkStatus(); }); session_.OnCanCreateNewOutgoingUnidirectionalStream(); @@ -1505,8 +1514,8 @@ TEST_F(MoqtSessionTest, StreamQueuedForSubscriptionThatDoesntExist) { FullTrackName ftn("foo", "bar"); - auto track = - SetupPublisher(ftn, MoqtForwardingPreference::kGroup, FullSequence(0, 0)); + auto track = SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, + FullSequence(0, 0)); EXPECT_CALL(*track, GetTrackStatus()) .WillRepeatedly(Return(MoqtTrackStatusCode::kNotYetBegun)); MoqtObjectListener* subscription = @@ -1527,8 +1536,8 @@ TEST_F(MoqtSessionTest, QueuedStreamPriorityChanged) { FullTrackName ftn("foo", "bar"); - auto track = - SetupPublisher(ftn, MoqtForwardingPreference::kGroup, FullSequence(0, 0)); + auto track = SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, + FullSequence(0, 0)); EXPECT_CALL(*track, GetTrackStatus()) .WillRepeatedly(Return(MoqtTrackStatusCode::kNotYetBegun)); // Create two identical subscriptions with different priorities. @@ -1579,9 +1588,9 @@ .WillOnce([&](absl::Span<const absl::string_view> data, const quiche::StreamWriteOptions& options) { // Check subscribe ID is 0. - EXPECT_EQ(static_cast<const uint8_t>(data[0][2]), 0); + EXPECT_EQ(static_cast<const uint8_t>(data[0][1]), 0); // Check Group ID is 0 - EXPECT_EQ(static_cast<const uint8_t>(data[0][4]), 0); + EXPECT_EQ(static_cast<const uint8_t>(data[0][3]), 0); return absl::OkStatus(); }); session_.OnCanCreateNewOutgoingUnidirectionalStream(); @@ -1615,9 +1624,9 @@ .WillOnce([&](absl::Span<const absl::string_view> data, const quiche::StreamWriteOptions& options) { // Check subscribe ID is 0. - EXPECT_EQ(static_cast<const uint8_t>(data[0][2]), 1); + EXPECT_EQ(static_cast<const uint8_t>(data[0][1]), 1); // Check Group ID is 0 - EXPECT_EQ(static_cast<const uint8_t>(data[0][4]), 0); + EXPECT_EQ(static_cast<const uint8_t>(data[0][3]), 0); return absl::OkStatus(); }); session_.OnCanCreateNewOutgoingUnidirectionalStream();
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.cc b/quiche/quic/moqt/moqt_subscribe_windows.cc index 0c4a637..4637bd1 100644 --- a/quiche/quic/moqt/moqt_subscribe_windows.cc +++ b/quiche/quic/moqt/moqt_subscribe_windows.cc
@@ -66,38 +66,18 @@ return true; } -bool SubscribeWindow::IsStreamProvokingObject( - FullSequence sequence, MoqtForwardingPreference preference) const { - if (sequence == start_) { - return true; - } - switch (preference) { - case MoqtForwardingPreference::kTrack: - return false; - case MoqtForwardingPreference::kGroup: - // Note: this assumes that the group starts with object 0. - return sequence.object == 0; - case MoqtForwardingPreference::kObject: - return true; - case MoqtForwardingPreference::kDatagram: - QUICHE_DCHECK(false); - return true; - } -} - ReducedSequenceIndex::ReducedSequenceIndex( FullSequence sequence, MoqtForwardingPreference preference) { switch (preference) { case MoqtForwardingPreference::kTrack: sequence_ = FullSequence(0, 0); break; - case MoqtForwardingPreference::kGroup: + case MoqtForwardingPreference::kSubgroup: sequence_ = FullSequence(sequence.group, 0); break; - case MoqtForwardingPreference::kObject: case MoqtForwardingPreference::kDatagram: sequence_ = sequence; - break; + return; } }
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.h b/quiche/quic/moqt/moqt_subscribe_windows.h index 5e0307d..b850f3e 100644 --- a/quiche/quic/moqt/moqt_subscribe_windows.h +++ b/quiche/quic/moqt/moqt_subscribe_windows.h
@@ -43,11 +43,6 @@ // MoQT, subscription windows are only allowed to shrink, not to expand). bool UpdateStartEnd(FullSequence start, std::optional<FullSequence> end); - // Returns true if for a given forwarding preference, the specified sequence - // number might be the first object on a stream. - bool IsStreamProvokingObject(FullSequence sequence, - MoqtForwardingPreference preference) const; - private: FullSequence start_; std::optional<FullSequence> end_;
diff --git a/quiche/quic/moqt/moqt_subscribe_windows_test.cc b/quiche/quic/moqt/moqt_subscribe_windows_test.cc index 49213ff..ba8687e 100644 --- a/quiche/quic/moqt/moqt_subscribe_windows_test.cc +++ b/quiche/quic/moqt/moqt_subscribe_windows_test.cc
@@ -44,8 +44,8 @@ EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(4, 0)), std::nullopt); } -TEST_F(SubscribeWindowTest, AddQueryRemoveStreamIdGroup) { - SendStreamMap stream_map(MoqtForwardingPreference::kGroup); +TEST_F(SubscribeWindowTest, AddQueryRemoveStreamIdSubgroup) { + SendStreamMap stream_map(MoqtForwardingPreference::kSubgroup); stream_map.AddStream(FullSequence{4, 0}, 2); EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(5, 0)), std::nullopt); stream_map.AddStream(FullSequence{5, 2}, 6); @@ -57,21 +57,6 @@ EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(5, 2)), std::nullopt); } -TEST_F(SubscribeWindowTest, AddQueryRemoveStreamIdObject) { - SendStreamMap stream_map(MoqtForwardingPreference::kObject); - stream_map.AddStream(FullSequence{4, 0}, 2); - stream_map.AddStream(FullSequence{4, 1}, 6); - stream_map.AddStream(FullSequence{4, 2}, 10); - EXPECT_QUIC_BUG(stream_map.AddStream(FullSequence{4, 2}, 14), - "Stream already added"); - EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(4, 0)), 2); - EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(4, 2)), 10); - EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(4, 4)), std::nullopt); - EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(5, 0)), std::nullopt); - stream_map.RemoveStream(FullSequence(4, 2), 10); - EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(4, 2)), std::nullopt); -} - TEST_F(SubscribeWindowTest, AddQueryRemoveStreamIdDatagram) { SendStreamMap stream_map(MoqtForwardingPreference::kDatagram); EXPECT_QUIC_BUG(stream_map.AddStream(FullSequence{4, 0}, 2),
diff --git a/quiche/quic/moqt/moqt_track_test.cc b/quiche/quic/moqt/moqt_track_test.cc index 4b0ff92..cd62dd8 100644 --- a/quiche/quic/moqt/moqt_track_test.cc +++ b/quiche/quic/moqt/moqt_track_test.cc
@@ -27,9 +27,9 @@ TEST_F(RemoteTrackTest, UpdateForwardingPreference) { EXPECT_TRUE( - track_.CheckForwardingPreference(MoqtForwardingPreference::kObject)); + track_.CheckForwardingPreference(MoqtForwardingPreference::kSubgroup)); EXPECT_TRUE( - track_.CheckForwardingPreference(MoqtForwardingPreference::kObject)); + track_.CheckForwardingPreference(MoqtForwardingPreference::kSubgroup)); EXPECT_FALSE( track_.CheckForwardingPreference(MoqtForwardingPreference::kDatagram)); }
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h index 7e4fafe..b274194 100644 --- a/quiche/quic/moqt/test_tools/moqt_test_message.h +++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -139,6 +139,10 @@ QUIC_LOG(INFO) << "OBJECT Object Send Order mismatch"; return false; } + if (cast.subgroup_id != object_.subgroup_id) { + QUIC_LOG(INFO) << "OBJECT Subgroup ID mismatch"; + return false; + } if (cast.payload_length != object_.payload_length) { QUIC_LOG(INFO) << "OBJECT Payload Length mismatch"; return false; @@ -159,23 +163,8 @@ /*publisher_priority=*/7, /*object_status=*/MoqtObjectStatus::kNormal, /*forwarding_preference=*/MoqtForwardingPreference::kTrack, - /*payload_length=*/std::nullopt, - }; -}; - -class QUICHE_NO_EXPORT ObjectStreamMessage : public ObjectMessage { - public: - ObjectStreamMessage() : ObjectMessage() { - SetWireImage(raw_packet_, sizeof(raw_packet_)); - object_.forwarding_preference = MoqtForwardingPreference::kObject; - } - - void ExpandVarints() override { ExpandVarintsImpl("vvvvv-v---"); } - - private: - uint8_t raw_packet_[10] = { - 0x00, 0x03, 0x04, 0x05, 0x06, 0x07, 0x00, // varints - 0x66, 0x6f, 0x6f, // payload = "foo" + /*subgroup_id=*/std::nullopt, + /*payload_length=*/3, }; }; @@ -190,8 +179,9 @@ private: uint8_t raw_packet_[10] = { - 0x01, 0x03, 0x04, 0x05, 0x06, 0x07, 0x00, // varints - 0x66, 0x6f, 0x6f, // payload = "foo" + 0x01, 0x03, 0x04, 0x05, 0x06, // varints + 0x07, // publisher priority + 0x03, 0x66, 0x6f, 0x6f, // payload = "foo" }; }; @@ -205,16 +195,17 @@ object_.payload_length = 3; } - void ExpandVarints() override { ExpandVarintsImpl("--vv-vvv"); } + void ExpandVarints() override { ExpandVarintsImpl("vvv-vvv"); } private: // Some tests check that a FIN sent at the halfway point of a message results // in an error. Without the unnecessary expanded varint 0x0405, the halfway // point falls at the end of the Stream Header, which is legal. Expand the // varint so that the FIN would be illegal. - uint8_t raw_packet_[11] = { - 0x40, 0x50, // two byte type field - 0x03, 0x04, 0x07, // varints + uint8_t raw_packet_[10] = { + 0x02, // type field + 0x03, 0x04, // varints + 0x07, // publisher priority 0x05, 0x06, // object middler 0x03, 0x66, 0x6f, 0x6f, // payload = "foo" }; @@ -226,7 +217,6 @@ StreamMiddlerTrackMessage() : ObjectMessage() { SetWireImage(raw_packet_, sizeof(raw_packet_)); object_.forwarding_preference = MoqtForwardingPreference::kTrack; - object_.payload_length = 3; object_.group_id = 9; object_.object_id = 10; } @@ -240,35 +230,47 @@ }; }; -class QUICHE_NO_EXPORT StreamHeaderGroupMessage : public ObjectMessage { +class QUICHE_NO_EXPORT StreamHeaderSubgroupMessage : public ObjectMessage { public: - StreamHeaderGroupMessage() : ObjectMessage() { + StreamHeaderSubgroupMessage() : ObjectMessage() { SetWireImage(raw_packet_, sizeof(raw_packet_)); - object_.forwarding_preference = MoqtForwardingPreference::kGroup; - object_.payload_length = 3; + object_.forwarding_preference = MoqtForwardingPreference::kSubgroup; + object_.subgroup_id = 8; } - void ExpandVarints() override { ExpandVarintsImpl("--vvv-vv"); } + void ExpandVarints() override { ExpandVarintsImpl("vvvvv-vv"); } + + bool SetPayloadLength(uint8_t payload_length) { + if (payload_length > 63) { + // This only supports one-byte varints. + return false; + } + object_.payload_length = payload_length; + raw_packet_[7] = payload_length; + SetWireImage(raw_packet_, sizeof(raw_packet_)); + return true; + } private: uint8_t raw_packet_[11] = { - 0x40, 0x51, // two-byte type field - 0x03, 0x04, 0x05, 0x07, // varints + 0x04, // type field + 0x03, 0x04, 0x05, 0x08, // varints + 0x07, // publisher priority 0x06, 0x03, 0x66, 0x6f, 0x6f, // object middler; payload = "foo" }; }; // Used only for tests that process multiple objects on one stream. -class QUICHE_NO_EXPORT StreamMiddlerGroupMessage : public ObjectMessage { +class QUICHE_NO_EXPORT StreamMiddlerSubgroupMessage : public ObjectMessage { public: - StreamMiddlerGroupMessage() : ObjectMessage() { + StreamMiddlerSubgroupMessage() : ObjectMessage() { SetWireImage(raw_packet_, sizeof(raw_packet_)); - object_.forwarding_preference = MoqtForwardingPreference::kGroup; - object_.payload_length = 3; + object_.forwarding_preference = MoqtForwardingPreference::kSubgroup; + object_.subgroup_id = 8; object_.object_id = 9; } - void ExpandVarints() override { ExpandVarintsImpl("vvv"); } + void ExpandVarints() override { ExpandVarintsImpl("vv"); } private: uint8_t raw_packet_[5] = { @@ -1187,14 +1189,12 @@ static inline std::unique_ptr<TestMessageBase> CreateTestDataStream( MoqtDataStreamType type) { switch (type) { - case MoqtDataStreamType::kObjectStream: - return std::make_unique<ObjectStreamMessage>(); case MoqtDataStreamType::kObjectDatagram: return std::make_unique<ObjectDatagramMessage>(); case MoqtDataStreamType::kStreamHeaderTrack: return std::make_unique<StreamHeaderTrackMessage>(); - case MoqtDataStreamType::kStreamHeaderGroup: - return std::make_unique<StreamHeaderGroupMessage>(); + case MoqtDataStreamType::kStreamHeaderSubgroup: + return std::make_unique<StreamHeaderSubgroupMessage>(); case MoqtDataStreamType::kPadding: return nullptr; }
diff --git a/quiche/quic/moqt/tools/chat_client.cc b/quiche/quic/moqt/tools/chat_client.cc index b706469..68208b1 100644 --- a/quiche/quic/moqt/tools/chat_client.cc +++ b/quiche/quic/moqt/tools/chat_client.cc
@@ -165,7 +165,7 @@ FullTrackName my_track_name = chat_strings_->GetFullTrackNameFromUsername(username_); queue_ = std::make_shared<MoqtOutgoingQueue>( - my_track_name, MoqtForwardingPreference::kObject); + my_track_name, MoqtForwardingPreference::kSubgroup); publisher_.Add(queue_); session_->set_publisher(&publisher_); MoqtOutgoingAnnounceCallback announce_callback =
diff --git a/quiche/quic/moqt/tools/chat_server.cc b/quiche/quic/moqt/tools/chat_server.cc index 29d0842..028786e 100644 --- a/quiche/quic/moqt/tools/chat_server.cc +++ b/quiche/quic/moqt/tools/chat_server.cc
@@ -60,6 +60,9 @@ } ChatServer::ChatServerSessionHandler::~ChatServerSessionHandler() { + if (!server_->is_running_) { + return; + } if (username_.has_value()) { server_->DeleteUser(*username_); } @@ -125,7 +128,7 @@ : server_(std::move(proof_source), std::move(incoming_session_callback_)), strings_(chat_id), catalog_(std::make_shared<MoqtOutgoingQueue>( - strings_.GetCatalogName(), MoqtForwardingPreference::kGroup)), + strings_.GetCatalogName(), MoqtForwardingPreference::kSubgroup)), remote_track_visitor_(this) { catalog_->AddObject(quiche::QuicheMemSlice(quiche::QuicheBuffer::Copy( quiche::SimpleBufferAllocator::Get(), @@ -145,6 +148,7 @@ ChatServer::~ChatServer() { // Kill all sessions so that the callback doesn't fire when the server is // destroyed. + is_running_ = false; server_.quic_server().Shutdown(); } @@ -156,7 +160,7 @@ // Add a local track. user_queues_[username] = std::make_shared<MoqtLiveRelayQueue>( strings_.GetFullTrackNameFromUsername(username), - MoqtForwardingPreference::kObject); + MoqtForwardingPreference::kSubgroup); publisher_.Add(user_queues_[username]); }
diff --git a/quiche/quic/moqt/tools/chat_server.h b/quiche/quic/moqt/tools/chat_server.h index a399af9..7bb4f13 100644 --- a/quiche/quic/moqt/tools/chat_server.h +++ b/quiche/quic/moqt/tools/chat_server.h
@@ -103,6 +103,7 @@ MoqtIncomingSessionCallback incoming_session_callback_ = [&](absl::string_view path) { return IncomingSessionHandler(path); }; + bool is_running_ = true; MoqtServer server_; std::list<ChatServerSessionHandler> sessions_; MoqChatStrings strings_;
diff --git a/quiche/quic/moqt/tools/moqt_simulator_bin.cc b/quiche/quic/moqt/tools/moqt_simulator_bin.cc index e84a37d..d111d16 100644 --- a/quiche/quic/moqt/tools/moqt_simulator_bin.cc +++ b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
@@ -118,7 +118,7 @@ QuicBandwidth bitrate) : Actor(simulator, actor_name), queue_(std::make_shared<MoqtOutgoingQueue>( - track_name, MoqtForwardingPreference::kGroup)), + track_name, MoqtForwardingPreference::kSubgroup)), keyframe_interval_(keyframe_interval), time_between_frames_(QuicTimeDelta::FromMicroseconds(1.0e6 / fps)), i_to_p_ratio_(i_to_p_ratio),