Add Object Status to MoQT Object Messages. Corrects an oversight in updating to the latest draft. This is just the parser and framer; using the information will be a different CL. PiperOrigin-RevId: 643663252
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc index 4bb3a04..fd9aeef 100644 --- a/quiche/quic/moqt/moqt_framer.cc +++ b/quiche/quic/moqt/moqt_framer.cc
@@ -136,15 +136,31 @@ "length in advance"; return quiche::QuicheBuffer(); } + if (message.object_status != MoqtObjectStatus::kNormal && + message.payload_length.has_value() && *message.payload_length > 0) { + QUIC_BUG(quic_bug_serialize_object_input_03) + << "Object status must be kNormal if payload is non-empty"; + return quiche::QuicheBuffer(); + } if (!is_first_in_stream) { switch (message.forwarding_preference) { case MoqtForwardingPreference::kTrack: - return Serialize(WireVarInt62(message.group_id), - WireVarInt62(message.object_id), - WireVarInt62(*message.payload_length)); + return (*message.payload_length == 0) + ? Serialize(WireVarInt62(message.group_id), + WireVarInt62(message.object_id), + WireVarInt62(*message.payload_length), + WireVarInt62(message.object_status)) + : Serialize(WireVarInt62(message.group_id), + WireVarInt62(message.object_id), + WireVarInt62(*message.payload_length)); case MoqtForwardingPreference::kGroup: - return Serialize(WireVarInt62(message.object_id), - WireVarInt62(*message.payload_length)); + return (*message.payload_length == 0) + ? Serialize(WireVarInt62(message.object_id), + WireVarInt62(*message.payload_length), + WireVarInt62(static_cast<uint64_t>( + message.object_status))) + : Serialize(WireVarInt62(message.object_id), + WireVarInt62(*message.payload_length)); default: QUIC_BUG(quic_bug_serialize_object_input_02) << "Object or Datagram forwarding_preference must be first in " @@ -156,36 +172,64 @@ GetMessageTypeForForwardingPreference(message.forwarding_preference); switch (message.forwarding_preference) { case MoqtForwardingPreference::kTrack: - return Serialize( - WireVarInt62(message_type), WireVarInt62(message.subscribe_id), - WireVarInt62(message.track_alias), - WireVarInt62(message.object_send_order), - WireVarInt62(message.group_id), WireVarInt62(message.object_id), - WireVarInt62(*message.payload_length)); + return (*message.payload_length == 0) + ? Serialize(WireVarInt62(message_type), + WireVarInt62(message.subscribe_id), + WireVarInt62(message.track_alias), + WireVarInt62(message.object_send_order), + WireVarInt62(message.group_id), + WireVarInt62(message.object_id), + WireVarInt62(*message.payload_length), + WireVarInt62(message.object_status)) + : Serialize(WireVarInt62(message_type), + WireVarInt62(message.subscribe_id), + WireVarInt62(message.track_alias), + WireVarInt62(message.object_send_order), + WireVarInt62(message.group_id), + WireVarInt62(message.object_id), + WireVarInt62(*message.payload_length)); case MoqtForwardingPreference::kGroup: - return Serialize( - WireVarInt62(message_type), WireVarInt62(message.subscribe_id), - WireVarInt62(message.track_alias), WireVarInt62(message.group_id), - WireVarInt62(message.object_send_order), - WireVarInt62(message.object_id), - WireVarInt62(*message.payload_length)); + return (*message.payload_length == 0) + ? Serialize(WireVarInt62(message_type), + WireVarInt62(message.subscribe_id), + WireVarInt62(message.track_alias), + WireVarInt62(message.group_id), + WireVarInt62(message.object_send_order), + WireVarInt62(message.object_id), + WireVarInt62(*message.payload_length), + WireVarInt62(message.object_status)) + : Serialize(WireVarInt62(message_type), + WireVarInt62(message.subscribe_id), + WireVarInt62(message.track_alias), + WireVarInt62(message.group_id), + WireVarInt62(message.object_send_order), + WireVarInt62(message.object_id), + WireVarInt62(*message.payload_length)); case MoqtForwardingPreference::kObject: case MoqtForwardingPreference::kDatagram: return Serialize( WireVarInt62(message_type), WireVarInt62(message.subscribe_id), WireVarInt62(message.track_alias), WireVarInt62(message.group_id), WireVarInt62(message.object_id), - WireVarInt62(message.object_send_order)); + WireVarInt62(message.object_send_order), + WireVarInt62(message.object_status)); } } quiche::QuicheBuffer MoqtFramer::SerializeObjectDatagram( const MoqtObject& message, absl::string_view payload) { + if (message.object_status != MoqtObjectStatus::kNormal && !payload.empty()) { + QUIC_BUG(quic_bug_serialize_object_datagram_01) + << "Object status must be kNormal if payload is non-empty"; + return quiche::QuicheBuffer(); + } return Serialize( WireVarInt62(MoqtMessageType::kObjectDatagram), WireVarInt62(message.subscribe_id), WireVarInt62(message.track_alias), WireVarInt62(message.group_id), WireVarInt62(message.object_id), - WireVarInt62(message.object_send_order), WireBytes(payload)); + WireVarInt62(message.object_send_order), + WireVarInt62(static_cast<uint64_t>(message.object_status)), + WireBytes(payload)); } quiche::QuicheBuffer MoqtFramer::SerializeClientSetup(
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc index 7907d30..916ac1d 100644 --- a/quiche/quic/moqt/moqt_framer_test.cc +++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -241,6 +241,7 @@ /*group_id=*/5, /*object_id=*/6, /*object_send_order=*/7, + /*object_status=*/MoqtObjectStatus::kNormal, /*forwarding_preference=*/MoqtForwardingPreference::kObject, /*payload_length=*/std::nullopt, }; @@ -253,6 +254,11 @@ EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader(object, false), "requires knowing the object length"); EXPECT_TRUE(buffer.empty()); + object.payload_length = 5; + object.object_status = MoqtObjectStatus::kEndOfGroup; + EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader(object, false), + "Object status must be kNormal if payload is non-empty"); + EXPECT_TRUE(buffer.empty()); } TEST_F(MoqtFramerSimpleTest, Datagram) { @@ -263,6 +269,7 @@ /*group_id=*/5, /*object_id=*/6, /*object_send_order=*/7, + /*object_status=*/MoqtObjectStatus::kNormal, /*forwarding_preference=*/MoqtForwardingPreference::kObject, /*payload_length=*/std::nullopt, };
diff --git a/quiche/quic/moqt/moqt_messages.cc b/quiche/quic/moqt/moqt_messages.cc index 18a73ad..277687f 100644 --- a/quiche/quic/moqt/moqt_messages.cc +++ b/quiche/quic/moqt/moqt_messages.cc
@@ -10,6 +10,13 @@ namespace moqt { +MoqtObjectStatus IntegerToObjectStatus(uint64_t integer) { + if (integer >= 0x5) { + return MoqtObjectStatus::kInvalidObjectStatus; + } + return static_cast<MoqtObjectStatus>(integer); +} + MoqtFilterType GetFilterType(const MoqtSubscribe& message) { if (!message.end_group.has_value() && message.end_object.has_value()) { return MoqtFilterType::kNone;
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h index c117946..c2a22cd 100644 --- a/quiche/quic/moqt/moqt_messages.h +++ b/quiche/quic/moqt/moqt_messages.h
@@ -190,6 +190,17 @@ kDatagram = 0x3, }; +enum class QUICHE_EXPORT MoqtObjectStatus : uint64_t { + kNormal = 0x0, + kObjectDoesNotExist = 0x1, + kGroupDoesNotExist = 0x2, + kEndOfGroup = 0x3, + kEndOfTrack = 0x4, + kInvalidObjectStatus = 0x5, +}; + +MoqtObjectStatus IntegerToObjectStatus(uint64_t integer); + // The data contained in every Object message, although the message type // implies some of the values. |payload_length| has no value if the length // is unknown (because it runs to the end of the stream.) @@ -199,6 +210,7 @@ uint64_t group_id; uint64_t object_id; uint64_t object_send_order; + MoqtObjectStatus object_status; MoqtForwardingPreference forwarding_preference; std::optional<uint64_t> payload_length; };
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc index e1eba0b..ffa0cfa 100644 --- a/quiche/quic/moqt/moqt_parser.cc +++ b/quiche/quic/moqt/moqt_parser.cc
@@ -220,11 +220,33 @@ return processed_data; } object_metadata_->payload_length = length; + uint64_t status = 0; // Defaults to kNormal. + if (length == 0 && !reader.ReadVarInt62(&status)) { + return processed_data; + } + object_metadata_->object_status = IntegerToObjectStatus(status); break; } default: break; } + if (object_metadata_->object_status == + MoqtObjectStatus::kInvalidObjectStatus) { + ParseError("Invalid object status"); + return processed_data; + } + if (object_metadata_->object_status != MoqtObjectStatus::kNormal) { + // It is impossible to express an explicit length with this status. + if ((type == MoqtMessageType::kObjectStream || + type == MoqtMessageType::kObjectDatagram) && + reader.BytesRemaining() > 0) { + // There is additional data in the stream/datagram, which is an error. + ParseError("Object with non-normal status has payload"); + return processed_data; + } + visitor_.OnObjectMessage(*object_metadata_, "", true); + return processed_data; + } bool has_length = object_metadata_->payload_length.has_value(); bool received_complete_message = false; size_t payload_to_draw = reader.BytesRemaining(); @@ -722,6 +744,13 @@ if (!reader.ReadVarInt62(&object.object_send_order)) { return 0; } + uint64_t status = 0; // Defaults to kNormal. + if ((type == MoqtMessageType::kObjectStream || + type == MoqtMessageType::kObjectDatagram) && + !reader.ReadVarInt62(&status)) { + return 0; + } + object.object_status = IntegerToObjectStatus(status); object.forwarding_preference = GetForwardingPreference(type); return reader.PreviouslyReadPayload().length(); }
diff --git a/quiche/quic/moqt/moqt_parser.h b/quiche/quic/moqt/moqt_parser.h index 04a6ef7..bada6cd 100644 --- a/quiche/quic/moqt/moqt_parser.h +++ b/quiche/quic/moqt/moqt_parser.h
@@ -131,6 +131,7 @@ // for the most recent object. bool ObjectPayloadInProgress() const { return (object_metadata_.has_value() && + object_metadata_->object_status == MoqtObjectStatus::kNormal && (object_metadata_->forwarding_preference == MoqtForwardingPreference::kObject || object_metadata_->forwarding_preference ==
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc index 4794f0e..80dfade 100644 --- a/quiche/quic/moqt/moqt_parser_test.cc +++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -407,7 +407,9 @@ EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_)); EXPECT_FALSE(visitor_.end_of_message_); EXPECT_TRUE(visitor_.object_payload_.has_value()); - EXPECT_EQ(visitor_.object_payload_->length(), 94); + // The value "93" is the overall wire image size of 100 minus the non-payload + // part of the message. + EXPECT_EQ(visitor_.object_payload_->length(), 93); // third part includes FIN parser.ProcessData("bar", true); @@ -704,6 +706,33 @@ EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); } +TEST_F(MoqtMessageSpecificTest, NonNormalObjectHasPayload) { + MoqtParser parser(kRawQuic, visitor_); + char object_stream[] = { + 0x00, 0x03, 0x04, 0x05, 0x06, 0x07, 0x02, // varints + 0x66, 0x6f, 0x6f, // payload = "foo" + }; + parser.ProcessData(absl::string_view(object_stream, sizeof(object_stream)), + false); + EXPECT_TRUE(visitor_.parsing_error_.has_value()); + EXPECT_EQ(*visitor_.parsing_error_, + "Object with non-normal status has payload"); + EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); +} + +TEST_F(MoqtMessageSpecificTest, InvalidObjectStatus) { + MoqtParser parser(kRawQuic, visitor_); + char object_stream[] = { + 0x00, 0x03, 0x04, 0x05, 0x06, 0x07, 0x06, // varints + 0x66, 0x6f, 0x6f, // payload = "foo" + }; + parser.ProcessData(absl::string_view(object_stream, sizeof(object_stream)), + false); + EXPECT_TRUE(visitor_.parsing_error_.has_value()); + EXPECT_EQ(*visitor_.parsing_error_, "Invalid object status"); + EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); +} + TEST_F(MoqtMessageSpecificTest, Setup2KB) { MoqtParser parser(kRawQuic, visitor_); char big_message[2 * kMaxMessageHeaderSize];
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc index b3934cc..92bf67f 100644 --- a/quiche/quic/moqt/moqt_session.cc +++ b/quiche/quic/moqt/moqt_session.cc
@@ -437,6 +437,7 @@ object.group_id = group_id; object.object_id = object_id; object.object_send_order = object_send_order; + object.object_status = MoqtObjectStatus::kNormal; object.forwarding_preference = forwarding_preference; object.payload_length = payload.size(); int failures = 0;
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc index 89877b2..51b60ae 100644 --- a/quiche/quic/moqt/moqt_session_test.cc +++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -571,6 +571,7 @@ /*group_sequence=*/0, /*object_sequence=*/0, /*object_send_order=*/0, + /*object_status=*/MoqtObjectStatus::kNormal, /*forwarding_preference=*/MoqtForwardingPreference::kGroup, /*payload_length=*/8, }; @@ -595,6 +596,7 @@ /*group_sequence=*/0, /*object_sequence=*/0, /*object_send_order=*/0, + /*object_status=*/MoqtObjectStatus::kNormal, /*forwarding_preference=*/MoqtForwardingPreference::kGroup, /*payload_length=*/16, }; @@ -629,6 +631,7 @@ /*group_sequence=*/0, /*object_sequence=*/0, /*object_send_order=*/0, + /*object_status=*/MoqtObjectStatus::kNormal, /*forwarding_preference=*/MoqtForwardingPreference::kGroup, /*payload_length=*/16, }; @@ -664,6 +667,7 @@ /*group_sequence=*/0, /*object_sequence=*/0, /*object_send_order=*/0, + /*object_status=*/MoqtObjectStatus::kNormal, /*forwarding_preference=*/MoqtForwardingPreference::kGroup, /*payload_length=*/8, }; @@ -719,6 +723,7 @@ /*group_sequence=*/0, /*object_sequence=*/0, /*object_send_order=*/0, + /*object_status=*/MoqtObjectStatus::kNormal, /*forwarding_preference=*/MoqtForwardingPreference::kGroup, /*payload_length=*/8, }; @@ -778,6 +783,7 @@ /*group_sequence=*/0, /*object_sequence=*/0, /*object_send_order=*/0, + /*object_status=*/MoqtObjectStatus::kNormal, /*forwarding_preference=*/MoqtForwardingPreference::kGroup, /*payload_length=*/8, }; @@ -828,6 +834,7 @@ /*group_sequence=*/0, /*object_sequence=*/0, /*object_send_order=*/0, + /*object_status=*/MoqtObjectStatus::kNormal, /*forwarding_preference=*/MoqtForwardingPreference::kGroup, /*payload_length=*/8, }; @@ -1103,7 +1110,7 @@ // Publish in window. bool correct_message = false; uint8_t kExpectedMessage[] = { - 0x01, 0x00, 0x02, 0x05, 0x00, 0x00, 0x64, + 0x01, 0x00, 0x02, 0x05, 0x00, 0x00, 0x00, 0x64, 0x65, 0x61, 0x64, 0x62, 0x65, 0x65, 0x66, }; EXPECT_CALL(mock_session_, SendOrQueueDatagram(_)) @@ -1130,10 +1137,11 @@ /*group_sequence=*/0, /*object_sequence=*/0, /*object_send_order=*/0, + /*object_status=*/MoqtObjectStatus::kNormal, /*forwarding_preference=*/MoqtForwardingPreference::kDatagram, /*payload_length=*/8, }; - char datagram[] = {0x01, 0x01, 0x02, 0x00, 0x00, 0x00, 0x64, + char datagram[] = {0x01, 0x01, 0x02, 0x00, 0x00, 0x00, 0x00, 0x64, 0x65, 0x61, 0x64, 0x62, 0x65, 0x65, 0x66}; EXPECT_CALL(visitor_, OnObjectFragment(ftn, object.group_id, object.object_id, @@ -1154,6 +1162,7 @@ /*group_sequence=*/0, /*object_sequence=*/0, /*object_send_order=*/0, + /*object_status=*/MoqtObjectStatus::kNormal, /*forwarding_preference=*/MoqtForwardingPreference::kGroup, /*payload_length=*/8, };
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h index 227d298..2c2c323 100644 --- a/quiche/quic/moqt/test_tools/moqt_test_message.h +++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -19,7 +19,6 @@ #include "quiche/quic/core/quic_time.h" #include "quiche/quic/moqt/moqt_messages.h" #include "quiche/quic/platform/api/quic_logging.h" -#include "quiche/quic/platform/api/quic_test.h" #include "quiche/common/platform/api/quiche_export.h" #include "quiche/common/quiche_endian.h" @@ -140,6 +139,10 @@ QUIC_LOG(INFO) << "OBJECT Object Send Order mismatch"; return false; } + if (cast.object_status != object_.object_status) { + QUIC_LOG(INFO) << "OBJECT Object Status mismatch"; + return false; + } if (cast.forwarding_preference != object_.forwarding_preference) { QUIC_LOG(INFO) << "OBJECT Object Send Order mismatch"; return false; @@ -162,6 +165,7 @@ /*group_id*/ 5, /*object_id=*/6, /*object_send_order=*/7, + /*object_status=*/MoqtObjectStatus::kNormal, /*forwarding_preference=*/MoqtForwardingPreference::kTrack, /*payload_length=*/std::nullopt, }; @@ -175,13 +179,13 @@ } void ExpandVarints() override { - ExpandVarintsImpl("vvvvvv"); // first six fields are varints + ExpandVarintsImpl("vvvvvvv---"); // first six fields are varints } private: - uint8_t raw_packet_[9] = { - 0x00, 0x03, 0x04, 0x05, 0x06, 0x07, // varints - 0x66, 0x6f, 0x6f, // payload = "foo" + uint8_t raw_packet_[10] = { + 0x00, 0x03, 0x04, 0x05, 0x06, 0x07, 0x00, // varints + 0x66, 0x6f, 0x6f, // payload = "foo" }; }; @@ -193,13 +197,13 @@ } void ExpandVarints() override { - ExpandVarintsImpl("vvvvvv"); // first six fields are varints + ExpandVarintsImpl("vvvvvvv---"); // first six fields are varints } private: - uint8_t raw_packet_[9] = { - 0x01, 0x03, 0x04, 0x05, 0x06, 0x07, // varints - 0x66, 0x6f, 0x6f, // payload = "foo" + uint8_t raw_packet_[10] = { + 0x01, 0x03, 0x04, 0x05, 0x06, 0x07, 0x00, // varints + 0x66, 0x6f, 0x6f, // payload = "foo" }; }; @@ -247,7 +251,8 @@ private: uint8_t raw_packet_[6] = { - 0x09, 0x0a, 0x03, 0x62, 0x61, 0x72, // object middler; payload = "bar" + 0x09, 0x0a, // object middler + 0x03, 0x62, 0x61, 0x72, // payload = "bar" }; };