Send and handle MoQT datagrams when required.

draft-03 requires objects with this preference to be sent via datagram, not on a stream.

Not in production.

PiperOrigin-RevId: 613930667
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc
index b69456a..9137418 100644
--- a/quiche/quic/moqt/moqt_framer.cc
+++ b/quiche/quic/moqt/moqt_framer.cc
@@ -30,6 +30,7 @@
 namespace {
 
 using ::quiche::QuicheBuffer;
+using ::quiche::WireBytes;
 using ::quiche::WireOptional;
 using ::quiche::WireSpan;
 using ::quiche::WireStringWithVarInt62Length;
@@ -213,6 +214,15 @@
   }
 }
 
+quiche::QuicheBuffer MoqtFramer::SerializeObjectDatagram(
+    const MoqtObject& message, absl::string_view payload) {
+  return Serialize(
+      WireVarInt62(MoqtMessageType::kObjectDatagram),
+      WireVarInt62(message.subscribe_id), WireVarInt62(message.track_alias),
+      WireVarInt62(message.group_id), WireVarInt62(message.object_id),
+      WireVarInt62(message.object_send_order), WireBytes(payload));
+}
+
 quiche::QuicheBuffer MoqtFramer::SerializeClientSetup(
     const MoqtClientSetup& message) {
   absl::InlinedVector<IntParameter, 1> int_parameters;
diff --git a/quiche/quic/moqt/moqt_framer.h b/quiche/quic/moqt/moqt_framer.h
index e4070c5..bf17364 100644
--- a/quiche/quic/moqt/moqt_framer.h
+++ b/quiche/quic/moqt/moqt_framer.h
@@ -36,6 +36,8 @@
   // header if `is_first_in_stream` is set to true.
   quiche::QuicheBuffer SerializeObjectHeader(const MoqtObject& message,
                                              bool is_first_in_stream);
+  quiche::QuicheBuffer SerializeObjectDatagram(const MoqtObject& message,
+                                               absl::string_view payload);
   quiche::QuicheBuffer SerializeClientSetup(const MoqtClientSetup& message);
   quiche::QuicheBuffer SerializeServerSetup(const MoqtServerSetup& message);
   // Returns an empty buffer if there is an illegal combination of locations.
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc
index fe152d2..36ab8ea 100644
--- a/quiche/quic/moqt/moqt_framer_test.cc
+++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -31,7 +31,6 @@
   std::vector<MoqtFramerTestParams> params;
   std::vector<MoqtMessageType> message_types = {
       MoqtMessageType::kObjectStream,
-      MoqtMessageType::kObjectPreferDatagram,
       MoqtMessageType::kSubscribe,
       MoqtMessageType::kSubscribeOk,
       MoqtMessageType::kSubscribeError,
@@ -106,7 +105,6 @@
       TestMessageBase::MessageStructuredData& structured_data) {
     switch (message_type_) {
       case MoqtMessageType::kObjectStream:
-      case MoqtMessageType::kObjectPreferDatagram:
       case MoqtMessageType::kStreamHeaderTrack:
       case MoqtMessageType::kStreamHeaderGroup: {
         MoqtObject data = std::get<MoqtObject>(structured_data);
@@ -164,6 +162,9 @@
         auto data = std::get<MoqtServerSetup>(structured_data);
         return framer_.SerializeServerSetup(data);
       }
+      default:
+        // kObjectDatagram is a totally different code path.
+        return quiche::QuicheBuffer();
     }
   }
 
@@ -244,4 +245,22 @@
   EXPECT_TRUE(buffer.empty());
 }
 
+TEST_F(MoqtFramerSimpleTest, Datagram) {
+  auto datagram = std::make_unique<ObjectDatagramMessage>();
+  MoqtObject object = {
+      /*subscribe_id=*/3,
+      /*track_alias=*/4,
+      /*group_id=*/5,
+      /*object_id=*/6,
+      /*object_send_order=*/7,
+      /*forwarding_preference=*/MoqtForwardingPreference::kObject,
+      /*payload_length=*/std::nullopt,
+  };
+  std::string payload = "foo";
+  quiche::QuicheBuffer buffer;
+  buffer = framer_.SerializeObjectDatagram(object, payload);
+  EXPECT_EQ(buffer.size(), datagram->total_message_size());
+  EXPECT_EQ(buffer.AsStringView(), datagram->PacketSample());
+}
+
 }  // namespace moqt::test
diff --git a/quiche/quic/moqt/moqt_messages.cc b/quiche/quic/moqt/moqt_messages.cc
index 0a27832..ab18885 100644
--- a/quiche/quic/moqt/moqt_messages.cc
+++ b/quiche/quic/moqt/moqt_messages.cc
@@ -14,7 +14,7 @@
   switch (message_type) {
     case MoqtMessageType::kObjectStream:
       return "OBJECT_STREAM";
-    case MoqtMessageType::kObjectPreferDatagram:
+    case MoqtMessageType::kObjectDatagram:
       return "OBJECT_PREFER_DATAGRAM";
     case MoqtMessageType::kClientSetup:
       return "CLIENT_SETUP";
@@ -71,7 +71,7 @@
   switch (type) {
     case MoqtMessageType::kObjectStream:
       return MoqtForwardingPreference::kObject;
-    case MoqtMessageType::kObjectPreferDatagram:
+    case MoqtMessageType::kObjectDatagram:
       return MoqtForwardingPreference::kDatagram;
     case MoqtMessageType::kStreamHeaderTrack:
       return MoqtForwardingPreference::kTrack;
@@ -91,7 +91,7 @@
     case MoqtForwardingPreference::kObject:
       return MoqtMessageType::kObjectStream;
     case MoqtForwardingPreference::kDatagram:
-      return MoqtMessageType::kObjectPreferDatagram;
+      return MoqtMessageType::kObjectDatagram;
     case MoqtForwardingPreference::kTrack:
       return MoqtMessageType::kStreamHeaderTrack;
     case MoqtForwardingPreference::kGroup:
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index 81d2c4d..2c20105 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -47,7 +47,7 @@
 
 enum class QUICHE_EXPORT MoqtMessageType : uint64_t {
   kObjectStream = 0x00,
-  kObjectPreferDatagram = 0x01,
+  kObjectDatagram = 0x01,
   kSubscribe = 0x03,
   kSubscribeOk = 0x04,
   kSubscribeError = 0x05,
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc
index fe49c7a..c529088 100644
--- a/quiche/quic/moqt/moqt_parser.cc
+++ b/quiche/quic/moqt/moqt_parser.cc
@@ -113,6 +113,25 @@
   }
 }
 
+// static
+absl::string_view MoqtParser::ProcessDatagram(absl::string_view data,
+                                              MoqtObject& object_metadata) {
+  uint64_t value;
+  quic::QuicDataReader reader(data);
+  if (!reader.ReadVarInt62(&value)) {
+    return absl::string_view();
+  }
+  if (static_cast<MoqtMessageType>(value) != MoqtMessageType::kObjectDatagram) {
+    return absl::string_view();
+  }
+  size_t processed_data = ParseObjectHeader(reader, object_metadata,
+                                            MoqtMessageType::kObjectDatagram);
+  if (processed_data == 0) {  // Incomplete header
+    return absl::string_view();
+  }
+  return reader.PeekRemainingPayload();
+}
+
 size_t MoqtParser::ProcessMessage(absl::string_view data, bool fin) {
   uint64_t value;
   quic::QuicDataReader reader(data);
@@ -128,8 +147,10 @@
   }
   auto type = static_cast<MoqtMessageType>(value);
   switch (type) {
+    case MoqtMessageType::kObjectDatagram:
+      ParseError("Received OBJECT_DATAGRAM on stream");
+      return 0;
     case MoqtMessageType::kObjectStream:
-    case MoqtMessageType::kObjectPreferDatagram:
     case MoqtMessageType::kStreamHeaderTrack:
     case MoqtMessageType::kStreamHeaderGroup:
       return ProcessObject(reader, type, fin);
@@ -167,36 +188,18 @@
 
 size_t MoqtParser::ProcessObject(quic::QuicDataReader& reader,
                                  MoqtMessageType type, bool fin) {
-  size_t processed_data;
+  size_t processed_data = 0;
   QUICHE_DCHECK(!ObjectPayloadInProgress());
   if (!ObjectStreamInitialized()) {
-    // nothing has been processed on the stream.
     object_metadata_ = MoqtObject();
-    if (!reader.ReadVarInt62(&object_metadata_->subscribe_id) ||
-        !reader.ReadVarInt62(&object_metadata_->track_alias)) {
+    processed_data = ParseObjectHeader(reader, object_metadata_.value(), type);
+    if (processed_data == 0) {
       object_metadata_.reset();
       return 0;
     }
-    if (type != MoqtMessageType::kStreamHeaderTrack &&
-        !reader.ReadVarInt62(&object_metadata_->group_id)) {
-      object_metadata_.reset();
-      return 0;
-    }
-    if (type != MoqtMessageType::kStreamHeaderTrack &&
-        type != MoqtMessageType::kStreamHeaderGroup &&
-        !reader.ReadVarInt62(&object_metadata_->object_id)) {
-      object_metadata_.reset();
-      return 0;
-    }
-    if (!reader.ReadVarInt62(&object_metadata_->object_send_order)) {
-      object_metadata_.reset();
-      return 0;
-    }
-    object_metadata_->forwarding_preference = GetForwardingPreference(type);
   }
   // At this point, enough data has been processed to store in object_metadata_,
   // even if there's nothing else in the buffer.
-  processed_data = reader.PreviouslyReadPayload().length();
   QUICHE_DCHECK(payload_length_remaining_ == 0);
   switch (type) {
     case MoqtMessageType::kStreamHeaderTrack:
@@ -548,6 +551,29 @@
   return reader.PreviouslyReadPayload().length();
 }
 
+// static
+size_t MoqtParser::ParseObjectHeader(quic::QuicDataReader& reader,
+                                     MoqtObject& object, MoqtMessageType type) {
+  if (!reader.ReadVarInt62(&object.subscribe_id) ||
+      !reader.ReadVarInt62(&object.track_alias)) {
+    return 0;
+  }
+  if (type != MoqtMessageType::kStreamHeaderTrack &&
+      !reader.ReadVarInt62(&object.group_id)) {
+    return 0;
+  }
+  if (type != MoqtMessageType::kStreamHeaderTrack &&
+      type != MoqtMessageType::kStreamHeaderGroup &&
+      !reader.ReadVarInt62(&object.object_id)) {
+    return 0;
+  }
+  if (!reader.ReadVarInt62(&object.object_send_order)) {
+    return 0;
+  }
+  object.forwarding_preference = GetForwardingPreference(type);
+  return reader.PreviouslyReadPayload().length();
+}
+
 void MoqtParser::ParseError(absl::string_view reason) {
   ParseError(MoqtError::kProtocolViolation, reason);
 }
diff --git a/quiche/quic/moqt/moqt_parser.h b/quiche/quic/moqt/moqt_parser.h
index 3bb7f47..4cd19ca 100644
--- a/quiche/quic/moqt/moqt_parser.h
+++ b/quiche/quic/moqt/moqt_parser.h
@@ -66,6 +66,12 @@
   // datagram rather than a stream.
   void ProcessData(absl::string_view data, bool fin);
 
+  // Provide a separate path for datagrams. Returns the payload bytes, or empty
+  // string_view on error. The caller provides the whole datagram in |data|.
+  // The function puts the object metadata in |object_metadata|.
+  static absl::string_view ProcessDatagram(absl::string_view data,
+                                           MoqtObject& object_metadata);
+
  private:
   // The central switch statement to dispatch a message to the correct
   // Process* function. Returns 0 if it could not parse the full messsage
@@ -92,6 +98,9 @@
   size_t ProcessUnannounce(quic::QuicDataReader& reader);
   size_t ProcessGoAway(quic::QuicDataReader& reader);
 
+  static size_t ParseObjectHeader(quic::QuicDataReader& reader,
+                                  MoqtObject& object, MoqtMessageType type);
+
   // If |error| is not provided, assumes kProtocolViolation.
   void ParseError(absl::string_view reason);
   void ParseError(MoqtError error, absl::string_view reason);
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc
index 48fa22a..4fcd7dd 100644
--- a/quiche/quic/moqt/moqt_parser_test.cc
+++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -25,19 +25,19 @@
 
 inline bool IsObjectMessage(MoqtMessageType type) {
   return (type == MoqtMessageType::kObjectStream ||
-          type == MoqtMessageType::kObjectPreferDatagram ||
+          type == MoqtMessageType::kObjectDatagram ||
           type == MoqtMessageType::kStreamHeaderTrack ||
           type == MoqtMessageType::kStreamHeaderGroup);
 }
 
 inline bool IsObjectWithoutPayloadLength(MoqtMessageType type) {
   return (type == MoqtMessageType::kObjectStream ||
-          type == MoqtMessageType::kObjectPreferDatagram);
+          type == MoqtMessageType::kObjectDatagram);
 }
 
 std::vector<MoqtMessageType> message_types = {
     MoqtMessageType::kObjectStream,
-    MoqtMessageType::kObjectPreferDatagram,
+    // kObjectDatagram is a unique set of tests.
     MoqtMessageType::kSubscribe,
     MoqtMessageType::kSubscribeOk,
     MoqtMessageType::kSubscribeError,
@@ -346,27 +346,6 @@
   EXPECT_FALSE(visitor_.parsing_error_.has_value());
 }
 
-TEST_F(MoqtMessageSpecificTest, ObjectPreferDatagramSeparateFin) {
-  // OBJECT can return on an unknown-length message even without receiving a
-  // FIN.
-  MoqtParser parser(kRawQuic, visitor_);
-  auto message = std::make_unique<ObjectPreferDatagramMessage>();
-  parser.ProcessData(message->PacketSample(), false);
-  EXPECT_EQ(visitor_.messages_received_, 1);
-  EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_));
-  EXPECT_TRUE(visitor_.object_payload_.has_value());
-  EXPECT_EQ(*(visitor_.object_payload_), "foo");
-  EXPECT_FALSE(visitor_.end_of_message_);
-
-  parser.ProcessData(absl::string_view(), true);  // send the FIN
-  EXPECT_EQ(visitor_.messages_received_, 2);
-  EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_));
-  EXPECT_TRUE(visitor_.object_payload_.has_value());
-  EXPECT_EQ(*(visitor_.object_payload_), "");
-  EXPECT_TRUE(visitor_.end_of_message_);
-  EXPECT_FALSE(visitor_.parsing_error_.has_value());
-}
-
 // Send the header + some payload, pure payload, then pure payload to end the
 // message.
 TEST_F(MoqtMessageSpecificTest, ThreePartObject) {
@@ -807,4 +786,43 @@
   EXPECT_EQ(message.start_object->relative_value, 1);
 }
 
+TEST_F(MoqtMessageSpecificTest, DatagramSuccessful) {
+  ObjectDatagramMessage message;
+  MoqtObject object;
+  absl::string_view payload =
+      MoqtParser::ProcessDatagram(message.PacketSample(), object);
+  TestMessageBase::MessageStructuredData object_metadata =
+      TestMessageBase::MessageStructuredData(object);
+  EXPECT_TRUE(message.EqualFieldValues(object_metadata));
+  EXPECT_EQ(payload, "foo");
+}
+
+TEST_F(MoqtMessageSpecificTest, WrongMessageInDatagram) {
+  MoqtParser parser(kRawQuic, visitor_);
+  ObjectStreamMessage message;
+  MoqtObject object;
+  absl::string_view payload =
+      MoqtParser::ProcessDatagram(message.PacketSample(), object);
+  EXPECT_TRUE(payload.empty());
+}
+
+TEST_F(MoqtMessageSpecificTest, TruncatedDatagram) {
+  MoqtParser parser(kRawQuic, visitor_);
+  ObjectDatagramMessage message;
+  message.set_wire_image_size(4);
+  MoqtObject object;
+  absl::string_view payload =
+      MoqtParser::ProcessDatagram(message.PacketSample(), object);
+  EXPECT_TRUE(payload.empty());
+}
+
+TEST_F(MoqtMessageSpecificTest, VeryTruncatedDatagram) {
+  MoqtParser parser(kRawQuic, visitor_);
+  char message = 0x40;
+  MoqtObject object;
+  absl::string_view payload = MoqtParser::ProcessDatagram(
+      absl::string_view(&message, sizeof(message)), object);
+  EXPECT_TRUE(payload.empty());
+}
+
 }  // namespace moqt::test
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 2340ab3..994c530 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -20,6 +20,7 @@
 #include "absl/types/span.h"
 #include "quiche/quic/core/quic_types.h"
 #include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_parser.h"
 #include "quiche/quic/moqt/moqt_subscribe_windows.h"
 #include "quiche/quic/moqt/moqt_track.h"
 #include "quiche/quic/platform/api/quic_bug_tracker.h"
@@ -113,6 +114,28 @@
   }
 }
 
+void MoqtSession::OnDatagramReceived(absl::string_view datagram) {
+  MoqtObject message;
+  absl::string_view payload = MoqtParser::ProcessDatagram(datagram, message);
+  if (payload.empty()) {
+    Error(MoqtError::kProtocolViolation, "Malformed datagram");
+    return;
+  }
+  QUICHE_DLOG(INFO) << ENDPOINT
+                    << "Received OBJECT message in datagram for subscribe_id "
+                    << message.subscribe_id << " for track alias "
+                    << message.track_alias << " with sequence "
+                    << message.group_id << ":" << message.object_id
+                    << " send_order " << message.object_send_order << " length "
+                    << payload.size();
+  auto [full_track_name, visitor] = TrackPropertiesFromAlias(message);
+  if (visitor != nullptr) {
+    visitor->OnObjectFragment(full_track_name, message.group_id,
+                              message.object_id, message.object_send_order,
+                              message.forwarding_preference, payload, true);
+  }
+}
+
 void MoqtSession::Error(MoqtError code, absl::string_view error) {
   if (!error_.empty()) {
     // Avoid erroring out twice.
@@ -271,6 +294,29 @@
   return new_stream->GetStreamId();
 }
 
+std::pair<FullTrackName, RemoteTrack::Visitor*>
+MoqtSession::TrackPropertiesFromAlias(const MoqtObject& message) {
+  auto it = remote_tracks_.find(message.track_alias);
+  RemoteTrack::Visitor* visitor = nullptr;
+  if (it == remote_tracks_.end()) {
+    // SUBSCRIBE_OK has not arrived yet, but deliver it.
+    auto subscribe_it = active_subscribes_.find(message.subscribe_id);
+    if (subscribe_it == active_subscribes_.end()) {
+      return std::pair<FullTrackName, RemoteTrack::Visitor*>(
+          {{"", ""}, nullptr});
+    }
+    visitor = subscribe_it->second.visitor;
+    return std::pair<FullTrackName, RemoteTrack::Visitor*>(
+        {{subscribe_it->second.message.track_namespace,
+          subscribe_it->second.message.track_name},
+         subscribe_it->second.visitor});
+  }
+  return std::pair<FullTrackName, RemoteTrack::Visitor*>(
+      {{it->second.full_track_name().track_namespace,
+        it->second.full_track_name().track_name},
+       it->second.visitor()});
+}
+
 bool MoqtSession::PublishObject(const FullTrackName& full_track_name,
                                 uint64_t group_id, uint64_t object_id,
                                 uint64_t object_send_order,
@@ -308,8 +354,14 @@
   quiche::StreamWriteOptions write_options;
   write_options.set_send_fin(end_of_stream);
   for (auto subscription : subscriptions) {
-    // TODO: kPreferDatagram should bypass stream stuff. For now, send it on the
-    // stream.
+    if (forwarding_preference == MoqtForwardingPreference::kDatagram) {
+      quiche::QuicheBuffer datagram =
+          framer_.SerializeObjectDatagram(object, payload);
+      // TODO(martinduke): It's OK to just silently fail, but better to notify
+      // the app on errors.
+      session_->SendOrQueueDatagram(datagram.AsStringView());
+      continue;
+    }
     bool new_stream = false;
     std::optional<webtransport::StreamId> stream_id =
         subscription->GetStreamForSequence({group_id, object_id},
@@ -412,29 +464,12 @@
       payload = absl::string_view(partial_object_);
     }
   }
-  auto it = session_->remote_tracks_.find(message.track_alias);
-  RemoteTrack::Visitor* visitor = nullptr;
-  absl::string_view track_namespace;
-  absl::string_view track_name;
-  if (it == session_->remote_tracks_.end()) {
-    // SUBSCRIBE_OK has not arrived yet, but deliver it.
-    auto subscribe_it = session_->active_subscribes_.find(message.subscribe_id);
-    if (subscribe_it == session_->active_subscribes_.end()) {
-      return;
-    }
-    visitor = subscribe_it->second.visitor;
-    track_namespace = subscribe_it->second.message.track_namespace;
-    track_name = subscribe_it->second.message.track_name;
-  } else {
-    visitor = it->second.visitor();
-    track_namespace = it->second.full_track_name().track_namespace;
-    track_name = it->second.full_track_name().track_name;
-  }
+  auto [full_track_name, visitor] = session_->TrackPropertiesFromAlias(message);
   if (visitor != nullptr) {
-    visitor->OnObjectFragment(
-        FullTrackName(track_namespace, track_name), message.group_id,
-        message.object_id, message.object_send_order,
-        message.forwarding_preference, payload, end_of_message);
+    visitor->OnObjectFragment(full_track_name, message.group_id,
+                              message.object_id, message.object_send_order,
+                              message.forwarding_preference, payload,
+                              end_of_message);
   }
   partial_object_.clear();
 }
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index 73a92e0..adf5cb8 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -77,7 +77,7 @@
                        const std::string&) override;
   void OnIncomingBidirectionalStreamAvailable() override;
   void OnIncomingUnidirectionalStreamAvailable() override;
