Implement MoQT Peeps and Object message changes for draft-06. This is the minimum for interoperability; this code always sends subgroup_id = 0 and ignores the incoming subgroup_id.
Also fixed bugs uncovered by fixing a typo in MoqtIntegrationTest.
PiperOrigin-RevId: 680684190
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc
index 6974ca6..e7042e7 100644
--- a/quiche/quic/moqt/moqt_framer.cc
+++ b/quiche/quic/moqt/moqt_framer.cc
@@ -280,43 +280,37 @@
quiche::QuicheBuffer MoqtFramer::SerializeObjectHeader(
const MoqtObject& message, bool is_first_in_stream) {
- if (!message.payload_length.has_value() &&
- !(message.forwarding_preference == MoqtForwardingPreference::kObject ||
- message.forwarding_preference == MoqtForwardingPreference::kDatagram)) {
- QUIC_BUG(quic_bug_serialize_object_input_01)
- << "Track or Group forwarding preference requires knowing the object "
- "length in advance";
+ if (!ValidateObjectMetadata(message)) {
+ QUIC_BUG(quic_bug_serialize_object_header_01)
+ << "Object metadata is invalid";
return quiche::QuicheBuffer();
}
- if (message.object_status != MoqtObjectStatus::kNormal &&
- message.payload_length.has_value() && *message.payload_length > 0) {
- QUIC_BUG(quic_bug_serialize_object_input_03)
- << "Object status must be kNormal if payload is non-empty";
+ if (message.forwarding_preference == MoqtForwardingPreference::kDatagram) {
+ QUIC_BUG(quic_bug_serialize_object_header_02)
+ << "Datagrams use SerializeObjectDatagram()";
return quiche::QuicheBuffer();
}
if (!is_first_in_stream) {
switch (message.forwarding_preference) {
case MoqtForwardingPreference::kTrack:
- return (*message.payload_length == 0)
+ return (message.payload_length == 0)
? Serialize(WireVarInt62(message.group_id),
WireVarInt62(message.object_id),
- WireVarInt62(*message.payload_length),
+ WireVarInt62(message.payload_length),
WireVarInt62(message.object_status))
: Serialize(WireVarInt62(message.group_id),
WireVarInt62(message.object_id),
- WireVarInt62(*message.payload_length));
- case MoqtForwardingPreference::kGroup:
- return (*message.payload_length == 0)
+ WireVarInt62(message.payload_length));
+ case MoqtForwardingPreference::kSubgroup:
+ return (message.payload_length == 0)
? Serialize(WireVarInt62(message.object_id),
- WireVarInt62(*message.payload_length),
+ WireVarInt62(message.payload_length),
WireVarInt62(static_cast<uint64_t>(
message.object_status)))
: Serialize(WireVarInt62(message.object_id),
- WireVarInt62(*message.payload_length));
+ WireVarInt62(message.payload_length));
default:
- QUIC_BUG(quic_bug_serialize_object_input_02)
- << "Object or Datagram forwarding_preference must be first in "
- "stream";
+ QUICHE_NOTREACHED();
return quiche::QuicheBuffer();
}
}
@@ -324,14 +318,14 @@
GetMessageTypeForForwardingPreference(message.forwarding_preference);
switch (message.forwarding_preference) {
case MoqtForwardingPreference::kTrack:
- return (*message.payload_length == 0)
+ return (message.payload_length == 0)
? Serialize(WireVarInt62(message_type),
WireVarInt62(message.subscribe_id),
WireVarInt62(message.track_alias),
WireUint8(message.publisher_priority),
WireVarInt62(message.group_id),
WireVarInt62(message.object_id),
- WireVarInt62(*message.payload_length),
+ WireVarInt62(message.payload_length),
WireVarInt62(message.object_status))
: Serialize(WireVarInt62(message_type),
WireVarInt62(message.subscribe_id),
@@ -339,49 +333,64 @@
WireUint8(message.publisher_priority),
WireVarInt62(message.group_id),
WireVarInt62(message.object_id),
- WireVarInt62(*message.payload_length));
- case MoqtForwardingPreference::kGroup:
- return (*message.payload_length == 0)
+ WireVarInt62(message.payload_length));
+ case MoqtForwardingPreference::kSubgroup:
+ return (message.payload_length == 0)
? Serialize(WireVarInt62(message_type),
WireVarInt62(message.subscribe_id),
WireVarInt62(message.track_alias),
WireVarInt62(message.group_id),
+ WireVarInt62(*message.subgroup_id),
WireUint8(message.publisher_priority),
WireVarInt62(message.object_id),
- WireVarInt62(*message.payload_length),
+ WireVarInt62(message.payload_length),
WireVarInt62(message.object_status))
: Serialize(WireVarInt62(message_type),
WireVarInt62(message.subscribe_id),
WireVarInt62(message.track_alias),
WireVarInt62(message.group_id),
+ WireVarInt62(*message.subgroup_id),
WireUint8(message.publisher_priority),
WireVarInt62(message.object_id),
- WireVarInt62(*message.payload_length));
- case MoqtForwardingPreference::kObject:
+ WireVarInt62(message.payload_length));
case MoqtForwardingPreference::kDatagram:
- return Serialize(
- WireVarInt62(message_type), WireVarInt62(message.subscribe_id),
- WireVarInt62(message.track_alias), WireVarInt62(message.group_id),
- WireVarInt62(message.object_id),
- WireUint8(message.publisher_priority),
- WireVarInt62(message.object_status));
+ QUICHE_NOTREACHED();
+ return quiche::QuicheBuffer();
}
}
quiche::QuicheBuffer MoqtFramer::SerializeObjectDatagram(
const MoqtObject& message, absl::string_view payload) {
- if (message.object_status != MoqtObjectStatus::kNormal && !payload.empty()) {
+ if (!ValidateObjectMetadata(message)) {
QUIC_BUG(quic_bug_serialize_object_datagram_01)
- << "Object status must be kNormal if payload is non-empty";
+ << "Object metadata is invalid";
return quiche::QuicheBuffer();
}
+ if (message.forwarding_preference != MoqtForwardingPreference::kDatagram) {
+ QUIC_BUG(quic_bug_serialize_object_datagram_02)
+ << "Only datagrams use SerializeObjectDatagram()";
+ return quiche::QuicheBuffer();
+ }
+ if (message.payload_length != payload.length()) {
+ QUIC_BUG(quic_bug_serialize_object_datagram_03)
+ << "Payload length does not match payload";
+ return quiche::QuicheBuffer();
+ }
+ if (message.payload_length == 0) {
+ return Serialize(
+ WireVarInt62(MoqtDataStreamType::kObjectDatagram),
+ WireVarInt62(message.subscribe_id), WireVarInt62(message.track_alias),
+ WireVarInt62(message.group_id), WireVarInt62(message.object_id),
+ WireUint8(message.publisher_priority),
+ WireVarInt62(message.payload_length),
+ WireVarInt62(message.object_status));
+ }
return Serialize(
WireVarInt62(MoqtDataStreamType::kObjectDatagram),
WireVarInt62(message.subscribe_id), WireVarInt62(message.track_alias),
WireVarInt62(message.group_id), WireVarInt62(message.object_id),
WireUint8(message.publisher_priority),
- WireVarInt62(static_cast<uint64_t>(message.object_status)),
- WireBytes(payload));
+ WireVarInt62(message.payload_length), WireBytes(payload));
}
quiche::QuicheBuffer MoqtFramer::SerializeClientSetup(
@@ -629,4 +638,17 @@
message.delta_from_deadline.ToMicroseconds())));
}
+// static
+bool MoqtFramer::ValidateObjectMetadata(const MoqtObject& object) {
+ if (object.object_status != MoqtObjectStatus::kNormal &&
+ object.payload_length > 0) {
+ return false;
+ }
+ if ((object.forwarding_preference == MoqtForwardingPreference::kSubgroup) !=
+ object.subgroup_id.has_value()) {
+ return false;
+ }
+ return true;
+}
+
} // namespace moqt
diff --git a/quiche/quic/moqt/moqt_framer.h b/quiche/quic/moqt/moqt_framer.h
index 40b4dbf..b18a77c 100644
--- a/quiche/quic/moqt/moqt_framer.h
+++ b/quiche/quic/moqt/moqt_framer.h
@@ -60,6 +60,9 @@
quiche::QuicheBuffer SerializeObjectAck(const MoqtObjectAck& message);
private:
+ // Returns true if the metadata is internally consistent.
+ static bool ValidateObjectMetadata(const MoqtObject& object);
+
quiche::QuicheBufferAllocator* allocator_;
bool using_webtrans_;
};
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc
index 510dd17..be6ae32 100644
--- a/quiche/quic/moqt/moqt_framer_test.cc
+++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -4,7 +4,6 @@
#include "quiche/quic/moqt/moqt_framer.h"
-#include <cerrno>
#include <cstddef>
#include <cstdint>
#include <memory>
@@ -19,7 +18,6 @@
#include "quiche/quic/platform/api/quic_expect_bug.h"
#include "quiche/quic/platform/api/quic_test.h"
#include "quiche/common/quiche_buffer_allocator.h"
-#include "quiche/common/quiche_text_utils.h"
#include "quiche/common/simple_buffer_allocator.h"
#include "quiche/common/test_tools/quiche_test_utils.h"
@@ -81,7 +79,9 @@
MoqtObject adjusted_message = message;
adjusted_message.payload_length = payload.size();
quiche::QuicheBuffer header =
- framer.SerializeObjectHeader(adjusted_message, is_first_in_stream);
+ (message.forwarding_preference == MoqtForwardingPreference::kDatagram)
+ ? framer.SerializeObjectDatagram(adjusted_message, payload)
+ : framer.SerializeObjectHeader(adjusted_message, is_first_in_stream);
if (header.empty()) {
return quiche::QuicheBuffer();
}
@@ -217,13 +217,13 @@
};
TEST_F(MoqtFramerSimpleTest, GroupMiddler) {
- auto header = std::make_unique<StreamHeaderGroupMessage>();
+ auto header = std::make_unique<StreamHeaderSubgroupMessage>();
auto buffer1 = SerializeObject(
framer_, std::get<MoqtObject>(header->structured_data()), "foo", true);
EXPECT_EQ(buffer1.size(), header->total_message_size());
EXPECT_EQ(buffer1.AsStringView(), header->PacketSample());
- auto middler = std::make_unique<StreamMiddlerGroupMessage>();
+ auto middler = std::make_unique<StreamMiddlerSubgroupMessage>();
auto buffer2 = SerializeObject(
framer_, std::get<MoqtObject>(middler->structured_data()), "bar", false);
EXPECT_EQ(buffer2.size(), middler->total_message_size());
@@ -246,28 +246,80 @@
TEST_F(MoqtFramerSimpleTest, BadObjectInput) {
MoqtObject object = {
+ // This is a valid object.
/*subscribe_id=*/3,
/*track_alias=*/4,
/*group_id=*/5,
/*object_id=*/6,
/*publisher_priority=*/7,
/*object_status=*/MoqtObjectStatus::kNormal,
- /*forwarding_preference=*/MoqtForwardingPreference::kObject,
- /*payload_length=*/std::nullopt,
+ /*forwarding_preference=*/MoqtForwardingPreference::kSubgroup,
+ /*subgroup_id=*/8,
+ /*payload_length=*/3,
};
quiche::QuicheBuffer buffer;
- object.forwarding_preference = MoqtForwardingPreference::kDatagram;
- EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader(object, false),
- "must be first");
+
+ // SerializeObjectDatagram() only accepts kDatagram.
+ EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectDatagram(object, "foo"),
+ "Only datagrams use SerializeObjectDatagram()");
EXPECT_TRUE(buffer.empty());
- object.forwarding_preference = MoqtForwardingPreference::kGroup;
+
+ // kSubgroup must have a subgroup_id.
+ object.subgroup_id = std::nullopt;
EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader(object, false),
- "requires knowing the object length");
+ "Object metadata is invalid");
EXPECT_TRUE(buffer.empty());
- object.payload_length = 5;
+ object.subgroup_id = 8;
+
+ // kTrack must not have a subgroup_id.
+ object.forwarding_preference = MoqtForwardingPreference::kTrack;
+ EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader(object, false),
+ "Object metadata is invalid");
+ EXPECT_TRUE(buffer.empty());
+ object.forwarding_preference = MoqtForwardingPreference::kSubgroup;
+
+ // Non-normal status must have no payload.
object.object_status = MoqtObjectStatus::kEndOfGroup;
EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader(object, false),
- "Object status must be kNormal if payload is non-empty");
+ "Object metadata is invalid");
+ EXPECT_TRUE(buffer.empty());
+ // object.object_status = MoqtObjectStatus::kNormal;
+}
+
+TEST_F(MoqtFramerSimpleTest, BadDatagramInput) {
+ MoqtObject object = {
+ // This is a valid datagram.
+ /*subscribe_id=*/3,
+ /*track_alias=*/4,
+ /*group_id=*/5,
+ /*object_id=*/6,
+ /*publisher_priority=*/7,
+ /*object_status=*/MoqtObjectStatus::kNormal,
+ /*forwarding_preference=*/MoqtForwardingPreference::kDatagram,
+ /*subgroup_id=*/std::nullopt,
+ /*payload_length=*/3,
+ };
+ quiche::QuicheBuffer buffer;
+
+ // No datagrams to SerializeObjectHeader().
+ EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader(object, false),
+ "Datagrams use SerializeObjectDatagram()")
+ EXPECT_TRUE(buffer.empty());
+
+ object.object_status = MoqtObjectStatus::kEndOfGroup;
+ EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectDatagram(object, "foo"),
+ "Object metadata is invalid");
+ EXPECT_TRUE(buffer.empty());
+ object.object_status = MoqtObjectStatus::kNormal;
+
+ object.subgroup_id = 8;
+ EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectDatagram(object, "foo"),
+ "Object metadata is invalid");
+ EXPECT_TRUE(buffer.empty());
+ object.subgroup_id = std::nullopt;
+
+ EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectDatagram(object, "foobar"),
+ "Payload length does not match payload");
EXPECT_TRUE(buffer.empty());
}
@@ -280,8 +332,9 @@
/*object_id=*/6,
/*publisher_priority=*/7,
/*object_status=*/MoqtObjectStatus::kNormal,
- /*forwarding_preference=*/MoqtForwardingPreference::kObject,
- /*payload_length=*/std::nullopt,
+ /*forwarding_preference=*/MoqtForwardingPreference::kDatagram,
+ /*subgroup_id=*/std::nullopt,
+ /*payload_length=*/3,
};
std::string payload = "foo";
quiche::QuicheBuffer buffer;
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index 11f6be4..d398331 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -190,7 +190,7 @@
});
auto queue = std::make_shared<MoqtOutgoingQueue>(
- FullTrackName{"test", "data"}, MoqtForwardingPreference::kGroup);
+ FullTrackName{"test", "data"}, MoqtForwardingPreference::kSubgroup);
MoqtKnownTrackPublisher known_track_publisher;
known_track_publisher.Add(queue);
client_->session()->set_publisher(&known_track_publisher);
@@ -215,7 +215,7 @@
EXPECT_EQ(group_sequence, 0u);
EXPECT_EQ(object_sequence, 0u);
EXPECT_EQ(status, MoqtObjectStatus::kNormal);
- EXPECT_EQ(forwarding_preference, MoqtForwardingPreference::kGroup);
+ EXPECT_EQ(forwarding_preference, MoqtForwardingPreference::kSubgroup);
EXPECT_EQ(object, "object data");
EXPECT_TRUE(end_of_message);
received_object = true;
@@ -232,15 +232,14 @@
server_->session()->set_publisher(&publisher);
for (MoqtForwardingPreference forwarding_preference :
- {MoqtForwardingPreference::kTrack, MoqtForwardingPreference::kGroup,
- MoqtForwardingPreference::kObject,
+ {MoqtForwardingPreference::kTrack, MoqtForwardingPreference::kSubgroup,
MoqtForwardingPreference::kDatagram}) {
SCOPED_TRACE(MoqtForwardingPreferenceToString(forwarding_preference));
MockRemoteTrackVisitor client_visitor;
std::string name =
absl::StrCat("pref_", static_cast<int>(forwarding_preference));
auto queue = std::make_shared<MoqtOutgoingQueue>(
- FullTrackName{"test", name}, MoqtForwardingPreference::kObject);
+ FullTrackName{"test", name}, MoqtForwardingPreference::kSubgroup);
publisher.Add(queue);
queue->AddObject(MemSliceFromString("object 1"), /*key=*/true);
queue->AddObject(MemSliceFromString("object 2"), /*key=*/false);
@@ -294,15 +293,14 @@
server_->session()->set_publisher(&publisher);
for (MoqtForwardingPreference forwarding_preference :
- {MoqtForwardingPreference::kTrack, MoqtForwardingPreference::kGroup,
- MoqtForwardingPreference::kObject,
+ {MoqtForwardingPreference::kTrack, MoqtForwardingPreference::kSubgroup,
MoqtForwardingPreference::kDatagram}) {
SCOPED_TRACE(MoqtForwardingPreferenceToString(forwarding_preference));
MockRemoteTrackVisitor client_visitor;
std::string name =
absl::StrCat("pref_", static_cast<int>(forwarding_preference));
auto queue = std::make_shared<MoqtOutgoingQueue>(
- FullTrackName{"test", name}, MoqtForwardingPreference::kObject);
+ FullTrackName{"test", name}, forwarding_preference);
publisher.Add(queue);
for (int i = 0; i < 100; ++i) {
queue->AddObject(MemSliceFromString("object"), /*key=*/true);
diff --git a/quiche/quic/moqt/moqt_live_relay_queue.cc b/quiche/quic/moqt/moqt_live_relay_queue.cc
index 5fdcf65..6cee208 100644
--- a/quiche/quic/moqt/moqt_live_relay_queue.cc
+++ b/quiche/quic/moqt/moqt_live_relay_queue.cc
@@ -23,6 +23,9 @@
namespace moqt {
+// TODO(martinduke): Accept subgroup ID.
+// TODO(martinduke): Accept publisher priority.
+// TODO(martinduke): Unless Track Forwarding preference goes away, support it.
bool MoqtLiveRelayQueue::AddObject(uint64_t group_id, uint64_t object_id,
MoqtObjectStatus status,
absl::string_view object) {
diff --git a/quiche/quic/moqt/moqt_live_relay_queue.h b/quiche/quic/moqt/moqt_live_relay_queue.h
index 8c3fbe3..ae2a58e 100644
--- a/quiche/quic/moqt/moqt_live_relay_queue.h
+++ b/quiche/quic/moqt/moqt_live_relay_queue.h
@@ -38,7 +38,8 @@
explicit MoqtLiveRelayQueue(FullTrackName track,
MoqtForwardingPreference forwarding_preference)
: track_(std::move(track)),
- forwarding_preference_(forwarding_preference) {}
+ forwarding_preference_(forwarding_preference),
+ next_sequence_(0, 0) {}
MoqtLiveRelayQueue(const MoqtLiveRelayQueue&) = delete;
MoqtLiveRelayQueue(MoqtLiveRelayQueue&&) = default;
diff --git a/quiche/quic/moqt/moqt_live_relay_queue_test.cc b/quiche/quic/moqt/moqt_live_relay_queue_test.cc
index 6d4e765..3734e48 100644
--- a/quiche/quic/moqt/moqt_live_relay_queue_test.cc
+++ b/quiche/quic/moqt/moqt_live_relay_queue_test.cc
@@ -24,7 +24,7 @@
public:
TestMoqtLiveRelayQueue()
: MoqtLiveRelayQueue(FullTrackName{"test", "track"},
- MoqtForwardingPreference::kGroup) {
+ MoqtForwardingPreference::kSubgroup) {
AddObjectListener(this);
}
diff --git a/quiche/quic/moqt/moqt_messages.cc b/quiche/quic/moqt/moqt_messages.cc
index 5c2e503..95d8413 100644
--- a/quiche/quic/moqt/moqt_messages.cc
+++ b/quiche/quic/moqt/moqt_messages.cc
@@ -4,6 +4,7 @@
#include "quiche/quic/moqt/moqt_messages.h"
+#include <cstdint>
#include <string>
#include <vector>
@@ -19,7 +20,8 @@
namespace moqt {
MoqtObjectStatus IntegerToObjectStatus(uint64_t integer) {
- if (integer >= 0x5) {
+ if (integer >=
+ static_cast<uint64_t>(MoqtObjectStatus::kInvalidObjectStatus)) {
return MoqtObjectStatus::kInvalidObjectStatus;
}
return static_cast<MoqtObjectStatus>(integer);
@@ -113,14 +115,12 @@
std::string MoqtDataStreamTypeToString(MoqtDataStreamType type) {
switch (type) {
- case MoqtDataStreamType::kObjectStream:
- return "OBJECT_STREAM";
case MoqtDataStreamType::kObjectDatagram:
return "OBJECT_PREFER_DATAGRAM";
case MoqtDataStreamType::kStreamHeaderTrack:
return "STREAM_HEADER_TRACK";
- case MoqtDataStreamType::kStreamHeaderGroup:
- return "STREAM_HEADER_GROUP";
+ case MoqtDataStreamType::kStreamHeaderSubgroup:
+ return "STREAM_HEADER_SUBGROUP";
case MoqtDataStreamType::kPadding:
return "PADDING";
}
@@ -130,14 +130,12 @@
std::string MoqtForwardingPreferenceToString(
MoqtForwardingPreference preference) {
switch (preference) {
- case MoqtForwardingPreference::kObject:
- return "OBJECT";
case MoqtForwardingPreference::kDatagram:
return "DATAGRAM";
case MoqtForwardingPreference::kTrack:
return "TRACK";
- case MoqtForwardingPreference::kGroup:
- return "GROUP";
+ case MoqtForwardingPreference::kSubgroup:
+ return "SUBGROUP";
}
QUIC_BUG(quic_bug_bad_moqt_message_type_01)
<< "Unknown preference " << std::to_string(static_cast<int>(preference));
@@ -146,37 +144,33 @@
MoqtForwardingPreference GetForwardingPreference(MoqtDataStreamType type) {
switch (type) {
- case MoqtDataStreamType::kObjectStream:
- return MoqtForwardingPreference::kObject;
case MoqtDataStreamType::kObjectDatagram:
return MoqtForwardingPreference::kDatagram;
case MoqtDataStreamType::kStreamHeaderTrack:
return MoqtForwardingPreference::kTrack;
- case MoqtDataStreamType::kStreamHeaderGroup:
- return MoqtForwardingPreference::kGroup;
+ case MoqtDataStreamType::kStreamHeaderSubgroup:
+ return MoqtForwardingPreference::kSubgroup;
default:
break;
}
QUIC_BUG(quic_bug_bad_moqt_message_type_02)
<< "Message type does not indicate forwarding preference";
- return MoqtForwardingPreference::kObject;
+ return MoqtForwardingPreference::kSubgroup;
};
MoqtDataStreamType GetMessageTypeForForwardingPreference(
MoqtForwardingPreference preference) {
switch (preference) {
- case MoqtForwardingPreference::kObject:
- return MoqtDataStreamType::kObjectStream;
case MoqtForwardingPreference::kDatagram:
return MoqtDataStreamType::kObjectDatagram;
case MoqtForwardingPreference::kTrack:
return MoqtDataStreamType::kStreamHeaderTrack;
- case MoqtForwardingPreference::kGroup:
- return MoqtDataStreamType::kStreamHeaderGroup;
+ case MoqtForwardingPreference::kSubgroup:
+ return MoqtDataStreamType::kStreamHeaderSubgroup;
}
QUIC_BUG(quic_bug_bad_moqt_message_type_03)
<< "Forwarding preference does not indicate message type";
- return MoqtDataStreamType::kObjectStream;
+ return MoqtDataStreamType::kStreamHeaderSubgroup;
}
std::string FullTrackName::ToString() const {
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index 4a08832..57c26ab 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -65,10 +65,9 @@
inline constexpr size_t kMaxMessageHeaderSize = 2048;
enum class QUICHE_EXPORT MoqtDataStreamType : uint64_t {
- kObjectStream = 0x00,
kObjectDatagram = 0x01,
- kStreamHeaderTrack = 0x50,
- kStreamHeaderGroup = 0x51,
+ kStreamHeaderTrack = 0x02,
+ kStreamHeaderSubgroup = 0x04,
// Currently QUICHE-specific. All data on a kPadding stream is ignored.
kPadding = 0x26d3,
@@ -218,11 +217,20 @@
// These are absolute sequence numbers.
struct FullSequence {
- uint64_t group = 0;
- uint64_t object = 0;
+ uint64_t group;
+ uint64_t subgroup;
+ uint64_t object;
+ FullSequence() : FullSequence(0, 0) {}
+ // There is a lot of code from before subgroups. Assume there's one subgroup
+ // with ID 0 per group.
+ FullSequence(uint64_t group, uint64_t object)
+ : FullSequence(group, 0, object) {}
+ FullSequence(uint64_t group, uint64_t subgroup, uint64_t object)
+ : group(group), subgroup(subgroup), object(object) {}
bool operator==(const FullSequence& other) const {
return group == other.group && object == other.object;
}
+ // These are temporal ordering comparisons, so subgroup ID doesn't matter.
bool operator<(const FullSequence& other) const {
return group < other.group ||
(group == other.group && object < other.object);
@@ -237,7 +245,9 @@
object = other.object;
return *this;
}
- FullSequence next() const { return FullSequence{group, object + 1}; }
+ FullSequence next() const {
+ return FullSequence{group, subgroup, object + 1};
+ }
template <typename H>
friend H AbslHashValue(H h, const FullSequence& m);
@@ -247,6 +257,11 @@
}
};
+struct SubgroupPriority {
+ uint8_t publisher_priority = 0xf0;
+ uint64_t subgroup_id = 0;
+};
+
template <typename H>
H AbslHashValue(H h, const FullSequence& m) {
return H::combine(std::move(h), m.group, m.object);
@@ -268,11 +283,10 @@
};
// These codes do not appear on the wire.
-enum class QUICHE_EXPORT MoqtForwardingPreference : uint8_t {
- kTrack = 0x0,
- kGroup = 0x1,
- kObject = 0x2,
- kDatagram = 0x3,
+enum class QUICHE_EXPORT MoqtForwardingPreference {
+ kTrack,
+ kSubgroup,
+ kDatagram,
};
enum class QUICHE_EXPORT MoqtObjectStatus : uint64_t {
@@ -281,14 +295,14 @@
kGroupDoesNotExist = 0x2,
kEndOfGroup = 0x3,
kEndOfTrack = 0x4,
- kInvalidObjectStatus = 0x5,
+ kEndOfSubgroup = 0x5,
+ kInvalidObjectStatus = 0x6,
};
MoqtObjectStatus IntegerToObjectStatus(uint64_t integer);
// The data contained in every Object message, although the message type
-// implies some of the values. |payload_length| has no value if the length
-// is unknown (because it runs to the end of the stream.)
+// implies some of the values.
struct QUICHE_EXPORT MoqtObject {
uint64_t subscribe_id;
uint64_t track_alias;
@@ -297,7 +311,8 @@
MoqtPriority publisher_priority;
MoqtObjectStatus object_status;
MoqtForwardingPreference forwarding_preference;
- std::optional<uint64_t> payload_length;
+ std::optional<uint64_t> subgroup_id;
+ uint64_t payload_length;
};
enum class QUICHE_EXPORT MoqtFilterType : uint64_t {
diff --git a/quiche/quic/moqt/moqt_outgoing_queue_test.cc b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
index d7a9785..f7cf5e6 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue_test.cc
+++ b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
@@ -28,7 +28,7 @@
public:
TestMoqtOutgoingQueue()
: MoqtOutgoingQueue(FullTrackName{"test", "track"},
- MoqtForwardingPreference::kGroup) {
+ MoqtForwardingPreference::kSubgroup) {
AddObjectListener(this);
}
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc
index 541c550..51658d4 100644
--- a/quiche/quic/moqt/moqt_parser.cc
+++ b/quiche/quic/moqt/moqt_parser.cc
@@ -52,7 +52,7 @@
bool IsAllowedStreamType(uint64_t value) {
constexpr std::array kAllowedStreamTypes = {
- MoqtDataStreamType::kObjectStream, MoqtDataStreamType::kStreamHeaderGroup,
+ MoqtDataStreamType::kStreamHeaderSubgroup,
MoqtDataStreamType::kStreamHeaderTrack, MoqtDataStreamType::kPadding};
for (MoqtDataStreamType type : kAllowedStreamTypes) {
if (static_cast<uint64_t>(type) == value) {
@@ -72,18 +72,24 @@
!reader.ReadVarInt62(&object.group_id)) {
return 0;
}
- if (type != MoqtDataStreamType::kStreamHeaderTrack &&
- type != MoqtDataStreamType::kStreamHeaderGroup &&
+ if (type == MoqtDataStreamType::kStreamHeaderSubgroup) {
+ uint64_t subgroup_id;
+ if (!reader.ReadVarInt62(&subgroup_id)) {
+ return 0;
+ }
+ object.subgroup_id = subgroup_id;
+ }
+ if (type == MoqtDataStreamType::kObjectDatagram &&
!reader.ReadVarInt62(&object.object_id)) {
return 0;
}
if (!reader.ReadUInt8(&object.publisher_priority)) {
return 0;
}
- uint64_t status = 0;
- if ((type == MoqtDataStreamType::kObjectStream ||
- type == MoqtDataStreamType::kObjectDatagram) &&
- !reader.ReadVarInt62(&status)) {
+ uint64_t status = static_cast<uint64_t>(MoqtObjectStatus::kNormal);
+ if (type == MoqtDataStreamType::kObjectDatagram &&
+ (!reader.ReadVarInt62(&object.payload_length) ||
+ (object.payload_length == 0 && !reader.ReadVarInt62(&status)))) {
return 0;
}
object.object_status = IntegerToObjectStatus(status);
@@ -100,15 +106,13 @@
}
[[fallthrough]];
- case MoqtDataStreamType::kStreamHeaderGroup: {
- uint64_t length;
+ case MoqtDataStreamType::kStreamHeaderSubgroup: {
if (!reader.ReadVarInt62(&object.object_id) ||
- !reader.ReadVarInt62(&length)) {
+ !reader.ReadVarInt62(&object.payload_length)) {
return 0;
}
- object.payload_length = length;
- uint64_t status = 0;
- if (length == 0 && !reader.ReadVarInt62(&status)) {
+ uint64_t status = static_cast<uint64_t>(MoqtObjectStatus::kNormal);
+ if (object.payload_length == 0 && !reader.ReadVarInt62(&status)) {
return 0;
}
object.object_status = IntegerToObjectStatus(status);
@@ -902,11 +906,6 @@
return;
}
- // Annoying path (going away soon): handle kObjectStream receiving a FIN.
- if (data.empty() && fin && type_ == MoqtDataStreamType::kObjectStream) {
- visitor_.OnObjectMessage(*metadata_, "", true);
- }
-
// Sad path: there is already data buffered. Attempt to transfer a small
// chunk from `data` into the buffer, in hope that it will make the contents
// of the buffer parsable without any leftover data. This is a reasonable
@@ -915,8 +914,7 @@
while (!buffered_message_.empty() && !data.empty()) {
absl::string_view chunk = data.substr(0, chunk_size_);
absl::StrAppend(&buffered_message_, chunk);
- absl::string_view unprocessed =
- ProcessDataInner(buffered_message_, fin && data.size() == chunk.size());
+ absl::string_view unprocessed = ProcessDataInner(buffered_message_);
if (unprocessed.size() >= chunk.size()) {
// chunk didn't allow any processing at all.
data.remove_prefix(chunk.size());
@@ -928,7 +926,7 @@
// Happy path: there is no buffered data.
if (buffered_message_.empty() && !data.empty()) {
- buffered_message_.assign(ProcessDataInner(data, fin));
+ buffered_message_.assign(ProcessDataInner(data));
}
if (fin) {
@@ -941,8 +939,7 @@
}
}
-absl::string_view MoqtDataParser::ProcessDataInner(absl::string_view data,
- bool fin) {
+absl::string_view MoqtDataParser::ProcessDataInner(absl::string_view data) {
quic::QuicDataReader reader(data);
while (!reader.IsDoneReading()) {
absl::string_view remainder = reader.PeekRemainingPayload();
@@ -966,11 +963,6 @@
if (bytes_read == 0) {
return remainder;
}
- if (type_ == MoqtDataStreamType::kObjectStream &&
- header.object_status == MoqtObjectStatus::kInvalidObjectStatus) {
- ParseError("Invalid object status");
- return "";
- }
metadata_ = header;
continue;
}
@@ -985,22 +977,14 @@
ParseError("Invalid object status provided");
return "";
}
- payload_length_remaining_ = *metadata_->payload_length;
+ payload_length_remaining_ = metadata_->payload_length;
+ if (payload_length_remaining_ == 0) {
+ visitor_.OnObjectMessage(*metadata_, "", true);
+ }
continue;
}
case kData: {
- if (payload_length_remaining_ == 0) {
- // Special case: kObject, which does not have explicit length.
- if (metadata_->object_status != MoqtObjectStatus::kNormal) {
- ParseError("Object with non-normal status has payload");
- return "";
- }
- visitor_.OnObjectMessage(*metadata_, reader.PeekRemainingPayload(),
- fin);
- return "";
- }
-
absl::string_view payload =
reader.ReadAtMost(payload_length_remaining_);
visitor_.OnObjectMessage(*metadata_, payload,
diff --git a/quiche/quic/moqt/moqt_parser.h b/quiche/quic/moqt/moqt_parser.h
index 918573a..876b27a 100644
--- a/quiche/quic/moqt/moqt_parser.h
+++ b/quiche/quic/moqt/moqt_parser.h
@@ -206,8 +206,7 @@
if (!metadata_.has_value()) {
return kHeader;
}
- if (payload_length_remaining_ > 0 ||
- *type_ == MoqtDataStreamType::kObjectStream) {
+ if (payload_length_remaining_ > 0) {
return kData;
}
return kSubheader;
@@ -215,8 +214,7 @@
// Processes all that can be entirely processed, and returns the view for the
// data that needs to be buffered.
- // TODO: remove the `fin` argument once kObjectStream is gone.
- absl::string_view ProcessDataInner(absl::string_view data, bool fin);
+ absl::string_view ProcessDataInner(absl::string_view data);
void ParseError(absl::string_view reason);
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc
index 633f83f..6c34083 100644
--- a/quiche/quic/moqt/moqt_parser_test.cc
+++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -41,9 +41,9 @@
MoqtMessageType::kServerSetup, MoqtMessageType::kGoAway,
MoqtMessageType::kMaxSubscribeId, MoqtMessageType::kObjectAck,
};
-constexpr std::array kDataStreamTypes{MoqtDataStreamType::kObjectStream,
- MoqtDataStreamType::kStreamHeaderTrack,
- MoqtDataStreamType::kStreamHeaderGroup};
+constexpr std::array kDataStreamTypes{
+ MoqtDataStreamType::kStreamHeaderTrack,
+ MoqtDataStreamType::kStreamHeaderSubgroup};
using GeneralizedMessageType =
absl::variant<MoqtMessageType, MoqtDataStreamType>;
@@ -325,10 +325,6 @@
}
TEST_P(MoqtParserTest, EarlyFin) {
- if (message_type_ ==
- GeneralizedMessageType(MoqtDataStreamType::kObjectStream)) {
- return;
- }
std::unique_ptr<TestMessageBase> message = MakeMessage();
size_t first_data_size = message->total_message_size() - 1;
ProcessData(message->PacketSample().substr(0, first_data_size), true);
@@ -339,10 +335,6 @@
}
TEST_P(MoqtParserTest, SeparateEarlyFin) {
- if (message_type_ ==
- GeneralizedMessageType(MoqtDataStreamType::kObjectStream)) {
- return;
- }
std::unique_ptr<TestMessageBase> message = MakeMessage();
size_t first_data_size = message->total_message_size() - 1;
ProcessData(message->PacketSample().substr(0, first_data_size), false);
@@ -366,29 +358,12 @@
static constexpr bool kRawQuic = false;
};
-TEST_F(MoqtMessageSpecificTest, ObjectStreamSeparateFin) {
- // OBJECT can return on an unknown-length message even without receiving a
- // FIN.
- MoqtDataParser parser(&visitor_);
- auto message = std::make_unique<ObjectStreamMessage>();
- parser.ProcessData(message->PacketSample(), false);
- EXPECT_EQ(visitor_.messages_received_, 0);
- EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_));
- EXPECT_EQ(visitor_.object_payload(), "foo");
- EXPECT_FALSE(visitor_.end_of_message_);
-
- parser.ProcessData(absl::string_view(), true); // send the FIN
- EXPECT_EQ(visitor_.messages_received_, 1);
- EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_));
- EXPECT_TRUE(visitor_.end_of_message_);
- EXPECT_FALSE(visitor_.parsing_error_.has_value());
-}
-
// Send the header + some payload, pure payload, then pure payload to end the
// message.
TEST_F(MoqtMessageSpecificTest, ThreePartObject) {
MoqtDataParser parser(&visitor_);
- auto message = std::make_unique<ObjectStreamMessage>();
+ auto message = std::make_unique<StreamHeaderSubgroupMessage>();
+ EXPECT_TRUE(message->SetPayloadLength(14));
parser.ProcessData(message->PacketSample(), false);
EXPECT_EQ(visitor_.messages_received_, 0);
EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_));
@@ -414,23 +389,24 @@
// Send the part of header, rest of header + payload, plus payload.
TEST_F(MoqtMessageSpecificTest, ThreePartObjectFirstIncomplete) {
MoqtDataParser parser(&visitor_);
- auto message = std::make_unique<ObjectStreamMessage>();
+ auto message = std::make_unique<StreamHeaderSubgroupMessage>();
+ EXPECT_TRUE(message->SetPayloadLength(50));
// first part
parser.ProcessData(message->PacketSample().substr(0, 4), false);
EXPECT_EQ(visitor_.messages_received_, 0);
// second part. Add padding to it.
- message->set_wire_image_size(100);
+ message->set_wire_image_size(55);
parser.ProcessData(
message->PacketSample().substr(4, message->total_message_size() - 4),
false);
EXPECT_EQ(visitor_.messages_received_, 0);
EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_));
EXPECT_FALSE(visitor_.end_of_message_);
- // The value "93" is the overall wire image size of 100 minus the non-payload
+ // The value "47" is the overall wire image size of 55 minus the non-payload
// part of the message.
- EXPECT_EQ(visitor_.object_payload().length(), 93);
+ EXPECT_EQ(visitor_.object_payload().length(), 47);
// third part includes FIN
parser.ProcessData("bar", true);
@@ -441,10 +417,10 @@
EXPECT_FALSE(visitor_.parsing_error_.has_value());
}
-TEST_F(MoqtMessageSpecificTest, StreamHeaderGroupFollowOn) {
+TEST_F(MoqtMessageSpecificTest, StreamHeaderSubgroupFollowOn) {
MoqtDataParser parser(&visitor_);
// first part
- auto message1 = std::make_unique<StreamHeaderGroupMessage>();
+ auto message1 = std::make_unique<StreamHeaderSubgroupMessage>();
parser.ProcessData(message1->PacketSample(), false);
EXPECT_EQ(visitor_.messages_received_, 1);
EXPECT_TRUE(message1->EqualFieldValues(*visitor_.last_message_));
@@ -453,7 +429,7 @@
EXPECT_FALSE(visitor_.parsing_error_.has_value());
// second part
visitor_.object_payloads_.clear();
- auto message2 = std::make_unique<StreamMiddlerGroupMessage>();
+ auto message2 = std::make_unique<StreamMiddlerSubgroupMessage>();
parser.ProcessData(message2->PacketSample(), false);
EXPECT_EQ(visitor_.messages_received_, 2);
EXPECT_TRUE(message2->EqualFieldValues(*visitor_.last_message_));
@@ -832,7 +808,7 @@
TEST_F(MoqtMessageSpecificTest, FinMidPayload) {
MoqtDataParser parser(&visitor_);
- auto message = std::make_unique<StreamHeaderGroupMessage>();
+ auto message = std::make_unique<StreamHeaderSubgroupMessage>();
parser.ProcessData(
message->PacketSample().substr(0, message->total_message_size() - 1),
true);
@@ -863,28 +839,18 @@
EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
}
-TEST_F(MoqtMessageSpecificTest, NonNormalObjectHasPayload) {
- MoqtDataParser parser(&visitor_);
- char object_stream[] = {
- 0x00, 0x03, 0x04, 0x05, 0x06, 0x07, 0x02, // varints
- 0x66, 0x6f, 0x6f, // payload = "foo"
- };
- parser.ProcessData(absl::string_view(object_stream, sizeof(object_stream)),
- false);
- EXPECT_EQ(visitor_.parsing_error_,
- "Object with non-normal status has payload");
- EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
-}
-
TEST_F(MoqtMessageSpecificTest, InvalidObjectStatus) {
MoqtDataParser parser(&visitor_);
- char object_stream[] = {
- 0x00, 0x03, 0x04, 0x05, 0x06, 0x07, 0x06, // varints
- 0x66, 0x6f, 0x6f, // payload = "foo"
+ char stream_header_subgroup[] = {
+ 0x04, // type field
+ 0x03, 0x04, 0x05, 0x08, // varints
+ 0x07, // publisher priority
+ 0x06, 0x00, 0x0f, // object middler; status = 0x0f
};
- parser.ProcessData(absl::string_view(object_stream, sizeof(object_stream)),
- false);
- EXPECT_EQ(visitor_.parsing_error_, "Invalid object status");
+ parser.ProcessData(
+ absl::string_view(stream_header_subgroup, sizeof(stream_header_subgroup)),
+ false);
+ EXPECT_EQ(visitor_.parsing_error_, "Invalid object status provided");
EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
}
@@ -1232,7 +1198,7 @@
}
TEST_F(MoqtMessageSpecificTest, WrongMessageInDatagram) {
- ObjectStreamMessage message;
+ StreamHeaderSubgroupMessage message;
MoqtObject object;
absl::string_view payload = ParseDatagram(message.PacketSample(), object);
EXPECT_TRUE(payload.empty());
diff --git a/quiche/quic/moqt/moqt_priority.cc b/quiche/quic/moqt/moqt_priority.cc
index 73e0171..22a8f7a 100644
--- a/quiche/quic/moqt/moqt_priority.cc
+++ b/quiche/quic/moqt/moqt_priority.cc
@@ -51,17 +51,17 @@
webtransport::SendOrder SendOrderForStream(MoqtPriority subscriber_priority,
MoqtPriority publisher_priority,
uint64_t group_id,
- uint64_t object_id,
+ uint64_t subgroup_id,
MoqtDeliveryOrder delivery_order) {
const int64_t track_bits = (Flip<8>(subscriber_priority) << 54) |
(Flip<8>(publisher_priority) << 46);
group_id = OnlyLowestNBits<26>(group_id);
- object_id = OnlyLowestNBits<20>(object_id);
+ subgroup_id = OnlyLowestNBits<20>(subgroup_id);
if (delivery_order == MoqtDeliveryOrder::kAscending) {
group_id = Flip<26>(group_id);
}
- object_id = Flip<20>(object_id); // Object ID is always ascending.
- return track_bits | (group_id << 20) | object_id;
+ subgroup_id = Flip<20>(subgroup_id);
+ return track_bits | (group_id << 20) | subgroup_id;
}
webtransport::SendOrder UpdateSendOrderForSubscriberPriority(
diff --git a/quiche/quic/moqt/moqt_priority.h b/quiche/quic/moqt/moqt_priority.h
index 0dfd490..15f8a55 100644
--- a/quiche/quic/moqt/moqt_priority.h
+++ b/quiche/quic/moqt/moqt_priority.h
@@ -30,7 +30,7 @@
uint64_t group_id, MoqtDeliveryOrder delivery_order);
QUICHE_EXPORT webtransport::SendOrder SendOrderForStream(
MoqtPriority subscriber_priority, MoqtPriority publisher_priority,
- uint64_t group_id, uint64_t object_id, MoqtDeliveryOrder delivery_order);
+ uint64_t group_id, uint64_t subgroup_id, MoqtDeliveryOrder delivery_order);
// Returns |send_order| updated with the new |subscriber_priority|.
QUICHE_EXPORT webtransport::SendOrder UpdateSendOrderForSubscriberPriority(
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 723162c..656b8d0 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -191,10 +191,6 @@
void MoqtSession::OnDatagramReceived(absl::string_view datagram) {
MoqtObject message;
absl::string_view payload = ParseDatagram(datagram, message);
- if (payload.empty()) {
- Error(MoqtError::kProtocolViolation, "Malformed datagram");
- return;
- }
QUICHE_DLOG(INFO) << ENDPOINT
<< "Received OBJECT message in datagram for subscribe_id "
<< message.subscribe_id << " for track alias "
@@ -892,23 +888,23 @@
void MoqtSession::IncomingDataStream::OnObjectMessage(const MoqtObject& message,
absl::string_view payload,
bool end_of_message) {
- QUICHE_DVLOG(1)
- << ENDPOINT << "Received OBJECT message on stream "
- << stream_->GetStreamId() << " for subscribe_id " << message.subscribe_id
- << " for track alias " << message.track_alias << " with sequence "
- << message.group_id << ":" << message.object_id << " priority "
- << message.publisher_priority << " forwarding_preference "
- << MoqtForwardingPreferenceToString(message.forwarding_preference)
- << " length " << payload.size() << " explicit length "
- << (message.payload_length.has_value() ? (int)*message.payload_length
- : -1)
- << (end_of_message ? "F" : "");
+ QUICHE_DVLOG(1) << ENDPOINT << "Received OBJECT message on stream "
+ << stream_->GetStreamId() << " for subscribe_id "
+ << message.subscribe_id << " for track alias "
+ << message.track_alias << " with sequence "
+ << message.group_id << ":" << message.object_id
+ << " priority " << message.publisher_priority
+ << " forwarding_preference "
+ << MoqtForwardingPreferenceToString(
+ message.forwarding_preference)
+ << " length " << payload.size() << " length "
+ << message.payload_length << (end_of_message ? "F" : "");
if (!session_->parameters_.deliver_partial_objects) {
if (!end_of_message) { // Buffer partial object.
- if (partial_object_.empty() && message.payload_length.has_value()) {
+ if (partial_object_.empty()) {
// Avoid redundant allocations by reserving the appropriate amount of
// memory if known.
- partial_object_.reserve(*message.payload_length);
+ partial_object_.reserve(message.payload_length);
}
absl::StrAppend(&partial_object_, payload);
return;
@@ -1027,14 +1023,6 @@
if (stream_id.has_value()) {
raw_stream = session_->session_->GetStreamById(*stream_id);
} else {
- if (!window_.IsStreamProvokingObject(sequence, forwarding_preference)) {
- QUIC_DLOG(INFO) << ENDPOINT << "Received object " << sequence
- << ", but there is no stream that it can be mapped to";
- // It is possible that the we are getting notified of objects out of
- // order, but we still have to send objects in a manner consistent with
- // the forwarding preference used.
- return;
- }
raw_stream = session_->OpenOrQueueDataStream(subscription_id_, sequence);
}
if (raw_stream == nullptr) {
@@ -1089,13 +1077,9 @@
return SendOrderForStream(subscriber_priority_, publisher_priority,
/*group_id=*/0, delivery_order);
break;
- case MoqtForwardingPreference::kGroup:
+ case MoqtForwardingPreference::kSubgroup:
return SendOrderForStream(subscriber_priority_, publisher_priority,
- sequence.group, delivery_order);
- break;
- case MoqtForwardingPreference::kObject:
- return SendOrderForStream(subscriber_priority_, publisher_priority,
- sequence.group, sequence.object,
+ sequence.group, sequence.subgroup,
delivery_order);
break;
case MoqtForwardingPreference::kDatagram:
@@ -1270,6 +1254,11 @@
header.publisher_priority = publisher.GetPublisherPriority();
header.object_status = object.status;
header.forwarding_preference = forwarding_preference;
+ // TODO(martinduke): send values other than 0.
+ header.subgroup_id =
+ (forwarding_preference == MoqtForwardingPreference::kSubgroup)
+ ? 0
+ : std::optional<uint64_t>();
header.payload_length = object.payload.length();
quiche::QuicheBuffer serialized_header =
@@ -1288,19 +1277,15 @@
!subscription.InWindow(next_object_);
break;
- case MoqtForwardingPreference::kGroup:
+ case MoqtForwardingPreference::kSubgroup:
++next_object_.object;
fin = object.status == MoqtObjectStatus::kEndOfTrack ||
object.status == MoqtObjectStatus::kEndOfGroup ||
+ object.status == MoqtObjectStatus::kEndOfSubgroup ||
object.status == MoqtObjectStatus::kGroupDoesNotExist ||
!subscription.InWindow(next_object_);
break;
- case MoqtForwardingPreference::kObject:
- QUICHE_DCHECK(!stream_header_written_);
- fin = true;
- break;
-
case MoqtForwardingPreference::kDatagram:
QUICHE_NOTREACHED();
break;
@@ -1347,6 +1332,8 @@
header.publisher_priority = track_publisher_->GetPublisherPriority();
header.object_status = object->status;
header.forwarding_preference = MoqtForwardingPreference::kDatagram;
+ header.subgroup_id = std::nullopt;
+ header.payload_length = object->payload.length();
quiche::QuicheBuffer datagram = session_->framer_.SerializeObjectDatagram(
header, object->payload.AsStringView());
session_->session_->SendOrQueueDatagram(datagram.AsStringView());
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index 994e85a..106c172 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -717,7 +717,8 @@
/*object_sequence=*/0,
/*publisher_priority=*/0,
/*object_status=*/MoqtObjectStatus::kNormal,
- /*forwarding_preference=*/MoqtForwardingPreference::kGroup,
+ /*forwarding_preference=*/MoqtForwardingPreference::kSubgroup,
+ /*subgroup_id=*/0,
/*payload_length=*/8,
};
webtransport::test::MockStream mock_stream;
@@ -742,7 +743,8 @@
/*object_sequence=*/0,
/*publisher_priority=*/0,
/*object_status=*/MoqtObjectStatus::kNormal,
- /*forwarding_preference=*/MoqtForwardingPreference::kGroup,
+ /*forwarding_preference=*/MoqtForwardingPreference::kSubgroup,
+ /*subgroup_id=*/0,
/*payload_length=*/16,
};
webtransport::test::MockStream mock_stream;
@@ -772,7 +774,8 @@
/*object_sequence=*/0,
/*publisher_priority=*/0,
/*object_status=*/MoqtObjectStatus::kNormal,
- /*forwarding_preference=*/MoqtForwardingPreference::kGroup,
+ /*forwarding_preference=*/MoqtForwardingPreference::kSubgroup,
+ /*subgroup_id=*/0,
/*payload_length=*/16,
};
webtransport::test::MockStream mock_stream;
@@ -809,7 +812,8 @@
/*object_sequence=*/0,
/*publisher_priority=*/0,
/*object_status=*/MoqtObjectStatus::kNormal,
- /*forwarding_preference=*/MoqtForwardingPreference::kGroup,
+ /*forwarding_preference=*/MoqtForwardingPreference::kSubgroup,
+ /*subgroup_id=*/0,
/*payload_length=*/8,
};
webtransport::test::MockStream mock_stream;
@@ -867,7 +871,8 @@
/*object_sequence=*/0,
/*publisher_priority=*/0,
/*object_status=*/MoqtObjectStatus::kNormal,
- /*forwarding_preference=*/MoqtForwardingPreference::kGroup,
+ /*forwarding_preference=*/MoqtForwardingPreference::kSubgroup,
+ /*subgroup_id=*/0,
/*payload_length=*/8,
};
webtransport::test::MockStream mock_stream;
@@ -928,7 +933,8 @@
/*object_sequence=*/0,
/*publisher_priority=*/0,
/*object_status=*/MoqtObjectStatus::kNormal,
- /*forwarding_preference=*/MoqtForwardingPreference::kGroup,
+ /*forwarding_preference=*/MoqtForwardingPreference::kSubgroup,
+ /*subgroup_id=*/0,
/*payload_length=*/8,
};
webtransport::test::MockStream mock_stream;
@@ -948,7 +954,7 @@
EXPECT_CALL(mock_stream, GetStreamId())
.WillRepeatedly(Return(kIncomingUniStreamId));
object_stream->OnObjectMessage(object, payload, true);
- object.forwarding_preference = MoqtForwardingPreference::kObject;
+ object.forwarding_preference = MoqtForwardingPreference::kTrack;
++object.object_id;
EXPECT_CALL(mock_session_,
CloseSession(static_cast<uint64_t>(MoqtError::kProtocolViolation),
@@ -980,7 +986,8 @@
/*object_sequence=*/0,
/*publisher_priority=*/0,
/*object_status=*/MoqtObjectStatus::kNormal,
- /*forwarding_preference=*/MoqtForwardingPreference::kGroup,
+ /*forwarding_preference=*/MoqtForwardingPreference::kSubgroup,
+ /*subgroup_id=*/0,
/*payload_length=*/8,
};
webtransport::test::MockStream mock_stream;
@@ -1004,7 +1011,7 @@
MoqtSessionPeer::CreateRemoteTrack(&session_, ftn, &visitor, 2);
// The track already exists, and has a different forwarding preference.
MoqtSessionPeer::remote_track(&session_, 2)
- .CheckForwardingPreference(MoqtForwardingPreference::kObject);
+ .CheckForwardingPreference(MoqtForwardingPreference::kTrack);
// SUBSCRIBE_OK arrives
MoqtSubscribeOk ok = {
@@ -1025,7 +1032,7 @@
TEST_F(MoqtSessionTest, CreateIncomingDataStreamAndSend) {
FullTrackName ftn("foo", "bar");
- auto track = SetupPublisher(ftn, MoqtForwardingPreference::kObject,
+ auto track = SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup,
FullSequence(4, 2));
MoqtObjectListener* subscription =
MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
@@ -1052,7 +1059,7 @@
// Verify first six message fields are sent correctly
bool correct_message = false;
- const std::string kExpectedMessage = {0x00, 0x00, 0x02, 0x05, 0x00, 0x00};
+ const std::string kExpectedMessage = {0x04, 0x00, 0x02, 0x05, 0x00, 0x00};
EXPECT_CALL(mock_stream, Writev(_, _))
.WillOnce([&](absl::Span<const absl::string_view> data,
const quiche::StreamWriteOptions& options) {
@@ -1075,8 +1082,8 @@
TEST_F(MoqtSessionTest, UnidirectionalStreamCannotBeOpened) {
FullTrackName ftn("foo", "bar");
- auto track =
- SetupPublisher(ftn, MoqtForwardingPreference::kGroup, FullSequence(4, 2));
+ auto track = SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup,
+ FullSequence(4, 2));
MoqtObjectListener* subscription =
MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
@@ -1118,8 +1125,8 @@
TEST_F(MoqtSessionTest, OutgoingStreamDisappears) {
FullTrackName ftn("foo", "bar");
- auto track =
- SetupPublisher(ftn, MoqtForwardingPreference::kGroup, FullSequence(4, 2));
+ auto track = SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup,
+ FullSequence(4, 2));
MoqtObjectListener* subscription =
MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
@@ -1277,7 +1284,7 @@
// Publish in window.
bool correct_message = false;
uint8_t kExpectedMessage[] = {
- 0x01, 0x00, 0x02, 0x05, 0x00, 0x00, 0x00, 0x64,
+ 0x01, 0x00, 0x02, 0x05, 0x00, 0x00, 0x08, 0x64,
0x65, 0x61, 0x64, 0x62, 0x65, 0x65, 0x66,
};
EXPECT_CALL(mock_session_, SendOrQueueDatagram(_))
@@ -1311,9 +1318,10 @@
/*publisher_priority=*/0,
/*object_status=*/MoqtObjectStatus::kNormal,
/*forwarding_preference=*/MoqtForwardingPreference::kDatagram,
+ /*subgroup_id=*/std::nullopt,
/*payload_length=*/8,
};
- char datagram[] = {0x01, 0x01, 0x02, 0x00, 0x00, 0x00, 0x00, 0x64,
+ char datagram[] = {0x01, 0x01, 0x02, 0x00, 0x00, 0x00, 0x08, 0x64,
0x65, 0x61, 0x64, 0x62, 0x65, 0x65, 0x66};
EXPECT_CALL(visitor_,
OnObjectFragment(ftn, object.group_id, object.object_id,
@@ -1335,7 +1343,8 @@
/*object_sequence=*/0,
/*publisher_priority=*/0,
/*object_status=*/MoqtObjectStatus::kNormal,
- /*forwarding_preference=*/MoqtForwardingPreference::kGroup,
+ /*forwarding_preference=*/MoqtForwardingPreference::kSubgroup,
+ /*subgroup_id=*/0,
/*payload_length=*/8,
};
webtransport::test::MockStream mock_stream;
@@ -1410,8 +1419,8 @@
TEST_F(MoqtSessionTest, QueuedStreamsOpenedInOrder) {
FullTrackName ftn("foo", "bar");
- auto track =
- SetupPublisher(ftn, MoqtForwardingPreference::kGroup, FullSequence(0, 0));
+ auto track = SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup,
+ FullSequence(0, 0));
EXPECT_CALL(*track, GetTrackStatus())
.WillRepeatedly(Return(MoqtTrackStatusCode::kNotYetBegun));
MoqtObjectListener* subscription =
@@ -1482,22 +1491,22 @@
EXPECT_CALL(mock_stream0, Writev(_, _))
.WillOnce([&](absl::Span<const absl::string_view> data,
const quiche::StreamWriteOptions& options) {
- // The Group ID is the 5th byte of the stream.
+ // The Group ID is the 4th byte of the stream.
EXPECT_EQ(static_cast<const uint8_t>(data[0][4]), 0);
return absl::OkStatus();
});
EXPECT_CALL(mock_stream1, Writev(_, _))
.WillOnce([&](absl::Span<const absl::string_view> data,
const quiche::StreamWriteOptions& options) {
- // The Group ID is the 5th byte of the stream.
- EXPECT_EQ(static_cast<const uint8_t>(data[0][4]), 1);
+ // The Group ID is the 4th byte of the stream.
+ EXPECT_EQ(static_cast<const uint8_t>(data[0][3]), 1);
return absl::OkStatus();
});
EXPECT_CALL(mock_stream2, Writev(_, _))
.WillOnce([&](absl::Span<const absl::string_view> data,
const quiche::StreamWriteOptions& options) {
- // The Group ID is the 5th byte of the stream.
- EXPECT_EQ(static_cast<const uint8_t>(data[0][4]), 2);
+ // The Group ID is the 4th byte of the stream.
+ EXPECT_EQ(static_cast<const uint8_t>(data[0][3]), 2);
return absl::OkStatus();
});
session_.OnCanCreateNewOutgoingUnidirectionalStream();
@@ -1505,8 +1514,8 @@
TEST_F(MoqtSessionTest, StreamQueuedForSubscriptionThatDoesntExist) {
FullTrackName ftn("foo", "bar");
- auto track =
- SetupPublisher(ftn, MoqtForwardingPreference::kGroup, FullSequence(0, 0));
+ auto track = SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup,
+ FullSequence(0, 0));
EXPECT_CALL(*track, GetTrackStatus())
.WillRepeatedly(Return(MoqtTrackStatusCode::kNotYetBegun));
MoqtObjectListener* subscription =
@@ -1527,8 +1536,8 @@
TEST_F(MoqtSessionTest, QueuedStreamPriorityChanged) {
FullTrackName ftn("foo", "bar");
- auto track =
- SetupPublisher(ftn, MoqtForwardingPreference::kGroup, FullSequence(0, 0));
+ auto track = SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup,
+ FullSequence(0, 0));
EXPECT_CALL(*track, GetTrackStatus())
.WillRepeatedly(Return(MoqtTrackStatusCode::kNotYetBegun));
// Create two identical subscriptions with different priorities.
@@ -1579,9 +1588,9 @@
.WillOnce([&](absl::Span<const absl::string_view> data,
const quiche::StreamWriteOptions& options) {
// Check subscribe ID is 0.
- EXPECT_EQ(static_cast<const uint8_t>(data[0][2]), 0);
+ EXPECT_EQ(static_cast<const uint8_t>(data[0][1]), 0);
// Check Group ID is 0
- EXPECT_EQ(static_cast<const uint8_t>(data[0][4]), 0);
+ EXPECT_EQ(static_cast<const uint8_t>(data[0][3]), 0);
return absl::OkStatus();
});
session_.OnCanCreateNewOutgoingUnidirectionalStream();
@@ -1615,9 +1624,9 @@
.WillOnce([&](absl::Span<const absl::string_view> data,
const quiche::StreamWriteOptions& options) {
// Check subscribe ID is 0.
- EXPECT_EQ(static_cast<const uint8_t>(data[0][2]), 1);
+ EXPECT_EQ(static_cast<const uint8_t>(data[0][1]), 1);
// Check Group ID is 0
- EXPECT_EQ(static_cast<const uint8_t>(data[0][4]), 0);
+ EXPECT_EQ(static_cast<const uint8_t>(data[0][3]), 0);
return absl::OkStatus();
});
session_.OnCanCreateNewOutgoingUnidirectionalStream();
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.cc b/quiche/quic/moqt/moqt_subscribe_windows.cc
index 0c4a637..4637bd1 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows.cc
+++ b/quiche/quic/moqt/moqt_subscribe_windows.cc
@@ -66,38 +66,18 @@
return true;
}
-bool SubscribeWindow::IsStreamProvokingObject(
- FullSequence sequence, MoqtForwardingPreference preference) const {
- if (sequence == start_) {
- return true;
- }
- switch (preference) {
- case MoqtForwardingPreference::kTrack:
- return false;
- case MoqtForwardingPreference::kGroup:
- // Note: this assumes that the group starts with object 0.
- return sequence.object == 0;
- case MoqtForwardingPreference::kObject:
- return true;
- case MoqtForwardingPreference::kDatagram:
- QUICHE_DCHECK(false);
- return true;
- }
-}
-
ReducedSequenceIndex::ReducedSequenceIndex(
FullSequence sequence, MoqtForwardingPreference preference) {
switch (preference) {
case MoqtForwardingPreference::kTrack:
sequence_ = FullSequence(0, 0);
break;
- case MoqtForwardingPreference::kGroup:
+ case MoqtForwardingPreference::kSubgroup:
sequence_ = FullSequence(sequence.group, 0);
break;
- case MoqtForwardingPreference::kObject:
case MoqtForwardingPreference::kDatagram:
sequence_ = sequence;
- break;
+ return;
}
}
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.h b/quiche/quic/moqt/moqt_subscribe_windows.h
index 5e0307d..b850f3e 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows.h
+++ b/quiche/quic/moqt/moqt_subscribe_windows.h
@@ -43,11 +43,6 @@
// MoQT, subscription windows are only allowed to shrink, not to expand).
bool UpdateStartEnd(FullSequence start, std::optional<FullSequence> end);
- // Returns true if for a given forwarding preference, the specified sequence
- // number might be the first object on a stream.
- bool IsStreamProvokingObject(FullSequence sequence,
- MoqtForwardingPreference preference) const;
-
private:
FullSequence start_;
std::optional<FullSequence> end_;
diff --git a/quiche/quic/moqt/moqt_subscribe_windows_test.cc b/quiche/quic/moqt/moqt_subscribe_windows_test.cc
index 49213ff..ba8687e 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows_test.cc
+++ b/quiche/quic/moqt/moqt_subscribe_windows_test.cc
@@ -44,8 +44,8 @@
EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(4, 0)), std::nullopt);
}
-TEST_F(SubscribeWindowTest, AddQueryRemoveStreamIdGroup) {
- SendStreamMap stream_map(MoqtForwardingPreference::kGroup);
+TEST_F(SubscribeWindowTest, AddQueryRemoveStreamIdSubgroup) {
+ SendStreamMap stream_map(MoqtForwardingPreference::kSubgroup);
stream_map.AddStream(FullSequence{4, 0}, 2);
EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(5, 0)), std::nullopt);
stream_map.AddStream(FullSequence{5, 2}, 6);
@@ -57,21 +57,6 @@
EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(5, 2)), std::nullopt);
}
-TEST_F(SubscribeWindowTest, AddQueryRemoveStreamIdObject) {
- SendStreamMap stream_map(MoqtForwardingPreference::kObject);
- stream_map.AddStream(FullSequence{4, 0}, 2);
- stream_map.AddStream(FullSequence{4, 1}, 6);
- stream_map.AddStream(FullSequence{4, 2}, 10);
- EXPECT_QUIC_BUG(stream_map.AddStream(FullSequence{4, 2}, 14),
- "Stream already added");
- EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(4, 0)), 2);
- EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(4, 2)), 10);
- EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(4, 4)), std::nullopt);
- EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(5, 0)), std::nullopt);
- stream_map.RemoveStream(FullSequence(4, 2), 10);
- EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(4, 2)), std::nullopt);
-}
-
TEST_F(SubscribeWindowTest, AddQueryRemoveStreamIdDatagram) {
SendStreamMap stream_map(MoqtForwardingPreference::kDatagram);
EXPECT_QUIC_BUG(stream_map.AddStream(FullSequence{4, 0}, 2),
diff --git a/quiche/quic/moqt/moqt_track_test.cc b/quiche/quic/moqt/moqt_track_test.cc
index 4b0ff92..cd62dd8 100644
--- a/quiche/quic/moqt/moqt_track_test.cc
+++ b/quiche/quic/moqt/moqt_track_test.cc
@@ -27,9 +27,9 @@
TEST_F(RemoteTrackTest, UpdateForwardingPreference) {
EXPECT_TRUE(
- track_.CheckForwardingPreference(MoqtForwardingPreference::kObject));
+ track_.CheckForwardingPreference(MoqtForwardingPreference::kSubgroup));
EXPECT_TRUE(
- track_.CheckForwardingPreference(MoqtForwardingPreference::kObject));
+ track_.CheckForwardingPreference(MoqtForwardingPreference::kSubgroup));
EXPECT_FALSE(
track_.CheckForwardingPreference(MoqtForwardingPreference::kDatagram));
}
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h
index 7e4fafe..b274194 100644
--- a/quiche/quic/moqt/test_tools/moqt_test_message.h
+++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -139,6 +139,10 @@
QUIC_LOG(INFO) << "OBJECT Object Send Order mismatch";
return false;
}
+ if (cast.subgroup_id != object_.subgroup_id) {
+ QUIC_LOG(INFO) << "OBJECT Subgroup ID mismatch";
+ return false;
+ }
if (cast.payload_length != object_.payload_length) {
QUIC_LOG(INFO) << "OBJECT Payload Length mismatch";
return false;
@@ -159,23 +163,8 @@
/*publisher_priority=*/7,
/*object_status=*/MoqtObjectStatus::kNormal,
/*forwarding_preference=*/MoqtForwardingPreference::kTrack,
- /*payload_length=*/std::nullopt,
- };
-};
-
-class QUICHE_NO_EXPORT ObjectStreamMessage : public ObjectMessage {
- public:
- ObjectStreamMessage() : ObjectMessage() {
- SetWireImage(raw_packet_, sizeof(raw_packet_));
- object_.forwarding_preference = MoqtForwardingPreference::kObject;
- }
-
- void ExpandVarints() override { ExpandVarintsImpl("vvvvv-v---"); }
-
- private:
- uint8_t raw_packet_[10] = {
- 0x00, 0x03, 0x04, 0x05, 0x06, 0x07, 0x00, // varints
- 0x66, 0x6f, 0x6f, // payload = "foo"
+ /*subgroup_id=*/std::nullopt,
+ /*payload_length=*/3,
};
};
@@ -190,8 +179,9 @@
private:
uint8_t raw_packet_[10] = {
- 0x01, 0x03, 0x04, 0x05, 0x06, 0x07, 0x00, // varints
- 0x66, 0x6f, 0x6f, // payload = "foo"
+ 0x01, 0x03, 0x04, 0x05, 0x06, // varints
+ 0x07, // publisher priority
+ 0x03, 0x66, 0x6f, 0x6f, // payload = "foo"
};
};
@@ -205,16 +195,17 @@
object_.payload_length = 3;
}
- void ExpandVarints() override { ExpandVarintsImpl("--vv-vvv"); }
+ void ExpandVarints() override { ExpandVarintsImpl("vvv-vvv"); }
private:
// Some tests check that a FIN sent at the halfway point of a message results
// in an error. Without the unnecessary expanded varint 0x0405, the halfway
// point falls at the end of the Stream Header, which is legal. Expand the
// varint so that the FIN would be illegal.
- uint8_t raw_packet_[11] = {
- 0x40, 0x50, // two byte type field
- 0x03, 0x04, 0x07, // varints
+ uint8_t raw_packet_[10] = {
+ 0x02, // type field
+ 0x03, 0x04, // varints
+ 0x07, // publisher priority
0x05, 0x06, // object middler
0x03, 0x66, 0x6f, 0x6f, // payload = "foo"
};
@@ -226,7 +217,6 @@
StreamMiddlerTrackMessage() : ObjectMessage() {
SetWireImage(raw_packet_, sizeof(raw_packet_));
object_.forwarding_preference = MoqtForwardingPreference::kTrack;
- object_.payload_length = 3;
object_.group_id = 9;
object_.object_id = 10;
}
@@ -240,35 +230,47 @@
};
};
-class QUICHE_NO_EXPORT StreamHeaderGroupMessage : public ObjectMessage {
+class QUICHE_NO_EXPORT StreamHeaderSubgroupMessage : public ObjectMessage {
public:
- StreamHeaderGroupMessage() : ObjectMessage() {
+ StreamHeaderSubgroupMessage() : ObjectMessage() {
SetWireImage(raw_packet_, sizeof(raw_packet_));
- object_.forwarding_preference = MoqtForwardingPreference::kGroup;
- object_.payload_length = 3;
+ object_.forwarding_preference = MoqtForwardingPreference::kSubgroup;
+ object_.subgroup_id = 8;
}
- void ExpandVarints() override { ExpandVarintsImpl("--vvv-vv"); }
+ void ExpandVarints() override { ExpandVarintsImpl("vvvvv-vv"); }
+
+ bool SetPayloadLength(uint8_t payload_length) {
+ if (payload_length > 63) {
+ // This only supports one-byte varints.
+ return false;
+ }
+ object_.payload_length = payload_length;
+ raw_packet_[7] = payload_length;
+ SetWireImage(raw_packet_, sizeof(raw_packet_));
+ return true;
+ }
private:
uint8_t raw_packet_[11] = {
- 0x40, 0x51, // two-byte type field
- 0x03, 0x04, 0x05, 0x07, // varints
+ 0x04, // type field
+ 0x03, 0x04, 0x05, 0x08, // varints
+ 0x07, // publisher priority
0x06, 0x03, 0x66, 0x6f, 0x6f, // object middler; payload = "foo"
};
};
// Used only for tests that process multiple objects on one stream.
-class QUICHE_NO_EXPORT StreamMiddlerGroupMessage : public ObjectMessage {
+class QUICHE_NO_EXPORT StreamMiddlerSubgroupMessage : public ObjectMessage {
public:
- StreamMiddlerGroupMessage() : ObjectMessage() {
+ StreamMiddlerSubgroupMessage() : ObjectMessage() {
SetWireImage(raw_packet_, sizeof(raw_packet_));
- object_.forwarding_preference = MoqtForwardingPreference::kGroup;
- object_.payload_length = 3;
+ object_.forwarding_preference = MoqtForwardingPreference::kSubgroup;
+ object_.subgroup_id = 8;
object_.object_id = 9;
}
- void ExpandVarints() override { ExpandVarintsImpl("vvv"); }
+ void ExpandVarints() override { ExpandVarintsImpl("vv"); }
private:
uint8_t raw_packet_[5] = {
@@ -1187,14 +1189,12 @@
static inline std::unique_ptr<TestMessageBase> CreateTestDataStream(
MoqtDataStreamType type) {
switch (type) {
- case MoqtDataStreamType::kObjectStream:
- return std::make_unique<ObjectStreamMessage>();
case MoqtDataStreamType::kObjectDatagram:
return std::make_unique<ObjectDatagramMessage>();
case MoqtDataStreamType::kStreamHeaderTrack:
return std::make_unique<StreamHeaderTrackMessage>();
- case MoqtDataStreamType::kStreamHeaderGroup:
- return std::make_unique<StreamHeaderGroupMessage>();
+ case MoqtDataStreamType::kStreamHeaderSubgroup:
+ return std::make_unique<StreamHeaderSubgroupMessage>();
case MoqtDataStreamType::kPadding:
return nullptr;
}
diff --git a/quiche/quic/moqt/tools/chat_client.cc b/quiche/quic/moqt/tools/chat_client.cc
index b706469..68208b1 100644
--- a/quiche/quic/moqt/tools/chat_client.cc
+++ b/quiche/quic/moqt/tools/chat_client.cc
@@ -165,7 +165,7 @@
FullTrackName my_track_name =
chat_strings_->GetFullTrackNameFromUsername(username_);
queue_ = std::make_shared<MoqtOutgoingQueue>(
- my_track_name, MoqtForwardingPreference::kObject);
+ my_track_name, MoqtForwardingPreference::kSubgroup);
publisher_.Add(queue_);
session_->set_publisher(&publisher_);
MoqtOutgoingAnnounceCallback announce_callback =
diff --git a/quiche/quic/moqt/tools/chat_server.cc b/quiche/quic/moqt/tools/chat_server.cc
index 29d0842..028786e 100644
--- a/quiche/quic/moqt/tools/chat_server.cc
+++ b/quiche/quic/moqt/tools/chat_server.cc
@@ -60,6 +60,9 @@
}
ChatServer::ChatServerSessionHandler::~ChatServerSessionHandler() {
+ if (!server_->is_running_) {
+ return;
+ }
if (username_.has_value()) {
server_->DeleteUser(*username_);
}
@@ -125,7 +128,7 @@
: server_(std::move(proof_source), std::move(incoming_session_callback_)),
strings_(chat_id),
catalog_(std::make_shared<MoqtOutgoingQueue>(
- strings_.GetCatalogName(), MoqtForwardingPreference::kGroup)),
+ strings_.GetCatalogName(), MoqtForwardingPreference::kSubgroup)),
remote_track_visitor_(this) {
catalog_->AddObject(quiche::QuicheMemSlice(quiche::QuicheBuffer::Copy(
quiche::SimpleBufferAllocator::Get(),
@@ -145,6 +148,7 @@
ChatServer::~ChatServer() {
// Kill all sessions so that the callback doesn't fire when the server is
// destroyed.
+ is_running_ = false;
server_.quic_server().Shutdown();
}
@@ -156,7 +160,7 @@
// Add a local track.
user_queues_[username] = std::make_shared<MoqtLiveRelayQueue>(
strings_.GetFullTrackNameFromUsername(username),
- MoqtForwardingPreference::kObject);
+ MoqtForwardingPreference::kSubgroup);
publisher_.Add(user_queues_[username]);
}
diff --git a/quiche/quic/moqt/tools/chat_server.h b/quiche/quic/moqt/tools/chat_server.h
index a399af9..7bb4f13 100644
--- a/quiche/quic/moqt/tools/chat_server.h
+++ b/quiche/quic/moqt/tools/chat_server.h
@@ -103,6 +103,7 @@
MoqtIncomingSessionCallback incoming_session_callback_ =
[&](absl::string_view path) { return IncomingSessionHandler(path); };
+ bool is_running_ = true;
MoqtServer server_;
std::list<ChatServerSessionHandler> sessions_;
MoqChatStrings strings_;
diff --git a/quiche/quic/moqt/tools/moqt_simulator_bin.cc b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
index e84a37d..d111d16 100644
--- a/quiche/quic/moqt/tools/moqt_simulator_bin.cc
+++ b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
@@ -118,7 +118,7 @@
QuicBandwidth bitrate)
: Actor(simulator, actor_name),
queue_(std::make_shared<MoqtOutgoingQueue>(
- track_name, MoqtForwardingPreference::kGroup)),
+ track_name, MoqtForwardingPreference::kSubgroup)),
keyframe_interval_(keyframe_interval),
time_between_frames_(QuicTimeDelta::FromMicroseconds(1.0e6 / fps)),
i_to_p_ratio_(i_to_p_ratio),