Add Object Status to MoQT Object Messages.

Corrects an oversight in updating to the latest draft.

This is just the parser and framer; using the information will be a different CL.

PiperOrigin-RevId: 643663252
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc
index 4bb3a04..fd9aeef 100644
--- a/quiche/quic/moqt/moqt_framer.cc
+++ b/quiche/quic/moqt/moqt_framer.cc
@@ -136,15 +136,31 @@
            "length in advance";
     return quiche::QuicheBuffer();
   }
+  if (message.object_status != MoqtObjectStatus::kNormal &&
+      message.payload_length.has_value() && *message.payload_length > 0) {
+    QUIC_BUG(quic_bug_serialize_object_input_03)
+        << "Object status must be kNormal if payload is non-empty";
+    return quiche::QuicheBuffer();
+  }
   if (!is_first_in_stream) {
     switch (message.forwarding_preference) {
       case MoqtForwardingPreference::kTrack:
-        return Serialize(WireVarInt62(message.group_id),
-                         WireVarInt62(message.object_id),
-                         WireVarInt62(*message.payload_length));
+        return (*message.payload_length == 0)
+                   ? Serialize(WireVarInt62(message.group_id),
+                               WireVarInt62(message.object_id),
+                               WireVarInt62(*message.payload_length),
+                               WireVarInt62(message.object_status))
+                   : Serialize(WireVarInt62(message.group_id),
+                               WireVarInt62(message.object_id),
+                               WireVarInt62(*message.payload_length));
       case MoqtForwardingPreference::kGroup:
-        return Serialize(WireVarInt62(message.object_id),
-                         WireVarInt62(*message.payload_length));
+        return (*message.payload_length == 0)
+                   ? Serialize(WireVarInt62(message.object_id),
+                               WireVarInt62(*message.payload_length),
+                               WireVarInt62(static_cast<uint64_t>(
+                                   message.object_status)))
+                   : Serialize(WireVarInt62(message.object_id),
+                               WireVarInt62(*message.payload_length));
       default:
         QUIC_BUG(quic_bug_serialize_object_input_02)
             << "Object or Datagram forwarding_preference must be first in "
@@ -156,36 +172,64 @@
       GetMessageTypeForForwardingPreference(message.forwarding_preference);
   switch (message.forwarding_preference) {
     case MoqtForwardingPreference::kTrack:
-      return Serialize(
-          WireVarInt62(message_type), WireVarInt62(message.subscribe_id),
-          WireVarInt62(message.track_alias),
-          WireVarInt62(message.object_send_order),
-          WireVarInt62(message.group_id), WireVarInt62(message.object_id),
-          WireVarInt62(*message.payload_length));
+      return (*message.payload_length == 0)
+                 ? Serialize(WireVarInt62(message_type),
+                             WireVarInt62(message.subscribe_id),
+                             WireVarInt62(message.track_alias),
+                             WireVarInt62(message.object_send_order),
+                             WireVarInt62(message.group_id),
+                             WireVarInt62(message.object_id),
+                             WireVarInt62(*message.payload_length),
+                             WireVarInt62(message.object_status))
+                 : Serialize(WireVarInt62(message_type),
+                             WireVarInt62(message.subscribe_id),
+                             WireVarInt62(message.track_alias),
+                             WireVarInt62(message.object_send_order),
+                             WireVarInt62(message.group_id),
+                             WireVarInt62(message.object_id),
+                             WireVarInt62(*message.payload_length));
     case MoqtForwardingPreference::kGroup:
-      return Serialize(
-          WireVarInt62(message_type), WireVarInt62(message.subscribe_id),
-          WireVarInt62(message.track_alias), WireVarInt62(message.group_id),
-          WireVarInt62(message.object_send_order),
-          WireVarInt62(message.object_id),
-          WireVarInt62(*message.payload_length));
+      return (*message.payload_length == 0)
+                 ? Serialize(WireVarInt62(message_type),
+                             WireVarInt62(message.subscribe_id),
+                             WireVarInt62(message.track_alias),
+                             WireVarInt62(message.group_id),
+                             WireVarInt62(message.object_send_order),
+                             WireVarInt62(message.object_id),
+                             WireVarInt62(*message.payload_length),
+                             WireVarInt62(message.object_status))
+                 : Serialize(WireVarInt62(message_type),
+                             WireVarInt62(message.subscribe_id),
+                             WireVarInt62(message.track_alias),
+                             WireVarInt62(message.group_id),
+                             WireVarInt62(message.object_send_order),
+                             WireVarInt62(message.object_id),
+                             WireVarInt62(*message.payload_length));
     case MoqtForwardingPreference::kObject:
     case MoqtForwardingPreference::kDatagram:
       return Serialize(
           WireVarInt62(message_type), WireVarInt62(message.subscribe_id),
           WireVarInt62(message.track_alias), WireVarInt62(message.group_id),
           WireVarInt62(message.object_id),
-          WireVarInt62(message.object_send_order));
+          WireVarInt62(message.object_send_order),
+          WireVarInt62(message.object_status));
   }
 }
 
 quiche::QuicheBuffer MoqtFramer::SerializeObjectDatagram(
     const MoqtObject& message, absl::string_view payload) {
+  if (message.object_status != MoqtObjectStatus::kNormal && !payload.empty()) {
+    QUIC_BUG(quic_bug_serialize_object_datagram_01)
+        << "Object status must be kNormal if payload is non-empty";
+    return quiche::QuicheBuffer();
+  }
   return Serialize(
       WireVarInt62(MoqtMessageType::kObjectDatagram),
       WireVarInt62(message.subscribe_id), WireVarInt62(message.track_alias),
       WireVarInt62(message.group_id), WireVarInt62(message.object_id),
-      WireVarInt62(message.object_send_order), WireBytes(payload));
+      WireVarInt62(message.object_send_order),
+      WireVarInt62(static_cast<uint64_t>(message.object_status)),
+      WireBytes(payload));
 }
 
 quiche::QuicheBuffer MoqtFramer::SerializeClientSetup(
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc
index 7907d30..916ac1d 100644
--- a/quiche/quic/moqt/moqt_framer_test.cc
+++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -241,6 +241,7 @@
       /*group_id=*/5,
       /*object_id=*/6,
       /*object_send_order=*/7,
+      /*object_status=*/MoqtObjectStatus::kNormal,
       /*forwarding_preference=*/MoqtForwardingPreference::kObject,
       /*payload_length=*/std::nullopt,
   };
@@ -253,6 +254,11 @@
   EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader(object, false),
                   "requires knowing the object length");
   EXPECT_TRUE(buffer.empty());
+  object.payload_length = 5;
+  object.object_status = MoqtObjectStatus::kEndOfGroup;
+  EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader(object, false),
+                  "Object status must be kNormal if payload is non-empty");
+  EXPECT_TRUE(buffer.empty());
 }
 
 TEST_F(MoqtFramerSimpleTest, Datagram) {
@@ -263,6 +269,7 @@
       /*group_id=*/5,
       /*object_id=*/6,
       /*object_send_order=*/7,
+      /*object_status=*/MoqtObjectStatus::kNormal,
       /*forwarding_preference=*/MoqtForwardingPreference::kObject,
       /*payload_length=*/std::nullopt,
   };
diff --git a/quiche/quic/moqt/moqt_messages.cc b/quiche/quic/moqt/moqt_messages.cc
index 18a73ad..277687f 100644
--- a/quiche/quic/moqt/moqt_messages.cc
+++ b/quiche/quic/moqt/moqt_messages.cc
@@ -10,6 +10,13 @@
 
 namespace moqt {
 
+MoqtObjectStatus IntegerToObjectStatus(uint64_t integer) {
+  if (integer >= 0x5) {
+    return MoqtObjectStatus::kInvalidObjectStatus;
+  }
+  return static_cast<MoqtObjectStatus>(integer);
+}
+
 MoqtFilterType GetFilterType(const MoqtSubscribe& message) {
   if (!message.end_group.has_value() && message.end_object.has_value()) {
     return MoqtFilterType::kNone;
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index c117946..c2a22cd 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -190,6 +190,17 @@
   kDatagram = 0x3,
 };
 
+enum class QUICHE_EXPORT MoqtObjectStatus : uint64_t {
+  kNormal = 0x0,
+  kObjectDoesNotExist = 0x1,
+  kGroupDoesNotExist = 0x2,
+  kEndOfGroup = 0x3,
+  kEndOfTrack = 0x4,
+  kInvalidObjectStatus = 0x5,
+};
+
+MoqtObjectStatus IntegerToObjectStatus(uint64_t integer);
+
 // The data contained in every Object message, although the message type
 // implies some of the values. |payload_length| has no value if the length
 // is unknown (because it runs to the end of the stream.)
@@ -199,6 +210,7 @@
   uint64_t group_id;
   uint64_t object_id;
   uint64_t object_send_order;
+  MoqtObjectStatus object_status;
   MoqtForwardingPreference forwarding_preference;
   std::optional<uint64_t> payload_length;
 };
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc
index e1eba0b..ffa0cfa 100644
--- a/quiche/quic/moqt/moqt_parser.cc
+++ b/quiche/quic/moqt/moqt_parser.cc
@@ -220,11 +220,33 @@
         return processed_data;
       }
       object_metadata_->payload_length = length;
+      uint64_t status = 0;  // Defaults to kNormal.
+      if (length == 0 && !reader.ReadVarInt62(&status)) {
+        return processed_data;
+      }
+      object_metadata_->object_status = IntegerToObjectStatus(status);
       break;
     }
     default:
       break;
   }
+  if (object_metadata_->object_status ==
+      MoqtObjectStatus::kInvalidObjectStatus) {
+    ParseError("Invalid object status");
+    return processed_data;
+  }
+  if (object_metadata_->object_status != MoqtObjectStatus::kNormal) {
+    // It is impossible to express an explicit length with this status.
+    if ((type == MoqtMessageType::kObjectStream ||
+         type == MoqtMessageType::kObjectDatagram) &&
+        reader.BytesRemaining() > 0) {
+      // There is additional data in the stream/datagram, which is an error.
+      ParseError("Object with non-normal status has payload");
+      return processed_data;
+    }
+    visitor_.OnObjectMessage(*object_metadata_, "", true);
+    return processed_data;
+  }
   bool has_length = object_metadata_->payload_length.has_value();
   bool received_complete_message = false;
   size_t payload_to_draw = reader.BytesRemaining();
@@ -722,6 +744,13 @@
   if (!reader.ReadVarInt62(&object.object_send_order)) {
     return 0;
   }
+  uint64_t status = 0;  // Defaults to kNormal.
+  if ((type == MoqtMessageType::kObjectStream ||
+       type == MoqtMessageType::kObjectDatagram) &&
+      !reader.ReadVarInt62(&status)) {
+    return 0;
+  }
+  object.object_status = IntegerToObjectStatus(status);
   object.forwarding_preference = GetForwardingPreference(type);
   return reader.PreviouslyReadPayload().length();
 }
diff --git a/quiche/quic/moqt/moqt_parser.h b/quiche/quic/moqt/moqt_parser.h
index 04a6ef7..bada6cd 100644
--- a/quiche/quic/moqt/moqt_parser.h
+++ b/quiche/quic/moqt/moqt_parser.h
@@ -131,6 +131,7 @@
   // for the most recent object.
   bool ObjectPayloadInProgress() const {
     return (object_metadata_.has_value() &&
+            object_metadata_->object_status == MoqtObjectStatus::kNormal &&
             (object_metadata_->forwarding_preference ==
                  MoqtForwardingPreference::kObject ||
              object_metadata_->forwarding_preference ==
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc
index 4794f0e..80dfade 100644
--- a/quiche/quic/moqt/moqt_parser_test.cc
+++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -407,7 +407,9 @@
   EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_));
   EXPECT_FALSE(visitor_.end_of_message_);
   EXPECT_TRUE(visitor_.object_payload_.has_value());
-  EXPECT_EQ(visitor_.object_payload_->length(), 94);
+  // The value "93" is the overall wire image size of 100 minus the non-payload
+  // part of the message.
+  EXPECT_EQ(visitor_.object_payload_->length(), 93);
 
   // third part includes FIN
   parser.ProcessData("bar", true);
@@ -704,6 +706,33 @@
   EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
 }
 
+TEST_F(MoqtMessageSpecificTest, NonNormalObjectHasPayload) {
+  MoqtParser parser(kRawQuic, visitor_);
+  char object_stream[] = {
+      0x00, 0x03, 0x04, 0x05, 0x06, 0x07, 0x02,  // varints
+      0x66, 0x6f, 0x6f,                          // payload = "foo"
+  };
+  parser.ProcessData(absl::string_view(object_stream, sizeof(object_stream)),
+                     false);
+  EXPECT_TRUE(visitor_.parsing_error_.has_value());
+  EXPECT_EQ(*visitor_.parsing_error_,
+            "Object with non-normal status has payload");
+  EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
+}
+
+TEST_F(MoqtMessageSpecificTest, InvalidObjectStatus) {
+  MoqtParser parser(kRawQuic, visitor_);
+  char object_stream[] = {
+      0x00, 0x03, 0x04, 0x05, 0x06, 0x07, 0x06,  // varints
+      0x66, 0x6f, 0x6f,                          // payload = "foo"
+  };
+  parser.ProcessData(absl::string_view(object_stream, sizeof(object_stream)),
+                     false);
+  EXPECT_TRUE(visitor_.parsing_error_.has_value());
+  EXPECT_EQ(*visitor_.parsing_error_, "Invalid object status");
+  EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
+}
+
 TEST_F(MoqtMessageSpecificTest, Setup2KB) {
   MoqtParser parser(kRawQuic, visitor_);
   char big_message[2 * kMaxMessageHeaderSize];
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index b3934cc..92bf67f 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -437,6 +437,7 @@
   object.group_id = group_id;
   object.object_id = object_id;
   object.object_send_order = object_send_order;
+  object.object_status = MoqtObjectStatus::kNormal;
   object.forwarding_preference = forwarding_preference;
   object.payload_length = payload.size();
   int failures = 0;
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index 89877b2..51b60ae 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -571,6 +571,7 @@
       /*group_sequence=*/0,
       /*object_sequence=*/0,
       /*object_send_order=*/0,
+      /*object_status=*/MoqtObjectStatus::kNormal,
       /*forwarding_preference=*/MoqtForwardingPreference::kGroup,
       /*payload_length=*/8,
   };
@@ -595,6 +596,7 @@
       /*group_sequence=*/0,
       /*object_sequence=*/0,
       /*object_send_order=*/0,
+      /*object_status=*/MoqtObjectStatus::kNormal,
       /*forwarding_preference=*/MoqtForwardingPreference::kGroup,
       /*payload_length=*/16,
   };
@@ -629,6 +631,7 @@
       /*group_sequence=*/0,
       /*object_sequence=*/0,
       /*object_send_order=*/0,
+      /*object_status=*/MoqtObjectStatus::kNormal,
       /*forwarding_preference=*/MoqtForwardingPreference::kGroup,
       /*payload_length=*/16,
   };
@@ -664,6 +667,7 @@
       /*group_sequence=*/0,
       /*object_sequence=*/0,
       /*object_send_order=*/0,
+      /*object_status=*/MoqtObjectStatus::kNormal,
       /*forwarding_preference=*/MoqtForwardingPreference::kGroup,
       /*payload_length=*/8,
   };
@@ -719,6 +723,7 @@
       /*group_sequence=*/0,
       /*object_sequence=*/0,
       /*object_send_order=*/0,
+      /*object_status=*/MoqtObjectStatus::kNormal,
       /*forwarding_preference=*/MoqtForwardingPreference::kGroup,
       /*payload_length=*/8,
   };
@@ -778,6 +783,7 @@
       /*group_sequence=*/0,
       /*object_sequence=*/0,
       /*object_send_order=*/0,
+      /*object_status=*/MoqtObjectStatus::kNormal,
       /*forwarding_preference=*/MoqtForwardingPreference::kGroup,
       /*payload_length=*/8,
   };
@@ -828,6 +834,7 @@
       /*group_sequence=*/0,
       /*object_sequence=*/0,
       /*object_send_order=*/0,
+      /*object_status=*/MoqtObjectStatus::kNormal,
       /*forwarding_preference=*/MoqtForwardingPreference::kGroup,
       /*payload_length=*/8,
   };
@@ -1103,7 +1110,7 @@
   // Publish in window.
   bool correct_message = false;
   uint8_t kExpectedMessage[] = {
-      0x01, 0x00, 0x02, 0x05, 0x00, 0x00, 0x64,
+      0x01, 0x00, 0x02, 0x05, 0x00, 0x00, 0x00, 0x64,
       0x65, 0x61, 0x64, 0x62, 0x65, 0x65, 0x66,
   };
   EXPECT_CALL(mock_session_, SendOrQueueDatagram(_))
@@ -1130,10 +1137,11 @@
       /*group_sequence=*/0,
       /*object_sequence=*/0,
       /*object_send_order=*/0,
+      /*object_status=*/MoqtObjectStatus::kNormal,
       /*forwarding_preference=*/MoqtForwardingPreference::kDatagram,
       /*payload_length=*/8,
   };
-  char datagram[] = {0x01, 0x01, 0x02, 0x00, 0x00, 0x00, 0x64,
+  char datagram[] = {0x01, 0x01, 0x02, 0x00, 0x00, 0x00, 0x00, 0x64,
                      0x65, 0x61, 0x64, 0x62, 0x65, 0x65, 0x66};
   EXPECT_CALL(visitor_,
               OnObjectFragment(ftn, object.group_id, object.object_id,
@@ -1154,6 +1162,7 @@
       /*group_sequence=*/0,
       /*object_sequence=*/0,
       /*object_send_order=*/0,
+      /*object_status=*/MoqtObjectStatus::kNormal,
       /*forwarding_preference=*/MoqtForwardingPreference::kGroup,
       /*payload_length=*/8,
   };
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h
index 227d298..2c2c323 100644
--- a/quiche/quic/moqt/test_tools/moqt_test_message.h
+++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -19,7 +19,6 @@
 #include "quiche/quic/core/quic_time.h"
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/platform/api/quic_logging.h"
-#include "quiche/quic/platform/api/quic_test.h"
 #include "quiche/common/platform/api/quiche_export.h"
 #include "quiche/common/quiche_endian.h"
 
@@ -140,6 +139,10 @@
       QUIC_LOG(INFO) << "OBJECT Object Send Order mismatch";
       return false;
     }
+    if (cast.object_status != object_.object_status) {
+      QUIC_LOG(INFO) << "OBJECT Object Status mismatch";
+      return false;
+    }
     if (cast.forwarding_preference != object_.forwarding_preference) {
       QUIC_LOG(INFO) << "OBJECT Object Send Order mismatch";
       return false;
@@ -162,6 +165,7 @@
       /*group_id*/ 5,
       /*object_id=*/6,
       /*object_send_order=*/7,
+      /*object_status=*/MoqtObjectStatus::kNormal,
       /*forwarding_preference=*/MoqtForwardingPreference::kTrack,
       /*payload_length=*/std::nullopt,
   };
@@ -175,13 +179,13 @@
   }
 
   void ExpandVarints() override {
-    ExpandVarintsImpl("vvvvvv");  // first six fields are varints
+    ExpandVarintsImpl("vvvvvvv---");  // first six fields are varints
   }
 
  private:
-  uint8_t raw_packet_[9] = {
-      0x00, 0x03, 0x04, 0x05, 0x06, 0x07,  // varints
-      0x66, 0x6f, 0x6f,                    // payload = "foo"
+  uint8_t raw_packet_[10] = {
+      0x00, 0x03, 0x04, 0x05, 0x06, 0x07, 0x00,  // varints
+      0x66, 0x6f, 0x6f,                          // payload = "foo"
   };
 };
 
@@ -193,13 +197,13 @@
   }
 
   void ExpandVarints() override {
-    ExpandVarintsImpl("vvvvvv");  // first six fields are varints
+    ExpandVarintsImpl("vvvvvvv---");  // first six fields are varints
   }
 
  private:
-  uint8_t raw_packet_[9] = {
-      0x01, 0x03, 0x04, 0x05, 0x06, 0x07,  // varints
-      0x66, 0x6f, 0x6f,                    // payload = "foo"
+  uint8_t raw_packet_[10] = {
+      0x01, 0x03, 0x04, 0x05, 0x06, 0x07, 0x00,  // varints
+      0x66, 0x6f, 0x6f,                          // payload = "foo"
   };
 };
 
@@ -247,7 +251,8 @@
 
  private:
   uint8_t raw_packet_[6] = {
-      0x09, 0x0a, 0x03, 0x62, 0x61, 0x72,  // object middler; payload = "bar"
+      0x09, 0x0a,              // object middler
+      0x03, 0x62, 0x61, 0x72,  // payload = "bar"
   };
 };