-  void OnDatagramReceived(absl::string_view /*datagram*/) override {}
+  void OnDatagramReceived(absl::string_view datagram) override;
   void OnCanCreateNewOutgoingBidirectionalStream() override {}
   void OnCanCreateNewOutgoingUnidirectionalStream() override {}
 
@@ -222,6 +222,11 @@
   // TODO: Add a callback if stream creation is delayed.
   std::optional<webtransport::StreamId> OpenUnidirectionalStream();
 
+  // Get FullTrackName and visitor for a subscribe_id and track_alias. Returns
+  // nullptr if not present.
+  std::pair<FullTrackName, RemoteTrack::Visitor*> TrackPropertiesFromAlias(
+      const MoqtObject& message);
+
   webtransport::Session* session_;
   MoqtSessionParameters parameters_;
   MoqtSessionCallbacks callbacks_;
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index 87c1909..eee03bf 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -895,6 +895,56 @@
   EXPECT_FALSE(session_.HasSubscribers(ftn));
 }
 
+TEST_F(MoqtSessionTest, SendDatagram) {
+  FullTrackName ftn("foo", "bar");
+  MockLocalTrackVisitor track_visitor;
+  session_.AddLocalTrack(ftn, &track_visitor);
+  MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 2, 5, 0);
+
+  // Publish in window.
+  bool correct_message = false;
+  uint8_t kExpectedMessage[] = {
+      0x01, 0x00, 0x02, 0x05, 0x00, 0x00, 0x64,
+      0x65, 0x61, 0x64, 0x62, 0x65, 0x65, 0x66,
+  };
+  EXPECT_CALL(mock_session_, SendOrQueueDatagram(_))
+      .WillOnce([&](absl::string_view datagram) {
+        if (datagram.size() == sizeof(kExpectedMessage)) {
+          correct_message = (0 == memcmp(datagram.data(), kExpectedMessage,
+                                         sizeof(kExpectedMessage)));
+        }
+        return webtransport::DatagramStatus(
+            webtransport::DatagramStatusCode::kSuccess, "");
+      });
+  session_.PublishObject(ftn, 5, 0, 0, MoqtForwardingPreference::kDatagram,
+                         "deadbeef", true);
+  EXPECT_TRUE(correct_message);
+}
+
+TEST_F(MoqtSessionTest, ReceiveDatagram) {
+  MockRemoteTrackVisitor visitor_;
+  FullTrackName ftn("foo", "bar");
+  std::string payload = "deadbeef";
+  MoqtSessionPeer::CreateRemoteTrack(&session_, ftn, &visitor_, 2);
+  MoqtObject object = {
+      /*subscribe_id=*/1,
+      /*track_alias=*/2,
+      /*group_sequence=*/0,
+      /*object_sequence=*/0,
+      /*object_send_order=*/0,
+      /*forwarding_preference=*/MoqtForwardingPreference::kDatagram,
+      /*payload_length=*/8,
+  };
+  char datagram[] = {0x01, 0x01, 0x02, 0x00, 0x00, 0x00, 0x64,
+                     0x65, 0x61, 0x64, 0x62, 0x65, 0x65, 0x66};
+  EXPECT_CALL(visitor_,
+              OnObjectFragment(ftn, object.group_id, object.object_id,
+                               object.object_send_order,
+                               object.forwarding_preference, payload, true))
+      .Times(1);
+  session_.OnDatagramReceived(absl::string_view(datagram, sizeof(datagram)));
+}
+
 // TODO: Cover more error cases in the above
 
 }  // namespace test
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h
index 6a0891d..44512c4 100644
--- a/quiche/quic/moqt/test_tools/moqt_test_message.h
+++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -184,10 +184,9 @@
   };
 };
 
-class QUICHE_NO_EXPORT ObjectPreferDatagramMessage : public ObjectMessage {
+class QUICHE_NO_EXPORT ObjectDatagramMessage : public ObjectMessage {
  public:
-  ObjectPreferDatagramMessage()
-      : ObjectMessage(MoqtMessageType::kObjectPreferDatagram) {
+  ObjectDatagramMessage() : ObjectMessage(MoqtMessageType::kObjectDatagram) {
     SetWireImage(raw_packet_, sizeof(raw_packet_));
     object_.forwarding_preference = MoqtForwardingPreference::kDatagram;
   }
@@ -891,8 +890,8 @@
   switch (message_type) {
     case MoqtMessageType::kObjectStream:
       return std::make_unique<ObjectStreamMessage>();
-    case MoqtMessageType::kObjectPreferDatagram:
-      return std::make_unique<ObjectPreferDatagramMessage>();
+    case MoqtMessageType::kObjectDatagram:
+      return std::make_unique<ObjectDatagramMessage>();
     case MoqtMessageType::kSubscribe:
       return std::make_unique<SubscribeMessage>();
     case MoqtMessageType::kSubscribeOk: