Update MoQT malformed track detection to draft-16. Also removed some enforcement that tracks need a single forwarding preference. PiperOrigin-RevId: 854319816
diff --git a/quiche/quic/moqt/moqt_object.h b/quiche/quic/moqt/moqt_object.h index 1918877..e708e46 100644 --- a/quiche/quic/moqt/moqt_object.h +++ b/quiche/quic/moqt/moqt_object.h
@@ -24,6 +24,13 @@ MoqtPriority publisher_priority; MoqtForwardingPreference forwarding_preference; quic::QuicTime arrival_time = quic::QuicTime::Zero(); + bool IsMalformed(const PublishedObjectMetadata& other) const { + // It's OK for arrival_time to be different when checking immutables. + return (location != other.location || subgroup != other.subgroup || + status != other.status || + publisher_priority != other.publisher_priority || + forwarding_preference != other.forwarding_preference); + } }; // PublishedObject is a description of an object that is sufficient to publish
diff --git a/quiche/quic/moqt/moqt_relay_track_publisher.cc b/quiche/quic/moqt/moqt_relay_track_publisher.cc index 5dfd187..5ca8496 100644 --- a/quiche/quic/moqt/moqt_relay_track_publisher.cc +++ b/quiche/quic/moqt/moqt_relay_track_publisher.cc
@@ -143,12 +143,41 @@ QUICHE_DCHECK( last_object.metadata.status != MoqtObjectStatus::kEndOfGroup && last_object.metadata.status != MoqtObjectStatus::kEndOfTrack); - if (last_object.metadata.location.object >= metadata.location.object) { - QUICHE_DLOG(INFO) << "Skipping object because it does not increase the " - << "object ID monotonically in the subgroup."; + if (last_object.metadata.location.object > metadata.location.object) { + QUICHE_DLOG(INFO) << "Skipping object because it decreases the " + << "object ID in the subgroup."; return; } } + if (metadata.status == MoqtObjectStatus::kEndOfGroup || + metadata.status == MoqtObjectStatus::kEndOfTrack) { + // Anticipate stream FIN. + last_object_in_stream = true; + } + std::shared_ptr<quiche::QuicheMemSlice> slice; + if (!object.empty()) { + slice = std::make_shared<quiche::QuicheMemSlice>( + quiche::QuicheMemSlice::Copy(object)); + } + auto [it, inserted] = subgroup.try_emplace( + metadata.location.object, + CachedObject{metadata, slice, last_object_in_stream}); + if (!inserted) { + // It's a duplicate object. + CachedObject& old_object = it->second; + if (metadata.IsMalformed(old_object.metadata)) { + // Something besides the arrival time and extension headers changed. + OnMalformedTrack(full_track_name); + return; + } + // TODO(b/467718801): Fix this when the class supports partial object + // delivery. When objects are complete, we can simply compare payloads. + if (old_object.payload->AsStringView() != object) { + OnMalformedTrack(full_track_name); + } + // No need to update state. + return; + } // Object is valid. Update state. if (next_location_ <= metadata.location) { next_location_ = metadata.location.Next(); @@ -156,26 +185,16 @@ if (metadata.location.object >= group.next_object) { group.next_object = metadata.location.object + 1; } - // Anticipate stream FIN with most non-normal objects. switch (metadata.status) { case MoqtObjectStatus::kEndOfTrack: end_of_track_ = metadata.location; - last_object_in_stream = true; ABSL_FALLTHROUGH_INTENDED; case MoqtObjectStatus::kEndOfGroup: group.complete = true; - last_object_in_stream = true; break; default: break; } - std::shared_ptr<quiche::QuicheMemSlice> slice; - if (!object.empty()) { - slice = std::make_shared<quiche::QuicheMemSlice>( - quiche::QuicheMemSlice::Copy(object)); - } - subgroup.emplace(metadata.location.object, - CachedObject{metadata, slice, last_object_in_stream}); for (MoqtObjectListener* listener : listeners_) { listener->OnNewObjectAvailable(metadata.location, metadata.subgroup, metadata.publisher_priority,
diff --git a/quiche/quic/moqt/moqt_relay_track_publisher_test.cc b/quiche/quic/moqt/moqt_relay_track_publisher_test.cc index 68d5bc7..c5add6d 100644 --- a/quiche/quic/moqt/moqt_relay_track_publisher_test.cc +++ b/quiche/quic/moqt/moqt_relay_track_publisher_test.cc
@@ -357,6 +357,83 @@ EXPECT_TRUE(track_deleted_); } +TEST_F(MoqtRelayTrackPublisherTest, DuplicateObject) { + EXPECT_CALL(*session_, SubscribeCurrentObject) + .WillOnce(testing::Return(true)); + publisher_.AddObjectListener(&listener_); + Location location = kLargestLocation.Next(); + EXPECT_CALL(listener_, + OnNewObjectAvailable(location, /*subgroup=*/0, + /*publisher_priority=*/128, + MoqtForwardingPreference::kSubgroup)); + publisher_.OnObjectFragment( + kTrackName, + PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal, + 128, MoqtForwardingPreference::kSubgroup}, + "object", /*end_of_message=*/true); + // Exact duplicate is ignored. It doesn't matter that the arrival time + // changed. + EXPECT_CALL(listener_, OnNewObjectAvailable).Times(0); + EXPECT_CALL(listener_, OnTrackPublisherGone).Times(0); + EXPECT_FALSE(track_deleted_); + publisher_.OnObjectFragment( + kTrackName, + PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal, + 128, MoqtForwardingPreference::kSubgroup, + quic::QuicTime::Infinite()}, + "object", /*end_of_message=*/true); +} + +TEST_F(MoqtRelayTrackPublisherTest, DuplicateObjectChangedMetadata) { + EXPECT_CALL(*session_, SubscribeCurrentObject) + .WillOnce(testing::Return(true)); + publisher_.AddObjectListener(&listener_); + Location location = kLargestLocation.Next(); + EXPECT_CALL(listener_, + OnNewObjectAvailable(location, /*subgroup=*/0, + /*publisher_priority=*/128, + MoqtForwardingPreference::kSubgroup)); + publisher_.OnObjectFragment( + kTrackName, + PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal, + 128, MoqtForwardingPreference::kSubgroup}, + "object", /*end_of_message=*/true); + // Priority change; malformed track. + EXPECT_CALL(listener_, OnNewObjectAvailable).Times(0); + EXPECT_CALL(listener_, OnTrackPublisherGone); + publisher_.OnObjectFragment( + kTrackName, + PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal, 64, + MoqtForwardingPreference::kSubgroup}, + "object", /*end_of_message=*/true); + EXPECT_TRUE(track_deleted_); +} + +TEST_F(MoqtRelayTrackPublisherTest, DuplicateObjectChangedPayload) { + EXPECT_CALL(*session_, SubscribeCurrentObject) + .WillOnce(testing::Return(true)); + publisher_.AddObjectListener(&listener_); + Location location = kLargestLocation.Next(); + EXPECT_CALL(listener_, + OnNewObjectAvailable(location, /*subgroup=*/0, + /*publisher_priority=*/128, + MoqtForwardingPreference::kSubgroup)); + publisher_.OnObjectFragment( + kTrackName, + PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal, + 128, MoqtForwardingPreference::kSubgroup}, + "payload", /*end_of_message=*/true); + // Payload change; malformed track. + EXPECT_CALL(listener_, OnNewObjectAvailable).Times(0); + EXPECT_CALL(listener_, OnTrackPublisherGone); + publisher_.OnObjectFragment( + kTrackName, + PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal, + 128, MoqtForwardingPreference::kSubgroup}, + "foobar", /*end_of_message=*/true); + EXPECT_TRUE(track_deleted_); +} + TEST_F(MoqtRelayTrackPublisherTest, Fin) { SubscribeAndOk();
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc index 1e2ea27..b1c1ef6 100644 --- a/quiche/quic/moqt/moqt_session.cc +++ b/quiche/quic/moqt/moqt_session.cc
@@ -221,10 +221,7 @@ if (track == nullptr) { return; } - if (!track->OnObject(/*is_datagram=*/true)) { - OnMalformedTrack(track); - return; - } + track->OnObjectOrOk(); if (!track->InWindow(Location(message.group_id, message.object_id))) { // TODO(martinduke): a recent SUBSCRIBE_UPDATE could put us here, and it's // not an error. @@ -1770,7 +1767,9 @@ } if (!track->is_fetch()) { if (no_more_objects_) { - // Already got a stream-ending object. + // Already got a stream-ending object. While the lower layer won't + // deliver data after the FIN, there could have been an EndOfGroup or + // EndOfTrack signal. session_->OnMalformedTrack(track); return; } @@ -1782,10 +1781,7 @@ } } SubscribeRemoteTrack* subscribe = static_cast<SubscribeRemoteTrack*>(track); - if (!subscribe->OnObject(/*is_datagram=*/false)) { - session_->OnMalformedTrack(track); - return; - } + subscribe->OnObjectOrOk(); if (subscribe->visitor() != nullptr) { PublishedObjectMetadata metadata; metadata.location = Location(message.group_id, message.object_id); @@ -1803,6 +1799,11 @@ UpstreamFetch* fetch = static_cast<UpstreamFetch*>(track); if (!fetch->LocationIsValid(Location(message.group_id, message.object_id), message.object_status, end_of_message)) { + // TODO(martinduke): in https://github.com/moq-wg/moq-transport/pull/1409 + // I make the case that this should be a protocol violation. Update if + // that proposal is accepted (at which point + // QuicSession::OnMalformedTrack can be removed, since all the + // remaining conditions are at the application layer). session_->OnMalformedTrack(track); return; }
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc index f414556..6ec6709 100644 --- a/quiche/quic/moqt/moqt_session_test.cc +++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -2074,38 +2074,6 @@ session_.OnDatagramReceived(absl::string_view(datagram, sizeof(datagram))); } -TEST_F(MoqtSessionTest, DataStreamTypeMismatch) { - std::string payload = "deadbeef"; - MoqtSessionPeer::CreateRemoteTrack(&session_, DefaultSubscribe(), - /*track_alias=*/2, &remote_track_visitor_); - MoqtObject object = { - /*track_alias=*/2, - /*group_sequence=*/0, - /*object_sequence=*/0, - /*publisher_priority=*/0, - /*extension_headers=*/"", - /*object_status=*/MoqtObjectStatus::kNormal, - /*subgroup_id=*/0, - /*payload_length=*/8, - }; - std::unique_ptr<MoqtDataParserVisitor> object_stream = - MoqtSessionPeer::CreateIncomingDataStream(&session_, &mock_stream_, - kDefaultSubgroupStreamType); - - EXPECT_CALL(remote_track_visitor_, OnObjectFragment).Times(1); - EXPECT_CALL(mock_stream_, GetStreamId()) - .WillRepeatedly(Return(kIncomingUniStreamId)); - object_stream->OnObjectMessage(object, payload, true); - char datagram[] = {0x00, 0x02, 0x00, 0x10, 0x00, 0x64, 0x65, - 0x61, 0x64, 0x62, 0x65, 0x65, 0x66}; - // Arrival of a datagram creates a malformed track. Unsubscribe. - std::unique_ptr<MoqtControlParserVisitor> control_stream = - MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_); - EXPECT_CALL(mock_stream_, - Writev(ControlMessageOfType(MoqtMessageType::kUnsubscribe), _)); - session_.OnDatagramReceived(absl::string_view(datagram, sizeof(datagram))); -} - TEST_F(MoqtSessionTest, StreamObjectOutOfWindow) { std::string payload = "deadbeef"; MoqtSubscribe subscribe = DefaultSubscribe();
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h index 63d470e..f27ebd1 100644 --- a/quiche/quic/moqt/moqt_track.h +++ b/quiche/quic/moqt/moqt_track.h
@@ -114,15 +114,6 @@ } SubscribeVisitor* visitor() { return visitor_; } - // Returns false if the forwarding preference is changing on the track. - bool OnObject(bool is_datagram) { - OnObjectOrOk(); - if (!is_datagram_.has_value()) { - is_datagram_ = is_datagram; - return true; - } - return (is_datagram_ == is_datagram); - } // Called on SUBSCRIBE_OK or SUBSCRIBE_UPDATE. bool TruncateStart(Location start) { return window_mutable().TruncateStart(start); @@ -162,7 +153,6 @@ std::optional<const uint64_t> track_alias_; bool forward_; SubscribeVisitor* visitor_; - std::optional<bool> is_datagram_; int currently_open_streams_ = 0; // Every stream that has received FIN or RESET_STREAM. uint64_t streams_closed_ = 0;