Send and handle MoQT datagrams when required.
draft-03 requires objects with this preference to be sent via datagram, not on a stream.
Not in production.
PiperOrigin-RevId: 613930667
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc
index b69456a..9137418 100644
--- a/quiche/quic/moqt/moqt_framer.cc
+++ b/quiche/quic/moqt/moqt_framer.cc
@@ -30,6 +30,7 @@
namespace {
using ::quiche::QuicheBuffer;
+using ::quiche::WireBytes;
using ::quiche::WireOptional;
using ::quiche::WireSpan;
using ::quiche::WireStringWithVarInt62Length;
@@ -213,6 +214,15 @@
}
}
+quiche::QuicheBuffer MoqtFramer::SerializeObjectDatagram(
+ const MoqtObject& message, absl::string_view payload) {
+ return Serialize(
+ WireVarInt62(MoqtMessageType::kObjectDatagram),
+ WireVarInt62(message.subscribe_id), WireVarInt62(message.track_alias),
+ WireVarInt62(message.group_id), WireVarInt62(message.object_id),
+ WireVarInt62(message.object_send_order), WireBytes(payload));
+}
+
quiche::QuicheBuffer MoqtFramer::SerializeClientSetup(
const MoqtClientSetup& message) {
absl::InlinedVector<IntParameter, 1> int_parameters;
diff --git a/quiche/quic/moqt/moqt_framer.h b/quiche/quic/moqt/moqt_framer.h
index e4070c5..bf17364 100644
--- a/quiche/quic/moqt/moqt_framer.h
+++ b/quiche/quic/moqt/moqt_framer.h
@@ -36,6 +36,8 @@
// header if `is_first_in_stream` is set to true.
quiche::QuicheBuffer SerializeObjectHeader(const MoqtObject& message,
bool is_first_in_stream);
+ quiche::QuicheBuffer SerializeObjectDatagram(const MoqtObject& message,
+ absl::string_view payload);
quiche::QuicheBuffer SerializeClientSetup(const MoqtClientSetup& message);
quiche::QuicheBuffer SerializeServerSetup(const MoqtServerSetup& message);
// Returns an empty buffer if there is an illegal combination of locations.
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc
index fe152d2..36ab8ea 100644
--- a/quiche/quic/moqt/moqt_framer_test.cc
+++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -31,7 +31,6 @@
std::vector<MoqtFramerTestParams> params;
std::vector<MoqtMessageType> message_types = {
MoqtMessageType::kObjectStream,
- MoqtMessageType::kObjectPreferDatagram,
MoqtMessageType::kSubscribe,
MoqtMessageType::kSubscribeOk,
MoqtMessageType::kSubscribeError,
@@ -106,7 +105,6 @@
TestMessageBase::MessageStructuredData& structured_data) {
switch (message_type_) {
case MoqtMessageType::kObjectStream:
- case MoqtMessageType::kObjectPreferDatagram:
case MoqtMessageType::kStreamHeaderTrack:
case MoqtMessageType::kStreamHeaderGroup: {
MoqtObject data = std::get<MoqtObject>(structured_data);
@@ -164,6 +162,9 @@
auto data = std::get<MoqtServerSetup>(structured_data);
return framer_.SerializeServerSetup(data);
}
+ default:
+ // kObjectDatagram is a totally different code path.
+ return quiche::QuicheBuffer();
}
}
@@ -244,4 +245,22 @@
EXPECT_TRUE(buffer.empty());
}
+TEST_F(MoqtFramerSimpleTest, Datagram) {
+ auto datagram = std::make_unique<ObjectDatagramMessage>();
+ MoqtObject object = {
+ /*subscribe_id=*/3,
+ /*track_alias=*/4,
+ /*group_id=*/5,
+ /*object_id=*/6,
+ /*object_send_order=*/7,
+ /*forwarding_preference=*/MoqtForwardingPreference::kObject,
+ /*payload_length=*/std::nullopt,
+ };
+ std::string payload = "foo";
+ quiche::QuicheBuffer buffer;
+ buffer = framer_.SerializeObjectDatagram(object, payload);
+ EXPECT_EQ(buffer.size(), datagram->total_message_size());
+ EXPECT_EQ(buffer.AsStringView(), datagram->PacketSample());
+}
+
} // namespace moqt::test
diff --git a/quiche/quic/moqt/moqt_messages.cc b/quiche/quic/moqt/moqt_messages.cc
index 0a27832..ab18885 100644
--- a/quiche/quic/moqt/moqt_messages.cc
+++ b/quiche/quic/moqt/moqt_messages.cc
@@ -14,7 +14,7 @@
switch (message_type) {
case MoqtMessageType::kObjectStream:
return "OBJECT_STREAM";
- case MoqtMessageType::kObjectPreferDatagram:
+ case MoqtMessageType::kObjectDatagram:
return "OBJECT_PREFER_DATAGRAM";
case MoqtMessageType::kClientSetup:
return "CLIENT_SETUP";
@@ -71,7 +71,7 @@
switch (type) {
case MoqtMessageType::kObjectStream:
return MoqtForwardingPreference::kObject;
- case MoqtMessageType::kObjectPreferDatagram:
+ case MoqtMessageType::kObjectDatagram:
return MoqtForwardingPreference::kDatagram;
case MoqtMessageType::kStreamHeaderTrack:
return MoqtForwardingPreference::kTrack;
@@ -91,7 +91,7 @@
case MoqtForwardingPreference::kObject:
return MoqtMessageType::kObjectStream;
case MoqtForwardingPreference::kDatagram:
- return MoqtMessageType::kObjectPreferDatagram;
+ return MoqtMessageType::kObjectDatagram;
case MoqtForwardingPreference::kTrack:
return MoqtMessageType::kStreamHeaderTrack;
case MoqtForwardingPreference::kGroup:
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index 81d2c4d..2c20105 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -47,7 +47,7 @@
enum class QUICHE_EXPORT MoqtMessageType : uint64_t {
kObjectStream = 0x00,
- kObjectPreferDatagram = 0x01,
+ kObjectDatagram = 0x01,
kSubscribe = 0x03,
kSubscribeOk = 0x04,
kSubscribeError = 0x05,
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc
index fe49c7a..c529088 100644
--- a/quiche/quic/moqt/moqt_parser.cc
+++ b/quiche/quic/moqt/moqt_parser.cc
@@ -113,6 +113,25 @@
}
}
+// static
+absl::string_view MoqtParser::ProcessDatagram(absl::string_view data,
+ MoqtObject& object_metadata) {
+ uint64_t value;
+ quic::QuicDataReader reader(data);
+ if (!reader.ReadVarInt62(&value)) {
+ return absl::string_view();
+ }
+ if (static_cast<MoqtMessageType>(value) != MoqtMessageType::kObjectDatagram) {
+ return absl::string_view();
+ }
+ size_t processed_data = ParseObjectHeader(reader, object_metadata,
+ MoqtMessageType::kObjectDatagram);
+ if (processed_data == 0) { // Incomplete header
+ return absl::string_view();
+ }
+ return reader.PeekRemainingPayload();
+}
+
size_t MoqtParser::ProcessMessage(absl::string_view data, bool fin) {
uint64_t value;
quic::QuicDataReader reader(data);
@@ -128,8 +147,10 @@
}
auto type = static_cast<MoqtMessageType>(value);
switch (type) {
+ case MoqtMessageType::kObjectDatagram:
+ ParseError("Received OBJECT_DATAGRAM on stream");
+ return 0;
case MoqtMessageType::kObjectStream:
- case MoqtMessageType::kObjectPreferDatagram:
case MoqtMessageType::kStreamHeaderTrack:
case MoqtMessageType::kStreamHeaderGroup:
return ProcessObject(reader, type, fin);
@@ -167,36 +188,18 @@
size_t MoqtParser::ProcessObject(quic::QuicDataReader& reader,
MoqtMessageType type, bool fin) {
- size_t processed_data;
+ size_t processed_data = 0;
QUICHE_DCHECK(!ObjectPayloadInProgress());
if (!ObjectStreamInitialized()) {
- // nothing has been processed on the stream.
object_metadata_ = MoqtObject();
- if (!reader.ReadVarInt62(&object_metadata_->subscribe_id) ||
- !reader.ReadVarInt62(&object_metadata_->track_alias)) {
+ processed_data = ParseObjectHeader(reader, object_metadata_.value(), type);
+ if (processed_data == 0) {
object_metadata_.reset();
return 0;
}
- if (type != MoqtMessageType::kStreamHeaderTrack &&
- !reader.ReadVarInt62(&object_metadata_->group_id)) {
- object_metadata_.reset();
- return 0;
- }
- if (type != MoqtMessageType::kStreamHeaderTrack &&
- type != MoqtMessageType::kStreamHeaderGroup &&
- !reader.ReadVarInt62(&object_metadata_->object_id)) {
- object_metadata_.reset();
- return 0;
- }
- if (!reader.ReadVarInt62(&object_metadata_->object_send_order)) {
- object_metadata_.reset();
- return 0;
- }
- object_metadata_->forwarding_preference = GetForwardingPreference(type);
}
// At this point, enough data has been processed to store in object_metadata_,
// even if there's nothing else in the buffer.
- processed_data = reader.PreviouslyReadPayload().length();
QUICHE_DCHECK(payload_length_remaining_ == 0);
switch (type) {
case MoqtMessageType::kStreamHeaderTrack:
@@ -548,6 +551,29 @@
return reader.PreviouslyReadPayload().length();
}
+// static
+size_t MoqtParser::ParseObjectHeader(quic::QuicDataReader& reader,
+ MoqtObject& object, MoqtMessageType type) {
+ if (!reader.ReadVarInt62(&object.subscribe_id) ||
+ !reader.ReadVarInt62(&object.track_alias)) {
+ return 0;
+ }
+ if (type != MoqtMessageType::kStreamHeaderTrack &&
+ !reader.ReadVarInt62(&object.group_id)) {
+ return 0;
+ }
+ if (type != MoqtMessageType::kStreamHeaderTrack &&
+ type != MoqtMessageType::kStreamHeaderGroup &&
+ !reader.ReadVarInt62(&object.object_id)) {
+ return 0;
+ }
+ if (!reader.ReadVarInt62(&object.object_send_order)) {
+ return 0;
+ }
+ object.forwarding_preference = GetForwardingPreference(type);
+ return reader.PreviouslyReadPayload().length();
+}
+
void MoqtParser::ParseError(absl::string_view reason) {
ParseError(MoqtError::kProtocolViolation, reason);
}
diff --git a/quiche/quic/moqt/moqt_parser.h b/quiche/quic/moqt/moqt_parser.h
index 3bb7f47..4cd19ca 100644
--- a/quiche/quic/moqt/moqt_parser.h
+++ b/quiche/quic/moqt/moqt_parser.h
@@ -66,6 +66,12 @@
// datagram rather than a stream.
void ProcessData(absl::string_view data, bool fin);
+ // Provide a separate path for datagrams. Returns the payload bytes, or empty
+ // string_view on error. The caller provides the whole datagram in |data|.
+ // The function puts the object metadata in |object_metadata|.
+ static absl::string_view ProcessDatagram(absl::string_view data,
+ MoqtObject& object_metadata);
+
private:
// The central switch statement to dispatch a message to the correct
// Process* function. Returns 0 if it could not parse the full messsage
@@ -92,6 +98,9 @@
size_t ProcessUnannounce(quic::QuicDataReader& reader);
size_t ProcessGoAway(quic::QuicDataReader& reader);
+ static size_t ParseObjectHeader(quic::QuicDataReader& reader,
+ MoqtObject& object, MoqtMessageType type);
+
// If |error| is not provided, assumes kProtocolViolation.
void ParseError(absl::string_view reason);
void ParseError(MoqtError error, absl::string_view reason);
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc
index 48fa22a..4fcd7dd 100644
--- a/quiche/quic/moqt/moqt_parser_test.cc
+++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -25,19 +25,19 @@
inline bool IsObjectMessage(MoqtMessageType type) {
return (type == MoqtMessageType::kObjectStream ||
- type == MoqtMessageType::kObjectPreferDatagram ||
+ type == MoqtMessageType::kObjectDatagram ||
type == MoqtMessageType::kStreamHeaderTrack ||
type == MoqtMessageType::kStreamHeaderGroup);
}
inline bool IsObjectWithoutPayloadLength(MoqtMessageType type) {
return (type == MoqtMessageType::kObjectStream ||
- type == MoqtMessageType::kObjectPreferDatagram);
+ type == MoqtMessageType::kObjectDatagram);
}
std::vector<MoqtMessageType> message_types = {
MoqtMessageType::kObjectStream,
- MoqtMessageType::kObjectPreferDatagram,
+ // kObjectDatagram is a unique set of tests.
MoqtMessageType::kSubscribe,
MoqtMessageType::kSubscribeOk,
MoqtMessageType::kSubscribeError,
@@ -346,27 +346,6 @@
EXPECT_FALSE(visitor_.parsing_error_.has_value());
}
-TEST_F(MoqtMessageSpecificTest, ObjectPreferDatagramSeparateFin) {
- // OBJECT can return on an unknown-length message even without receiving a
- // FIN.
- MoqtParser parser(kRawQuic, visitor_);
- auto message = std::make_unique<ObjectPreferDatagramMessage>();
- parser.ProcessData(message->PacketSample(), false);
- EXPECT_EQ(visitor_.messages_received_, 1);
- EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_));
- EXPECT_TRUE(visitor_.object_payload_.has_value());
- EXPECT_EQ(*(visitor_.object_payload_), "foo");
- EXPECT_FALSE(visitor_.end_of_message_);
-
- parser.ProcessData(absl::string_view(), true); // send the FIN
- EXPECT_EQ(visitor_.messages_received_, 2);
- EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_));
- EXPECT_TRUE(visitor_.object_payload_.has_value());
- EXPECT_EQ(*(visitor_.object_payload_), "");
- EXPECT_TRUE(visitor_.end_of_message_);
- EXPECT_FALSE(visitor_.parsing_error_.has_value());
-}
-
// Send the header + some payload, pure payload, then pure payload to end the
// message.
TEST_F(MoqtMessageSpecificTest, ThreePartObject) {
@@ -807,4 +786,43 @@
EXPECT_EQ(message.start_object->relative_value, 1);
}
+TEST_F(MoqtMessageSpecificTest, DatagramSuccessful) {
+ ObjectDatagramMessage message;
+ MoqtObject object;
+ absl::string_view payload =
+ MoqtParser::ProcessDatagram(message.PacketSample(), object);
+ TestMessageBase::MessageStructuredData object_metadata =
+ TestMessageBase::MessageStructuredData(object);
+ EXPECT_TRUE(message.EqualFieldValues(object_metadata));
+ EXPECT_EQ(payload, "foo");
+}
+
+TEST_F(MoqtMessageSpecificTest, WrongMessageInDatagram) {
+ MoqtParser parser(kRawQuic, visitor_);
+ ObjectStreamMessage message;
+ MoqtObject object;
+ absl::string_view payload =
+ MoqtParser::ProcessDatagram(message.PacketSample(), object);
+ EXPECT_TRUE(payload.empty());
+}
+
+TEST_F(MoqtMessageSpecificTest, TruncatedDatagram) {
+ MoqtParser parser(kRawQuic, visitor_);
+ ObjectDatagramMessage message;
+ message.set_wire_image_size(4);
+ MoqtObject object;
+ absl::string_view payload =
+ MoqtParser::ProcessDatagram(message.PacketSample(), object);
+ EXPECT_TRUE(payload.empty());
+}
+
+TEST_F(MoqtMessageSpecificTest, VeryTruncatedDatagram) {
+ MoqtParser parser(kRawQuic, visitor_);
+ char message = 0x40;
+ MoqtObject object;
+ absl::string_view payload = MoqtParser::ProcessDatagram(
+ absl::string_view(&message, sizeof(message)), object);
+ EXPECT_TRUE(payload.empty());
+}
+
} // namespace moqt::test
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 2340ab3..994c530 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -20,6 +20,7 @@
#include "absl/types/span.h"
#include "quiche/quic/core/quic_types.h"
#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_parser.h"
#include "quiche/quic/moqt/moqt_subscribe_windows.h"
#include "quiche/quic/moqt/moqt_track.h"
#include "quiche/quic/platform/api/quic_bug_tracker.h"
@@ -113,6 +114,28 @@
}
}
+void MoqtSession::OnDatagramReceived(absl::string_view datagram) {
+ MoqtObject message;
+ absl::string_view payload = MoqtParser::ProcessDatagram(datagram, message);
+ if (payload.empty()) {
+ Error(MoqtError::kProtocolViolation, "Malformed datagram");
+ return;
+ }
+ QUICHE_DLOG(INFO) << ENDPOINT
+ << "Received OBJECT message in datagram for subscribe_id "
+ << message.subscribe_id << " for track alias "
+ << message.track_alias << " with sequence "
+ << message.group_id << ":" << message.object_id
+ << " send_order " << message.object_send_order << " length "
+ << payload.size();
+ auto [full_track_name, visitor] = TrackPropertiesFromAlias(message);
+ if (visitor != nullptr) {
+ visitor->OnObjectFragment(full_track_name, message.group_id,
+ message.object_id, message.object_send_order,
+ message.forwarding_preference, payload, true);
+ }
+}
+
void MoqtSession::Error(MoqtError code, absl::string_view error) {
if (!error_.empty()) {
// Avoid erroring out twice.
@@ -271,6 +294,29 @@
return new_stream->GetStreamId();
}
+std::pair<FullTrackName, RemoteTrack::Visitor*>
+MoqtSession::TrackPropertiesFromAlias(const MoqtObject& message) {
+ auto it = remote_tracks_.find(message.track_alias);
+ RemoteTrack::Visitor* visitor = nullptr;
+ if (it == remote_tracks_.end()) {
+ // SUBSCRIBE_OK has not arrived yet, but deliver it.
+ auto subscribe_it = active_subscribes_.find(message.subscribe_id);
+ if (subscribe_it == active_subscribes_.end()) {
+ return std::pair<FullTrackName, RemoteTrack::Visitor*>(
+ {{"", ""}, nullptr});
+ }
+ visitor = subscribe_it->second.visitor;
+ return std::pair<FullTrackName, RemoteTrack::Visitor*>(
+ {{subscribe_it->second.message.track_namespace,
+ subscribe_it->second.message.track_name},
+ subscribe_it->second.visitor});
+ }
+ return std::pair<FullTrackName, RemoteTrack::Visitor*>(
+ {{it->second.full_track_name().track_namespace,
+ it->second.full_track_name().track_name},
+ it->second.visitor()});
+}
+
bool MoqtSession::PublishObject(const FullTrackName& full_track_name,
uint64_t group_id, uint64_t object_id,
uint64_t object_send_order,
@@ -308,8 +354,14 @@
quiche::StreamWriteOptions write_options;
write_options.set_send_fin(end_of_stream);
for (auto subscription : subscriptions) {
- // TODO: kPreferDatagram should bypass stream stuff. For now, send it on the
- // stream.
+ if (forwarding_preference == MoqtForwardingPreference::kDatagram) {
+ quiche::QuicheBuffer datagram =
+ framer_.SerializeObjectDatagram(object, payload);
+ // TODO(martinduke): It's OK to just silently fail, but better to notify
+ // the app on errors.
+ session_->SendOrQueueDatagram(datagram.AsStringView());
+ continue;
+ }
bool new_stream = false;
std::optional<webtransport::StreamId> stream_id =
subscription->GetStreamForSequence({group_id, object_id},
@@ -412,29 +464,12 @@
payload = absl::string_view(partial_object_);
}
}
- auto it = session_->remote_tracks_.find(message.track_alias);
- RemoteTrack::Visitor* visitor = nullptr;
- absl::string_view track_namespace;
- absl::string_view track_name;
- if (it == session_->remote_tracks_.end()) {
- // SUBSCRIBE_OK has not arrived yet, but deliver it.
- auto subscribe_it = session_->active_subscribes_.find(message.subscribe_id);
- if (subscribe_it == session_->active_subscribes_.end()) {
- return;
- }
- visitor = subscribe_it->second.visitor;
- track_namespace = subscribe_it->second.message.track_namespace;
- track_name = subscribe_it->second.message.track_name;
- } else {
- visitor = it->second.visitor();
- track_namespace = it->second.full_track_name().track_namespace;
- track_name = it->second.full_track_name().track_name;
- }
+ auto [full_track_name, visitor] = session_->TrackPropertiesFromAlias(message);
if (visitor != nullptr) {
- visitor->OnObjectFragment(
- FullTrackName(track_namespace, track_name), message.group_id,
- message.object_id, message.object_send_order,
- message.forwarding_preference, payload, end_of_message);
+ visitor->OnObjectFragment(full_track_name, message.group_id,
+ message.object_id, message.object_send_order,
+ message.forwarding_preference, payload,
+ end_of_message);
}
partial_object_.clear();
}
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index 73a92e0..adf5cb8 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -77,7 +77,7 @@
const std::string&) override;
void OnIncomingBidirectionalStreamAvailable() override;
void OnIncomingUnidirectionalStreamAvailable() override;
- void OnDatagramReceived(absl::string_view /*datagram*/) override {}
+ void OnDatagramReceived(absl::string_view datagram) override;
void OnCanCreateNewOutgoingBidirectionalStream() override {}
void OnCanCreateNewOutgoingUnidirectionalStream() override {}
@@ -222,6 +222,11 @@
// TODO: Add a callback if stream creation is delayed.
std::optional<webtransport::StreamId> OpenUnidirectionalStream();
+ // Get FullTrackName and visitor for a subscribe_id and track_alias. Returns
+ // nullptr if not present.
+ std::pair<FullTrackName, RemoteTrack::Visitor*> TrackPropertiesFromAlias(
+ const MoqtObject& message);
+
webtransport::Session* session_;
MoqtSessionParameters parameters_;
MoqtSessionCallbacks callbacks_;
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index 87c1909..eee03bf 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -895,6 +895,56 @@
EXPECT_FALSE(session_.HasSubscribers(ftn));
}
+TEST_F(MoqtSessionTest, SendDatagram) {
+ FullTrackName ftn("foo", "bar");
+ MockLocalTrackVisitor track_visitor;
+ session_.AddLocalTrack(ftn, &track_visitor);
+ MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 2, 5, 0);
+
+ // Publish in window.
+ bool correct_message = false;
+ uint8_t kExpectedMessage[] = {
+ 0x01, 0x00, 0x02, 0x05, 0x00, 0x00, 0x64,
+ 0x65, 0x61, 0x64, 0x62, 0x65, 0x65, 0x66,
+ };
+ EXPECT_CALL(mock_session_, SendOrQueueDatagram(_))
+ .WillOnce([&](absl::string_view datagram) {
+ if (datagram.size() == sizeof(kExpectedMessage)) {
+ correct_message = (0 == memcmp(datagram.data(), kExpectedMessage,
+ sizeof(kExpectedMessage)));
+ }
+ return webtransport::DatagramStatus(
+ webtransport::DatagramStatusCode::kSuccess, "");
+ });
+ session_.PublishObject(ftn, 5, 0, 0, MoqtForwardingPreference::kDatagram,
+ "deadbeef", true);
+ EXPECT_TRUE(correct_message);
+}
+
+TEST_F(MoqtSessionTest, ReceiveDatagram) {
+ MockRemoteTrackVisitor visitor_;
+ FullTrackName ftn("foo", "bar");
+ std::string payload = "deadbeef";
+ MoqtSessionPeer::CreateRemoteTrack(&session_, ftn, &visitor_, 2);
+ MoqtObject object = {
+ /*subscribe_id=*/1,
+ /*track_alias=*/2,
+ /*group_sequence=*/0,
+ /*object_sequence=*/0,
+ /*object_send_order=*/0,
+ /*forwarding_preference=*/MoqtForwardingPreference::kDatagram,
+ /*payload_length=*/8,
+ };
+ char datagram[] = {0x01, 0x01, 0x02, 0x00, 0x00, 0x00, 0x64,
+ 0x65, 0x61, 0x64, 0x62, 0x65, 0x65, 0x66};
+ EXPECT_CALL(visitor_,
+ OnObjectFragment(ftn, object.group_id, object.object_id,
+ object.object_send_order,
+ object.forwarding_preference, payload, true))
+ .Times(1);
+ session_.OnDatagramReceived(absl::string_view(datagram, sizeof(datagram)));
+}
+
// TODO: Cover more error cases in the above
} // namespace test
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h
index 6a0891d..44512c4 100644
--- a/quiche/quic/moqt/test_tools/moqt_test_message.h
+++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -184,10 +184,9 @@
};
};
-class QUICHE_NO_EXPORT ObjectPreferDatagramMessage : public ObjectMessage {
+class QUICHE_NO_EXPORT ObjectDatagramMessage : public ObjectMessage {
public:
- ObjectPreferDatagramMessage()
- : ObjectMessage(MoqtMessageType::kObjectPreferDatagram) {
+ ObjectDatagramMessage() : ObjectMessage(MoqtMessageType::kObjectDatagram) {
SetWireImage(raw_packet_, sizeof(raw_packet_));
object_.forwarding_preference = MoqtForwardingPreference::kDatagram;
}
@@ -891,8 +890,8 @@
switch (message_type) {
case MoqtMessageType::kObjectStream:
return std::make_unique<ObjectStreamMessage>();
- case MoqtMessageType::kObjectPreferDatagram:
- return std::make_unique<ObjectPreferDatagramMessage>();
+ case MoqtMessageType::kObjectDatagram:
+ return std::make_unique<ObjectDatagramMessage>();
case MoqtMessageType::kSubscribe:
return std::make_unique<SubscribeMessage>();
case MoqtMessageType::kSubscribeOk: