Update MoQT malformed track detection to draft-16.

Also removed some enforcement that tracks need a single forwarding preference.

PiperOrigin-RevId: 854319816
diff --git a/quiche/quic/moqt/moqt_object.h b/quiche/quic/moqt/moqt_object.h
index 1918877..e708e46 100644
--- a/quiche/quic/moqt/moqt_object.h
+++ b/quiche/quic/moqt/moqt_object.h
@@ -24,6 +24,13 @@
   MoqtPriority publisher_priority;
   MoqtForwardingPreference forwarding_preference;
   quic::QuicTime arrival_time = quic::QuicTime::Zero();
+  bool IsMalformed(const PublishedObjectMetadata& other) const {
+    // It's OK for arrival_time to be different when checking immutables.
+    return (location != other.location || subgroup != other.subgroup ||
+            status != other.status ||
+            publisher_priority != other.publisher_priority ||
+            forwarding_preference != other.forwarding_preference);
+  }
 };
 
 // PublishedObject is a description of an object that is sufficient to publish
diff --git a/quiche/quic/moqt/moqt_relay_track_publisher.cc b/quiche/quic/moqt/moqt_relay_track_publisher.cc
index 5dfd187..5ca8496 100644
--- a/quiche/quic/moqt/moqt_relay_track_publisher.cc
+++ b/quiche/quic/moqt/moqt_relay_track_publisher.cc
@@ -143,12 +143,41 @@
     QUICHE_DCHECK(
         last_object.metadata.status != MoqtObjectStatus::kEndOfGroup &&
         last_object.metadata.status != MoqtObjectStatus::kEndOfTrack);
-    if (last_object.metadata.location.object >= metadata.location.object) {
-      QUICHE_DLOG(INFO) << "Skipping object because it does not increase the "
-                        << "object ID monotonically in the subgroup.";
+    if (last_object.metadata.location.object > metadata.location.object) {
+      QUICHE_DLOG(INFO) << "Skipping object because it decreases the "
+                        << "object ID in the subgroup.";
       return;
     }
   }
+  if (metadata.status == MoqtObjectStatus::kEndOfGroup ||
+      metadata.status == MoqtObjectStatus::kEndOfTrack) {
+    // Anticipate stream FIN.
+    last_object_in_stream = true;
+  }
+  std::shared_ptr<quiche::QuicheMemSlice> slice;
+  if (!object.empty()) {
+    slice = std::make_shared<quiche::QuicheMemSlice>(
+        quiche::QuicheMemSlice::Copy(object));
+  }
+  auto [it, inserted] = subgroup.try_emplace(
+      metadata.location.object,
+      CachedObject{metadata, slice, last_object_in_stream});
+  if (!inserted) {
+    // It's a duplicate object.
+    CachedObject& old_object = it->second;
+    if (metadata.IsMalformed(old_object.metadata)) {
+      // Something besides the arrival time and extension headers changed.
+      OnMalformedTrack(full_track_name);
+      return;
+    }
+    // TODO(b/467718801): Fix this when the class supports partial object
+    // delivery. When objects are complete, we can simply compare payloads.
+    if (old_object.payload->AsStringView() != object) {
+      OnMalformedTrack(full_track_name);
+    }
+    // No need to update state.
+    return;
+  }
   // Object is valid. Update state.
   if (next_location_ <= metadata.location) {
     next_location_ = metadata.location.Next();
@@ -156,26 +185,16 @@
   if (metadata.location.object >= group.next_object) {
     group.next_object = metadata.location.object + 1;
   }
-  // Anticipate stream FIN with most non-normal objects.
   switch (metadata.status) {
     case MoqtObjectStatus::kEndOfTrack:
       end_of_track_ = metadata.location;
-      last_object_in_stream = true;
       ABSL_FALLTHROUGH_INTENDED;
     case MoqtObjectStatus::kEndOfGroup:
       group.complete = true;
-      last_object_in_stream = true;
       break;
     default:
       break;
   }
-  std::shared_ptr<quiche::QuicheMemSlice> slice;
-  if (!object.empty()) {
-    slice = std::make_shared<quiche::QuicheMemSlice>(
-        quiche::QuicheMemSlice::Copy(object));
-  }
-  subgroup.emplace(metadata.location.object,
-                   CachedObject{metadata, slice, last_object_in_stream});
   for (MoqtObjectListener* listener : listeners_) {
     listener->OnNewObjectAvailable(metadata.location, metadata.subgroup,
                                    metadata.publisher_priority,
diff --git a/quiche/quic/moqt/moqt_relay_track_publisher_test.cc b/quiche/quic/moqt/moqt_relay_track_publisher_test.cc
index 68d5bc7..c5add6d 100644
--- a/quiche/quic/moqt/moqt_relay_track_publisher_test.cc
+++ b/quiche/quic/moqt/moqt_relay_track_publisher_test.cc
@@ -357,6 +357,83 @@
   EXPECT_TRUE(track_deleted_);
 }
 
+TEST_F(MoqtRelayTrackPublisherTest, DuplicateObject) {
+  EXPECT_CALL(*session_, SubscribeCurrentObject)
+      .WillOnce(testing::Return(true));
+  publisher_.AddObjectListener(&listener_);
+  Location location = kLargestLocation.Next();
+  EXPECT_CALL(listener_,
+              OnNewObjectAvailable(location, /*subgroup=*/0,
+                                   /*publisher_priority=*/128,
+                                   MoqtForwardingPreference::kSubgroup));
+  publisher_.OnObjectFragment(
+      kTrackName,
+      PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal,
+                              128, MoqtForwardingPreference::kSubgroup},
+      "object", /*end_of_message=*/true);
+  // Exact duplicate is ignored. It doesn't matter that the arrival time
+  // changed.
+  EXPECT_CALL(listener_, OnNewObjectAvailable).Times(0);
+  EXPECT_CALL(listener_, OnTrackPublisherGone).Times(0);
+  EXPECT_FALSE(track_deleted_);
+  publisher_.OnObjectFragment(
+      kTrackName,
+      PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal,
+                              128, MoqtForwardingPreference::kSubgroup,
+                              quic::QuicTime::Infinite()},
+      "object", /*end_of_message=*/true);
+}
+
+TEST_F(MoqtRelayTrackPublisherTest, DuplicateObjectChangedMetadata) {
+  EXPECT_CALL(*session_, SubscribeCurrentObject)
+      .WillOnce(testing::Return(true));
+  publisher_.AddObjectListener(&listener_);
+  Location location = kLargestLocation.Next();
+  EXPECT_CALL(listener_,
+              OnNewObjectAvailable(location, /*subgroup=*/0,
+                                   /*publisher_priority=*/128,
+                                   MoqtForwardingPreference::kSubgroup));
+  publisher_.OnObjectFragment(
+      kTrackName,
+      PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal,
+                              128, MoqtForwardingPreference::kSubgroup},
+      "object", /*end_of_message=*/true);
+  // Priority change; malformed track.
+  EXPECT_CALL(listener_, OnNewObjectAvailable).Times(0);
+  EXPECT_CALL(listener_, OnTrackPublisherGone);
+  publisher_.OnObjectFragment(
+      kTrackName,
+      PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal, 64,
+                              MoqtForwardingPreference::kSubgroup},
+      "object", /*end_of_message=*/true);
+  EXPECT_TRUE(track_deleted_);
+}
+
+TEST_F(MoqtRelayTrackPublisherTest, DuplicateObjectChangedPayload) {
+  EXPECT_CALL(*session_, SubscribeCurrentObject)
+      .WillOnce(testing::Return(true));
+  publisher_.AddObjectListener(&listener_);
+  Location location = kLargestLocation.Next();
+  EXPECT_CALL(listener_,
+              OnNewObjectAvailable(location, /*subgroup=*/0,
+                                   /*publisher_priority=*/128,
+                                   MoqtForwardingPreference::kSubgroup));
+  publisher_.OnObjectFragment(
+      kTrackName,
+      PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal,
+                              128, MoqtForwardingPreference::kSubgroup},
+      "payload", /*end_of_message=*/true);
+  // Payload change; malformed track.
+  EXPECT_CALL(listener_, OnNewObjectAvailable).Times(0);
+  EXPECT_CALL(listener_, OnTrackPublisherGone);
+  publisher_.OnObjectFragment(
+      kTrackName,
+      PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal,
+                              128, MoqtForwardingPreference::kSubgroup},
+      "foobar", /*end_of_message=*/true);
+  EXPECT_TRUE(track_deleted_);
+}
+
 TEST_F(MoqtRelayTrackPublisherTest, Fin) {
   SubscribeAndOk();
 
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 1e2ea27..b1c1ef6 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -221,10 +221,7 @@
   if (track == nullptr) {
     return;
   }
-  if (!track->OnObject(/*is_datagram=*/true)) {
-    OnMalformedTrack(track);
-    return;
-  }
+  track->OnObjectOrOk();
   if (!track->InWindow(Location(message.group_id, message.object_id))) {
     // TODO(martinduke): a recent SUBSCRIBE_UPDATE could put us here, and it's
     // not an error.
@@ -1770,7 +1767,9 @@
   }
   if (!track->is_fetch()) {
     if (no_more_objects_) {
-      // Already got a stream-ending object.
+      // Already got a stream-ending object. While the lower layer won't
+      // deliver data after the FIN, there could have been an EndOfGroup or
+      // EndOfTrack signal.
       session_->OnMalformedTrack(track);
       return;
     }
@@ -1782,10 +1781,7 @@
       }
     }
     SubscribeRemoteTrack* subscribe = static_cast<SubscribeRemoteTrack*>(track);
-    if (!subscribe->OnObject(/*is_datagram=*/false)) {
-      session_->OnMalformedTrack(track);
-      return;
-    }
+    subscribe->OnObjectOrOk();
     if (subscribe->visitor() != nullptr) {
       PublishedObjectMetadata metadata;
       metadata.location = Location(message.group_id, message.object_id);
@@ -1803,6 +1799,11 @@
     UpstreamFetch* fetch = static_cast<UpstreamFetch*>(track);
     if (!fetch->LocationIsValid(Location(message.group_id, message.object_id),
                                 message.object_status, end_of_message)) {
+      // TODO(martinduke): in https://github.com/moq-wg/moq-transport/pull/1409
+      // I make the case that this should be a protocol violation. Update if
+      // that proposal is accepted (at which point
+      // QuicSession::OnMalformedTrack can be removed, since all the
+      // remaining conditions are at the application layer).
       session_->OnMalformedTrack(track);
       return;
     }
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index f414556..6ec6709 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -2074,38 +2074,6 @@
   session_.OnDatagramReceived(absl::string_view(datagram, sizeof(datagram)));
 }
 
-TEST_F(MoqtSessionTest, DataStreamTypeMismatch) {
-  std::string payload = "deadbeef";
-  MoqtSessionPeer::CreateRemoteTrack(&session_, DefaultSubscribe(),
-                                     /*track_alias=*/2, &remote_track_visitor_);
-  MoqtObject object = {
-      /*track_alias=*/2,
-      /*group_sequence=*/0,
-      /*object_sequence=*/0,
-      /*publisher_priority=*/0,
-      /*extension_headers=*/"",
-      /*object_status=*/MoqtObjectStatus::kNormal,
-      /*subgroup_id=*/0,
-      /*payload_length=*/8,
-  };
-  std::unique_ptr<MoqtDataParserVisitor> object_stream =
-      MoqtSessionPeer::CreateIncomingDataStream(&session_, &mock_stream_,
-                                                kDefaultSubgroupStreamType);
-
-  EXPECT_CALL(remote_track_visitor_, OnObjectFragment).Times(1);
-  EXPECT_CALL(mock_stream_, GetStreamId())
-      .WillRepeatedly(Return(kIncomingUniStreamId));
-  object_stream->OnObjectMessage(object, payload, true);
-  char datagram[] = {0x00, 0x02, 0x00, 0x10, 0x00, 0x64, 0x65,
-                     0x61, 0x64, 0x62, 0x65, 0x65, 0x66};
-  // Arrival of a datagram creates a malformed track. Unsubscribe.
-  std::unique_ptr<MoqtControlParserVisitor> control_stream =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
-  EXPECT_CALL(mock_stream_,
-              Writev(ControlMessageOfType(MoqtMessageType::kUnsubscribe), _));
-  session_.OnDatagramReceived(absl::string_view(datagram, sizeof(datagram)));
-}
-
 TEST_F(MoqtSessionTest, StreamObjectOutOfWindow) {
   std::string payload = "deadbeef";
   MoqtSubscribe subscribe = DefaultSubscribe();
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h
index 63d470e..f27ebd1 100644
--- a/quiche/quic/moqt/moqt_track.h
+++ b/quiche/quic/moqt/moqt_track.h
@@ -114,15 +114,6 @@
   }
   SubscribeVisitor* visitor() { return visitor_; }
 
-  // Returns false if the forwarding preference is changing on the track.
-  bool OnObject(bool is_datagram) {
-    OnObjectOrOk();
-    if (!is_datagram_.has_value()) {
-      is_datagram_ = is_datagram;
-      return true;
-    }
-    return (is_datagram_ == is_datagram);
-  }
   // Called on SUBSCRIBE_OK or SUBSCRIBE_UPDATE.
   bool TruncateStart(Location start) {
     return window_mutable().TruncateStart(start);
@@ -162,7 +153,6 @@
   std::optional<const uint64_t> track_alias_;
   bool forward_;
   SubscribeVisitor* visitor_;
-  std::optional<bool> is_datagram_;
   int currently_open_streams_ = 0;
   // Every stream that has received FIN or RESET_STREAM.
   uint64_t streams_closed_ = 0;