Make MoqtForwardingPreference a track property.
Part of draft-03 update, but not part of the wire image.
Not in production.
PiperOrigin-RevId: 614812113
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index 5ff0f79..2905bd2 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -268,7 +268,8 @@
FullTrackName full_track_name("foo", "bar");
MockLocalTrackVisitor server_visitor;
MockRemoteTrackVisitor client_visitor;
- server_->session()->AddLocalTrack(full_track_name, &server_visitor);
+ server_->session()->AddLocalTrack(
+ full_track_name, MoqtForwardingPreference::kObject, &server_visitor);
std::optional<absl::string_view> expected_reason = std::nullopt;
bool received_ok = false;
EXPECT_CALL(client_visitor, OnReply(full_track_name, expected_reason))
@@ -286,7 +287,8 @@
FullTrackName full_track_name("foo", "bar");
MockLocalTrackVisitor server_visitor;
MockRemoteTrackVisitor client_visitor;
- server_->session()->AddLocalTrack(full_track_name, &server_visitor);
+ server_->session()->AddLocalTrack(
+ full_track_name, MoqtForwardingPreference::kObject, &server_visitor);
std::optional<absl::string_view> expected_reason = std::nullopt;
bool received_ok = false;
EXPECT_CALL(client_visitor, OnReply(full_track_name, expected_reason))
@@ -304,7 +306,8 @@
FullTrackName full_track_name("foo", "bar");
MockLocalTrackVisitor server_visitor;
MockRemoteTrackVisitor client_visitor;
- server_->session()->AddLocalTrack(full_track_name, &server_visitor);
+ server_->session()->AddLocalTrack(
+ full_track_name, MoqtForwardingPreference::kObject, &server_visitor);
std::optional<absl::string_view> expected_reason = std::nullopt;
bool received_ok = false;
EXPECT_CALL(client_visitor, OnReply(full_track_name, expected_reason))
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index 2c20105..f17b6ec 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -125,6 +125,11 @@
friend H AbslHashValue(H h, const FullTrackName& m);
};
+template <typename H>
+H AbslHashValue(H h, const FullTrackName& m) {
+ return H::combine(std::move(h), m.track_namespace, m.track_name);
+}
+
// These are absolute sequence numbers.
struct FullSequence {
uint64_t group = 0;
@@ -150,8 +155,8 @@
};
template <typename H>
-H AbslHashValue(H h, const FullTrackName& m) {
- return H::combine(std::move(h), m.track_namespace, m.track_name);
+H AbslHashValue(H h, const FullSequence& m) {
+ return H::combine(std::move(h), m.group, m.object);
}
struct QUICHE_EXPORT MoqtClientSetup {
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index a8ee431..25a9e74 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -149,8 +149,10 @@
}
void MoqtSession::AddLocalTrack(const FullTrackName& full_track_name,
+ MoqtForwardingPreference forwarding_preference,
LocalTrack::Visitor* visitor) {
- local_tracks_.try_emplace(full_track_name, full_track_name, visitor);
+ local_tracks_.try_emplace(full_track_name, full_track_name,
+ forwarding_preference, visitor);
}
// TODO: Create state that allows ANNOUNCE_OK/ERROR on spurious namespaces to
@@ -306,22 +308,38 @@
{{"", ""}, nullptr});
}
visitor = subscribe_it->second.visitor;
+ // This does not check that early objects have a consistent forwarding
+ // preference.
return std::pair<FullTrackName, RemoteTrack::Visitor*>(
{{subscribe_it->second.message.track_namespace,
subscribe_it->second.message.track_name},
subscribe_it->second.visitor});
}
+ RemoteTrack& track = it->second;
+ if (!track.CheckForwardingPreference(message.forwarding_preference)) {
+ // Incorrect forwarding preference.
+ Error(MoqtError::kProtocolViolation,
+ "Forwarding preference changes mid-track");
+ return std::pair<FullTrackName, RemoteTrack::Visitor*>({{"", ""}, nullptr});
+ }
return std::pair<FullTrackName, RemoteTrack::Visitor*>(
- {{it->second.full_track_name().track_namespace,
- it->second.full_track_name().track_name},
- it->second.visitor()});
+ {{track.full_track_name().track_namespace,
+ track.full_track_name().track_name},
+ track.visitor()});
}
bool MoqtSession::PublishObject(const FullTrackName& full_track_name,
uint64_t group_id, uint64_t object_id,
uint64_t object_send_order,
- MoqtForwardingPreference forwarding_preference,
absl::string_view payload, bool end_of_stream) {
+ auto track_it = local_tracks_.find(full_track_name);
+ if (track_it == local_tracks_.end()) {
+ QUICHE_DLOG(ERROR) << ENDPOINT << "Sending OBJECT for nonexistent track";
+ return false;
+ }
+ LocalTrack& track = track_it->second;
+ MoqtForwardingPreference forwarding_preference =
+ track.forwarding_preference();
if ((forwarding_preference == MoqtForwardingPreference::kObject ||
forwarding_preference == MoqtForwardingPreference::kDatagram) &&
!end_of_stream) {
@@ -330,12 +348,6 @@
"immediately closed";
return false;
}
- auto track_it = local_tracks_.find(full_track_name);
- if (track_it == local_tracks_.end()) {
- QUICHE_DLOG(ERROR) << ENDPOINT << "Sending OBJECT for nonexistent track";
- return false;
- }
- LocalTrack& track = track_it->second;
track.SentSequence(FullSequence(group_id, object_id));
std::vector<SubscribeWindow*> subscriptions =
track.ShouldSend({group_id, object_id});
@@ -365,8 +377,7 @@
}
bool new_stream = false;
std::optional<webtransport::StreamId> stream_id =
- subscription->GetStreamForSequence({group_id, object_id},
- forwarding_preference);
+ subscription->GetStreamForSequence(FullSequence(group_id, object_id));
if (!stream_id.has_value()) {
new_stream = true;
stream_id = OpenUnidirectionalStream();
@@ -377,8 +388,7 @@
continue;
}
if (!end_of_stream) {
- subscription->AddStream(forwarding_preference, *stream_id, group_id,
- object_id);
+ subscription->AddStream(group_id, object_id, *stream_id);
}
}
webtransport::Stream* stream = session_->GetStreamById(*stream_id);
@@ -403,7 +413,7 @@
<< object.group_id << ":" << object.object_id
<< " on stream " << *stream_id;
if (end_of_stream && !new_stream) {
- subscription->RemoveStream(forwarding_preference, group_id, object_id);
+ subscription->RemoveStream(group_id, object_id);
}
}
return (failures == 0);
@@ -595,9 +605,11 @@
// and only then sends the Subscribe OK.
SubscribeWindow window =
end.has_value()
- ? SubscribeWindow(message.subscribe_id, start->group, start->object,
- end->group, end->object)
- : SubscribeWindow(message.subscribe_id, start->group,
+ ? SubscribeWindow(message.subscribe_id,
+ track.forwarding_preference(), start->group,
+ start->object, end->group, end->object)
+ : SubscribeWindow(message.subscribe_id,
+ track.forwarding_preference(), start->group,
start->object);
std::optional<absl::string_view> past_objects_available =
track.visitor()->OnSubscribeForPast(window);
@@ -613,12 +625,11 @@
QUIC_DLOG(INFO) << ENDPOINT << "Created subscription for "
<< message.track_namespace << ":" << message.track_name;
if (!end.has_value()) {
- track.AddWindow(
- SubscribeWindow(message.subscribe_id, start->group, start->object));
+ track.AddWindow(message.subscribe_id, start->group, start->object);
return;
}
- track.AddWindow(SubscribeWindow(message.subscribe_id, start->group,
- start->object, end->group, end->object));
+ track.AddWindow(message.subscribe_id, start->group, start->object, end->group,
+ end->object);
}
void MoqtSession::Stream::OnSubscribeOkMessage(const MoqtSubscribeOk& message) {
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index adf5cb8..2b1ef4b 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -90,6 +90,7 @@
// is nullptr, then incoming SUBSCRIBE for objects in the path will receive
// SUBSCRIBE_OK, but never actually get the objects.
void AddLocalTrack(const FullTrackName& full_track_name,
+ MoqtForwardingPreference forwarding_preference,
LocalTrack::Visitor* visitor);
// Send an ANNOUNCE message for |track_namespace|, and call
// |announce_callback| when the response arrives. Will fail immediately if
@@ -128,7 +129,6 @@
// partial objects.
bool PublishObject(const FullTrackName& full_track_name, uint64_t group_id,
uint64_t object_id, uint64_t object_send_order,
- MoqtForwardingPreference forwarding_preference,
absl::string_view payload, bool end_of_stream);
// TODO: Add an API to FIN the stream for a particular track/group/object.
// TODO: Add an API to send partial objects.
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index eee03bf..9b901f2 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -116,7 +116,7 @@
ASSERT_NE(it, session->local_tracks_.end());
LocalTrack& track = it->second;
track.set_track_alias(track_alias);
- track.AddWindow(SubscribeWindow(subscribe_id, start_group, start_object));
+ track.AddWindow(subscribe_id, start_group, start_object);
session->used_track_aliases_.emplace(track_alias);
}
@@ -300,7 +300,9 @@
// Add the track. Now Subscribe should succeed.
MockLocalTrackVisitor local_track_visitor;
- session_.AddLocalTrack(FullTrackName("foo", "bar"), &local_track_visitor);
+ session_.AddLocalTrack(FullTrackName("foo", "bar"),
+ MoqtForwardingPreference::kObject,
+ &local_track_visitor);
correct_message = true;
EXPECT_CALL(mock_stream, Writev(_, _))
.WillOnce([&](absl::Span<const absl::string_view> data,
@@ -391,7 +393,8 @@
MockLocalTrackVisitor local_track_visitor;
FullTrackName ftn("foo", "bar");
EXPECT_FALSE(session_.HasSubscribers(ftn));
- session_.AddLocalTrack(ftn, &local_track_visitor);
+ session_.AddLocalTrack(ftn, MoqtForwardingPreference::kGroup,
+ &local_track_visitor);
EXPECT_FALSE(session_.HasSubscribers(ftn));
// Peer subscribes.
@@ -425,11 +428,11 @@
TEST_F(MoqtSessionTest, SubscribeForPast) {
MockLocalTrackVisitor local_track_visitor;
FullTrackName ftn("foo", "bar");
- session_.AddLocalTrack(ftn, &local_track_visitor);
+ session_.AddLocalTrack(ftn, MoqtForwardingPreference::kObject,
+ &local_track_visitor);
// Send Sequence (2, 0) so that next_sequence is set correctly.
- session_.PublishObject(ftn, 2, 0, 0, MoqtForwardingPreference::kObject, "foo",
- true);
+ session_.PublishObject(ftn, 2, 0, 0, "foo", true);
// Peer subscribes to (0, 0)
MoqtSubscribe request = {
/*subscribe_id=*/1,
@@ -687,13 +690,13 @@
StrictMock<webtransport::test::MockStream> mock_stream;
FullTrackName ftn("foo", "bar");
MockLocalTrackVisitor track_visitor;
- session_.AddLocalTrack(ftn, &track_visitor);
+ session_.AddLocalTrack(ftn, MoqtForwardingPreference::kObject,
+ &track_visitor);
MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 2, 5, 0);
// No subscription; this is a no-op except to update next_sequence.
EXPECT_CALL(mock_stream, Writev(_, _)).Times(0);
- session_.PublishObject(ftn, 4, 1, 0, MoqtForwardingPreference::kObject,
- "deadbeef", true);
+ session_.PublishObject(ftn, 4, 1, 0, "deadbeef", true);
EXPECT_EQ(MoqtSessionPeer::next_sequence(&session_, ftn), FullSequence(4, 2));
// Publish in window.
@@ -717,8 +720,7 @@
sizeof(kExpectedMessage)));
return absl::OkStatus();
});
- session_.PublishObject(ftn, 5, 0, 0, MoqtForwardingPreference::kObject,
- "deadbeef", true);
+ session_.PublishObject(ftn, 5, 0, 0, "deadbeef", true);
EXPECT_TRUE(correct_message);
}
@@ -730,20 +732,21 @@
StrictMock<webtransport::test::MockStream> mock_stream;
FullTrackName ftn("foo", "bar");
MockLocalTrackVisitor track_visitor;
- session_.AddLocalTrack(ftn, &track_visitor);
+ session_.AddLocalTrack(ftn, MoqtForwardingPreference::kObject,
+ &track_visitor);
MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 2, 5, 0);
;
EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
.WillOnce(Return(false));
- EXPECT_FALSE(session_.PublishObject(
- ftn, 5, 0, 0, MoqtForwardingPreference::kObject, "deadbeef", true));
+ EXPECT_FALSE(session_.PublishObject(ftn, 5, 0, 0, "deadbeef", true));
}
TEST_F(MoqtSessionTest, GetStreamByIdFails) {
StrictMock<webtransport::test::MockStream> mock_stream;
FullTrackName ftn("foo", "bar");
MockLocalTrackVisitor track_visitor;
- session_.AddLocalTrack(ftn, &track_visitor);
+ session_.AddLocalTrack(ftn, MoqtForwardingPreference::kObject,
+ &track_visitor);
MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 2, 5, 0);
EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
.WillOnce(Return(true));
@@ -754,14 +757,14 @@
.WillRepeatedly(Return(kOutgoingUniStreamId));
EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
.WillOnce(Return(nullptr));
- EXPECT_FALSE(session_.PublishObject(
- ftn, 5, 0, 0, MoqtForwardingPreference::kObject, "deadbeef", true));
+ EXPECT_FALSE(session_.PublishObject(ftn, 5, 0, 0, "deadbeef", true));
}
TEST_F(MoqtSessionTest, SubscribeProposesBadTrackAlias) {
MockLocalTrackVisitor local_track_visitor;
FullTrackName ftn("foo", "bar");
- session_.AddLocalTrack(ftn, &local_track_visitor);
+ session_.AddLocalTrack(ftn, MoqtForwardingPreference::kGroup,
+ &local_track_visitor);
MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 2, 5, 0);
// Peer subscribes.
@@ -882,7 +885,7 @@
TEST_F(MoqtSessionTest, ReceiveUnsubscribe) {
FullTrackName ftn("foo", "bar");
MockLocalTrackVisitor visitor;
- session_.AddLocalTrack(ftn, &visitor);
+ session_.AddLocalTrack(ftn, MoqtForwardingPreference::kTrack, &visitor);
MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 1, 3, 4);
EXPECT_TRUE(session_.HasSubscribers(ftn));
StrictMock<webtransport::test::MockStream> mock_stream;
@@ -898,7 +901,8 @@
TEST_F(MoqtSessionTest, SendDatagram) {
FullTrackName ftn("foo", "bar");
MockLocalTrackVisitor track_visitor;
- session_.AddLocalTrack(ftn, &track_visitor);
+ session_.AddLocalTrack(ftn, MoqtForwardingPreference::kDatagram,
+ &track_visitor);
MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 2, 5, 0);
// Publish in window.
@@ -916,8 +920,7 @@
return webtransport::DatagramStatus(
webtransport::DatagramStatusCode::kSuccess, "");
});
- session_.PublishObject(ftn, 5, 0, 0, MoqtForwardingPreference::kDatagram,
- "deadbeef", true);
+ session_.PublishObject(ftn, 5, 0, 0, "deadbeef", true);
EXPECT_TRUE(correct_message);
}
@@ -945,6 +948,37 @@
session_.OnDatagramReceived(absl::string_view(datagram, sizeof(datagram)));
}
+TEST_F(MoqtSessionTest, ForwardingPreferenceMismatch) {
+ MockRemoteTrackVisitor visitor_;
+ FullTrackName ftn("foo", "bar");
+ std::string payload = "deadbeef";
+ MoqtSessionPeer::CreateRemoteTrack(&session_, ftn, &visitor_, 2);
+ MoqtObject object = {
+ /*subscribe_id=*/1,
+ /*track_alias=*/2,
+ /*group_sequence=*/0,
+ /*object_sequence=*/0,
+ /*object_send_order=*/0,
+ /*forwarding_preference=*/MoqtForwardingPreference::kGroup,
+ /*payload_length=*/8,
+ };
+ StrictMock<webtransport::test::MockStream> mock_stream;
+ std::unique_ptr<MoqtParserVisitor> object_stream =
+ MoqtSessionPeer::CreateUniStream(&session_, &mock_stream);
+
+ EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _, _)).Times(1);
+ EXPECT_CALL(mock_stream, GetStreamId())
+ .WillRepeatedly(Return(kIncomingUniStreamId));
+ object_stream->OnObjectMessage(object, payload, true);
+ ++object.object_id;
+ object.forwarding_preference = MoqtForwardingPreference::kTrack;
+ EXPECT_CALL(mock_session_,
+ CloseSession(static_cast<uint64_t>(MoqtError::kProtocolViolation),
+ "Forwarding preference changes mid-track"))
+ .Times(1);
+ object_stream->OnObjectMessage(object, payload, true);
+}
+
// TODO: Cover more error cases in the above
} // namespace test
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.cc b/quiche/quic/moqt/moqt_subscribe_windows.cc
index 9bea570..bcbb40a 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows.cc
+++ b/quiche/quic/moqt/moqt_subscribe_windows.cc
@@ -22,74 +22,49 @@
}
std::optional<webtransport::StreamId> SubscribeWindow::GetStreamForSequence(
- FullSequence sequence,
- MoqtForwardingPreference forwarding_preference) const {
- if (forwarding_preference == MoqtForwardingPreference::kTrack) {
- return track_stream_;
- }
- auto group_it = group_streams_.find(sequence.group);
- if (group_it == group_streams_.end()) {
+ FullSequence sequence) const {
+ FullSequence index = SequenceToIndex(sequence);
+ auto stream_it = send_streams_.find(index);
+ if (stream_it == send_streams_.end()) {
return std::nullopt;
}
- if (forwarding_preference == MoqtForwardingPreference::kGroup) {
- return group_it->second.group_stream;
- }
- auto object_it = group_it->second.object_streams.find(sequence.object);
- if (object_it == group_it->second.object_streams.end()) {
- return std::nullopt;
- }
- return object_it->second;
+ return stream_it->second;
}
-void SubscribeWindow::AddStream(MoqtForwardingPreference forwarding_preference,
- uint64_t group_id, uint64_t object_id,
+void SubscribeWindow::AddStream(uint64_t group_id, uint64_t object_id,
webtransport::StreamId stream_id) {
- if (forwarding_preference == MoqtForwardingPreference::kTrack) {
- QUIC_BUG_IF(quic_bug_moqt_draft_02_01, track_stream_.has_value())
- << "Track stream already assigned";
- track_stream_ = stream_id;
+ if (!InWindow(FullSequence(group_id, object_id))) {
return;
}
- if (forwarding_preference == MoqtForwardingPreference::kGroup) {
- QUIC_BUG_IF(quic_bug_moqt_draft_02_02,
- group_streams_[group_id].group_stream.has_value())
- << "Group stream already assigned";
- group_streams_[group_id].group_stream = stream_id;
+ FullSequence index = SequenceToIndex(FullSequence(group_id, object_id));
+ if (forwarding_preference_ == MoqtForwardingPreference::kDatagram) {
+ QUIC_BUG(quic_bug_moqt_draft_03_01) << "Adding a stream for datagram";
return;
}
- // ObjectStream or ObjectPreferDatagram
- QUIC_BUG_IF(quic_bug_moqt_draft_02_03,
- group_streams_[group_id].object_streams.contains(object_id))
- << "Object stream already assigned";
- group_streams_[group_id].object_streams[object_id] = stream_id;
+ auto stream_it = send_streams_.find(index);
+ if (stream_it != send_streams_.end()) {
+ QUIC_BUG(quic_bug_moqt_draft_03_02) << "Stream already added";
+ return;
+ }
+ send_streams_[index] = stream_id;
}
-void SubscribeWindow::RemoveStream(
- MoqtForwardingPreference forwarding_preference, uint64_t group_id,
- uint64_t object_id) {
- if (forwarding_preference == moqt::MoqtForwardingPreference::kTrack) {
- track_stream_ = std::nullopt;
- return;
- }
- auto group_it = group_streams_.find(group_id);
- if (group_it == group_streams_.end()) {
- return;
- }
- GroupStreams& group = group_it->second;
- if (forwarding_preference == moqt::MoqtForwardingPreference::kGroup) {
- group.group_stream = std::nullopt;
- if (group.object_streams.empty()) {
- group_streams_.erase(group_id);
- }
- return;
- }
- // ObjectStream or ObjectPreferDatagram
- if (group.object_streams.contains(object_id)) {
- group_streams_[group_id].object_streams.erase(object_id);
- if (!group.group_stream.has_value() &&
- group_streams_[group_id].object_streams.empty()) {
- group_streams_.erase(group_id);
- }
+void SubscribeWindow::RemoveStream(uint64_t group_id, uint64_t object_id) {
+ FullSequence index = SequenceToIndex(FullSequence(group_id, object_id));
+ send_streams_.erase(index);
+}
+
+FullSequence SubscribeWindow::SequenceToIndex(FullSequence sequence) const {
+ switch (forwarding_preference_) {
+ case MoqtForwardingPreference::kTrack:
+ return FullSequence(0, 0);
+ case MoqtForwardingPreference::kGroup:
+ return FullSequence(sequence.group, 0);
+ case MoqtForwardingPreference::kObject:
+ return sequence;
+ case MoqtForwardingPreference::kDatagram:
+ QUIC_BUG(quic_bug_moqt_draft_03_01) << "No stream for datagram";
+ return FullSequence(0, 0);
}
}
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.h b/quiche/quic/moqt/moqt_subscribe_windows.h
index e6ec05d..53652ed 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows.h
+++ b/quiche/quic/moqt/moqt_subscribe_windows.h
@@ -9,6 +9,7 @@
#include <optional>
#include <vector>
+#include "absl/container/flat_hash_map.h"
#include "absl/container/node_hash_map.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/common/platform/api/quiche_export.h"
@@ -21,17 +22,22 @@
class QUICHE_EXPORT SubscribeWindow {
public:
// Creates a half-open window.
- SubscribeWindow(uint64_t subscribe_id, uint64_t start_group,
- uint64_t start_object)
- : subscribe_id_(subscribe_id), start_({start_group, start_object}) {}
-
- // Creates a closed window.
- SubscribeWindow(uint64_t subscribe_id, uint64_t start_group,
- uint64_t start_object, uint64_t end_group,
- uint64_t end_object)
+ SubscribeWindow(uint64_t subscribe_id,
+ MoqtForwardingPreference forwarding_preference,
+ uint64_t start_group, uint64_t start_object)
: subscribe_id_(subscribe_id),
start_({start_group, start_object}),
- end_(FullSequence(end_group, end_object)) {}
+ forwarding_preference_(forwarding_preference) {}
+
+ // Creates a closed window.
+ SubscribeWindow(uint64_t subscribe_id,
+ MoqtForwardingPreference forwarding_preference,
+ uint64_t start_group, uint64_t start_object,
+ uint64_t end_group, uint64_t end_object)
+ : subscribe_id_(subscribe_id),
+ start_({start_group, start_object}),
+ end_(FullSequence(end_group, end_object)),
+ forwarding_preference_(forwarding_preference) {}
uint64_t subscribe_id() const { return subscribe_id_; }
@@ -39,45 +45,59 @@
// Returns the stream to send |sequence| on, if already opened.
std::optional<webtransport::StreamId> GetStreamForSequence(
- FullSequence sequence,
- MoqtForwardingPreference forwarding_preference) const;
+ FullSequence sequence) const;
// Records what stream is being used for a track, group, or object depending
// on |forwarding_preference|. Triggers QUIC_BUG if already assigned.
- void AddStream(MoqtForwardingPreference forwarding_preference,
- uint64_t group_id, uint64_t object_id,
+ void AddStream(uint64_t group_id, uint64_t object_id,
webtransport::StreamId stream_id);
- void RemoveStream(MoqtForwardingPreference forwarding_preference,
- uint64_t group_id, uint64_t object_id);
+ void RemoveStream(uint64_t group_id, uint64_t object_id);
private:
- struct GroupStreams {
- std::optional<webtransport::StreamId> group_stream;
- absl::flat_hash_map<uint64_t, webtransport::StreamId> object_streams;
- };
+ // Converts an object sequence number into one that matches the way that
+ // stream IDs are being mapped. (See the comment for send_streams_ below.)
+ FullSequence SequenceToIndex(FullSequence sequence) const;
+
const uint64_t subscribe_id_;
const FullSequence start_;
const std::optional<FullSequence> end_ = std::nullopt;
- // Open streams for this subscription
- std::optional<webtransport::StreamId> track_stream_;
- absl::flat_hash_map<uint64_t, GroupStreams> group_streams_;
+ // Store open streams for this subscription. If the forwarding preference is
+ // kTrack, there is one entry under sequence (0, 0). If kGroup, each entry is
+ // under (group, 0). If kObject, it's tracked under the full sequence. If
+ // kDatagram, the map is empty.
+ absl::flat_hash_map<FullSequence, webtransport::StreamId> send_streams_;
+ // The forwarding preference for this track; informs how the streams are
+ // mapped.
+ const MoqtForwardingPreference forwarding_preference_;
};
// Class to keep track of the sequence number blocks to which a peer is
// subscribed.
class QUICHE_EXPORT MoqtSubscribeWindows {
public:
- MoqtSubscribeWindows() {}
+ MoqtSubscribeWindows(MoqtForwardingPreference forwarding_preference)
+ : forwarding_preference_(forwarding_preference) {}
// Returns a vector of subscribe IDs that apply to the object. They will be in
// reverse order of the AddWindow calls.
std::vector<SubscribeWindow*> SequenceIsSubscribed(FullSequence sequence);
- // |window| has already been converted into absolute sequence numbers. An
+ // |start_group| and |start_object| must be absolute sequence numbers. An
// optimization could consolidate overlapping subscribe windows.
- void AddWindow(SubscribeWindow window) {
- windows_.emplace(window.subscribe_id(), window);
+ void AddWindow(uint64_t subscribe_id, uint64_t start_group,
+ uint64_t start_object) {
+ windows_.emplace(subscribe_id,
+ SubscribeWindow(subscribe_id, forwarding_preference_,
+ start_group, start_object));
+ }
+ void AddWindow(uint64_t subscribe_id, uint64_t start_group,
+ uint64_t start_object, uint64_t end_group,
+ uint64_t end_object) {
+ windows_.emplace(
+ subscribe_id,
+ SubscribeWindow(subscribe_id, forwarding_preference_, start_group,
+ start_object, end_group, end_object));
}
void RemoveWindow(uint64_t subscribe_id) { windows_.erase(subscribe_id); }
@@ -94,6 +114,7 @@
private:
// Indexed by Subscribe ID.
absl::node_hash_map<uint64_t, SubscribeWindow> windows_;
+ const MoqtForwardingPreference forwarding_preference_;
};
} // namespace moqt
diff --git a/quiche/quic/moqt/moqt_subscribe_windows_test.cc b/quiche/quic/moqt/moqt_subscribe_windows_test.cc
index 5cfbbf5..c509be0 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows_test.cc
+++ b/quiche/quic/moqt/moqt_subscribe_windows_test.cc
@@ -7,6 +7,7 @@
#include <optional>
#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/platform/api/quic_expect_bug.h"
#include "quiche/quic/platform/api/quic_test.h"
#include "quiche/common/platform/api/quiche_export.h"
#include "quiche/web_transport/web_transport.h"
@@ -17,139 +18,88 @@
class QUICHE_EXPORT SubscribeWindowTest : public quic::test::QuicTest {
public:
- SubscribeWindowTest()
- : window_(/*subscribe_id=*/2, /*start_group=*/4,
- /*start_object=*/0, /*end_group=*/5,
- /*end_object=*/1) {}
+ SubscribeWindowTest() {}
- SubscribeWindow window_;
+ const uint64_t subscribe_id_ = 2;
+ const uint64_t start_group_ = 4;
+ const uint64_t start_object_ = 0;
+ const uint64_t end_group_ = 5;
+ const uint64_t end_object_ = 5;
};
TEST_F(SubscribeWindowTest, Queries) {
- EXPECT_EQ(window_.subscribe_id(), 2);
- EXPECT_TRUE(window_.InWindow(FullSequence(4, 0)));
- EXPECT_TRUE(window_.InWindow(FullSequence(5, 1)));
- EXPECT_FALSE(window_.InWindow(FullSequence(5, 2)));
- EXPECT_FALSE(window_.InWindow(FullSequence(6, 0)));
- EXPECT_FALSE(window_.InWindow(FullSequence(3, 12)));
+ SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kObject,
+ start_group_, start_object_, end_group_, end_object_);
+ EXPECT_EQ(window.subscribe_id(), 2);
+ EXPECT_TRUE(window.InWindow(FullSequence(4, 0)));
+ EXPECT_TRUE(window.InWindow(FullSequence(5, 5)));
+ EXPECT_FALSE(window.InWindow(FullSequence(5, 6)));
+ EXPECT_FALSE(window.InWindow(FullSequence(6, 0)));
+ EXPECT_FALSE(window.InWindow(FullSequence(3, 12)));
}
-TEST_F(SubscribeWindowTest, AddRemoveStream) {
- window_.AddStream(MoqtForwardingPreference::kTrack, 4, 0, 2);
- window_.AddStream(MoqtForwardingPreference::kGroup, 5, 0, 6);
- window_.AddStream(MoqtForwardingPreference::kObject, 5, 1, 10);
- window_.AddStream(MoqtForwardingPreference::kDatagram, 5, 2, 14);
- // This is a no-op; the stream does not exist.
- window_.RemoveStream(MoqtForwardingPreference::kGroup, 6, 0);
-
- EXPECT_EQ(window_.GetStreamForSequence(FullSequence(4, 0),
- MoqtForwardingPreference::kTrack),
- 2);
- EXPECT_EQ(window_.GetStreamForSequence(FullSequence(4, 0),
- MoqtForwardingPreference::kGroup),
- std::nullopt);
- EXPECT_EQ(window_.GetStreamForSequence(FullSequence(4, 0),
- MoqtForwardingPreference::kObject),
- std::nullopt);
- EXPECT_EQ(window_.GetStreamForSequence(FullSequence(4, 0),
- MoqtForwardingPreference::kDatagram),
- std::nullopt);
-
- EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 0),
- MoqtForwardingPreference::kTrack),
- 2);
- EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 0),
- MoqtForwardingPreference::kGroup),
- 6);
- EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 0),
- MoqtForwardingPreference::kObject),
- std::nullopt);
- EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 0),
- MoqtForwardingPreference::kDatagram),
- std::nullopt);
-
- EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 1),
- MoqtForwardingPreference::kTrack),
- 2);
- EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 1),
- MoqtForwardingPreference::kGroup),
- 6);
- EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 1),
- MoqtForwardingPreference::kObject),
- 10);
- EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 1),
- MoqtForwardingPreference::kDatagram),
- 10);
-
- EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 2),
- MoqtForwardingPreference::kTrack),
- 2);
- EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 2),
- MoqtForwardingPreference::kGroup),
- 6);
- EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 2),
- MoqtForwardingPreference::kObject),
- 14);
- EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 2),
- MoqtForwardingPreference::kDatagram),
- 14);
-
- window_.RemoveStream(MoqtForwardingPreference::kTrack, 4, 0);
- window_.RemoveStream(MoqtForwardingPreference::kObject, 5, 1);
- // kObject and kDatagram are interchangeable
- window_.RemoveStream(MoqtForwardingPreference::kObject, 5, 2);
- // The two commands above should not have deleted the group stream.
- EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 2),
- MoqtForwardingPreference::kGroup),
- 6);
- window_.RemoveStream(MoqtForwardingPreference::kGroup, 5, 0);
-
- EXPECT_EQ(window_.GetStreamForSequence(FullSequence(4, 0),
- MoqtForwardingPreference::kTrack),
- std::nullopt);
- EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 0),
- MoqtForwardingPreference::kGroup),
- std::nullopt);
- EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 1),
- MoqtForwardingPreference::kObject),
- std::nullopt);
- EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 2),
- MoqtForwardingPreference::kDatagram),
- std::nullopt);
+TEST_F(SubscribeWindowTest, AddQueryRemoveStreamIdTrack) {
+ SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kTrack,
+ start_group_, start_object_, end_group_, end_object_);
+ window.AddStream(4, 0, 2);
+ EXPECT_QUIC_BUG(window.AddStream(5, 2, 6), "Stream already added");
+ EXPECT_EQ(*window.GetStreamForSequence(FullSequence(5, 2)), 2);
+ window.RemoveStream(7, 2);
+ EXPECT_FALSE(window.GetStreamForSequence(FullSequence(4, 0)).has_value());
}
-TEST_F(SubscribeWindowTest, RemoveGroupBeforeObjects) {
- window_.AddStream(MoqtForwardingPreference::kGroup, 5, 0, 6);
- window_.AddStream(MoqtForwardingPreference::kObject, 5, 1, 10);
- window_.AddStream(MoqtForwardingPreference::kDatagram, 5, 2, 14);
- window_.RemoveStream(MoqtForwardingPreference::kGroup, 5, 0);
- // Object stream is not deleted when the root group stream is.
- EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 1),
- MoqtForwardingPreference::kObject),
- 10);
- EXPECT_FALSE(window_
- .GetStreamForSequence(FullSequence(5, 0),
- MoqtForwardingPreference::kGroup)
- .has_value());
+TEST_F(SubscribeWindowTest, AddQueryRemoveStreamIdGroup) {
+ SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kGroup,
+ start_group_, start_object_, end_group_, end_object_);
+ window.AddStream(4, 0, 2);
+ EXPECT_FALSE(window.GetStreamForSequence(FullSequence(5, 0)).has_value());
+ window.AddStream(5, 2, 6);
+ EXPECT_QUIC_BUG(window.AddStream(5, 3, 6), "Stream already added");
+ EXPECT_EQ(*window.GetStreamForSequence(FullSequence(4, 1)), 2);
+ EXPECT_EQ(*window.GetStreamForSequence(FullSequence(5, 0)), 6);
+ window.RemoveStream(5, 1);
+ EXPECT_FALSE(window.GetStreamForSequence(FullSequence(5, 2)).has_value());
+}
+
+TEST_F(SubscribeWindowTest, AddQueryRemoveStreamIdObject) {
+ SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kObject,
+ start_group_, start_object_, end_group_, end_object_);
+ window.AddStream(4, 0, 2);
+ window.AddStream(4, 1, 6);
+ window.AddStream(4, 2, 10);
+ EXPECT_QUIC_BUG(window.AddStream(4, 2, 14), "Stream already added");
+ EXPECT_EQ(*window.GetStreamForSequence(FullSequence(4, 0)), 2);
+ EXPECT_EQ(*window.GetStreamForSequence(FullSequence(4, 2)), 10);
+ EXPECT_FALSE(window.GetStreamForSequence(FullSequence(4, 4)).has_value());
+ EXPECT_FALSE(window.GetStreamForSequence(FullSequence(5, 0)).has_value());
+ window.RemoveStream(4, 2);
+ EXPECT_FALSE(window.GetStreamForSequence(FullSequence(4, 2)).has_value());
+}
+
+TEST_F(SubscribeWindowTest, AddQueryRemoveStreamIdDatagram) {
+ SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kDatagram,
+ start_group_, start_object_, end_group_, end_object_);
+ EXPECT_QUIC_BUG(window.AddStream(4, 0, 2), "Adding a stream for datagram");
}
class QUICHE_EXPORT MoqtSubscribeWindowsTest : public quic::test::QuicTest {
public:
+ MoqtSubscribeWindowsTest() : windows_(MoqtForwardingPreference::kObject) {}
MoqtSubscribeWindows windows_;
};
TEST_F(MoqtSubscribeWindowsTest, IsEmpty) {
EXPECT_TRUE(windows_.IsEmpty());
- windows_.AddWindow(SubscribeWindow(0, 1, 3));
+ windows_.AddWindow(0, 1, 3);
EXPECT_FALSE(windows_.IsEmpty());
}
TEST_F(MoqtSubscribeWindowsTest, IsSubscribed) {
EXPECT_TRUE(windows_.IsEmpty());
// The first two windows overlap; the third is open-ended.
- windows_.AddWindow(SubscribeWindow(0, 1, 0, 3, 9));
- windows_.AddWindow(SubscribeWindow(1, 2, 4, 4, 3));
- windows_.AddWindow(SubscribeWindow(2, 10, 0));
+ windows_.AddWindow(0, 1, 0, 3, 9);
+ windows_.AddWindow(1, 2, 4, 4, 3);
+ windows_.AddWindow(2, 10, 0);
EXPECT_FALSE(windows_.IsEmpty());
EXPECT_TRUE(windows_.SequenceIsSubscribed(FullSequence(0, 8)).empty());
auto hits = windows_.SequenceIsSubscribed(FullSequence(1, 0));
@@ -166,7 +116,7 @@
}
TEST_F(MoqtSubscribeWindowsTest, AddGetRemoveWindow) {
- windows_.AddWindow(SubscribeWindow(0, 1, 0, 3, 9));
+ windows_.AddWindow(0, 1, 0, 3, 9);
SubscribeWindow* window = windows_.GetWindow(0);
EXPECT_EQ(window->subscribe_id(), 0);
EXPECT_EQ(windows_.GetWindow(1), nullptr);
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h
index e52b084..6193ccd 100644
--- a/quiche/quic/moqt/moqt_track.h
+++ b/quiche/quic/moqt/moqt_track.h
@@ -33,12 +33,19 @@
const SubscribeWindow& window) = 0;
};
// |visitor| must not be nullptr.
- LocalTrack(const FullTrackName& full_track_name, Visitor* visitor)
- : full_track_name_(full_track_name), visitor_(visitor) {}
+ LocalTrack(const FullTrackName& full_track_name,
+ MoqtForwardingPreference forwarding_preference, Visitor* visitor)
+ : full_track_name_(full_track_name),
+ forwarding_preference_(forwarding_preference),
+ windows_(forwarding_preference),
+ visitor_(visitor) {}
// Creates a LocalTrack that does not start at sequence (0,0)
- LocalTrack(const FullTrackName& full_track_name, Visitor* visitor,
+ LocalTrack(const FullTrackName& full_track_name,
+ MoqtForwardingPreference forwarding_preference, Visitor* visitor,
FullSequence next_sequence)
: full_track_name_(full_track_name),
+ forwarding_preference_(forwarding_preference),
+ windows_(forwarding_preference),
next_sequence_(next_sequence),
visitor_(visitor) {}
@@ -55,7 +62,18 @@
return windows_.SequenceIsSubscribed(sequence);
}
- void AddWindow(SubscribeWindow window) { windows_.AddWindow(window); }
+ void AddWindow(uint64_t subscribe_id, uint64_t start_group,
+ uint64_t start_object) {
+ windows_.AddWindow(subscribe_id, start_group, start_object);
+ }
+
+ void AddWindow(uint64_t subscribe_id, uint64_t start_group,
+ uint64_t start_object, uint64_t end_group,
+ uint64_t end_object) {
+ windows_.AddWindow(subscribe_id, start_group, start_object, end_group,
+ end_object);
+ }
+
void DeleteWindow(uint64_t subscribe_id) {
windows_.RemoveWindow(subscribe_id);
}
@@ -77,10 +95,16 @@
return windows_.GetWindow(subscribe_id);
}
+ MoqtForwardingPreference forwarding_preference() const {
+ return forwarding_preference_;
+ }
+
private:
// This only needs to track subscriptions to current and future objects;
// requests for objects in the past are forwarded to the application.
const FullTrackName full_track_name_;
+ // The forwarding preference for the track.
+ MoqtForwardingPreference forwarding_preference_;
// Let the first SUBSCRIBE determine the track alias.
std::optional<uint64_t> track_alias_;
// The sequence numbers from this track to which the peer is subscribed.
@@ -122,12 +146,25 @@
Visitor* visitor() { return visitor_; }
+ // When called while processing the first object in the track, sets the
+ // forwarding preference to the value indicated by the incoming encoding.
+ // Otherwise, returns true if the incoming object does not violate the rule
+ // that the preference is consistent.
+ bool CheckForwardingPreference(MoqtForwardingPreference preference) {
+ if (forwarding_preference_.has_value()) {
+ return forwarding_preference_.value() == preference;
+ }
+ forwarding_preference_ = preference;
+ return true;
+ }
+
private:
// TODO: There is no accounting for the number of outstanding subscribes,
// because we can't match track names to individual subscribes.
const FullTrackName full_track_name_;
const uint64_t track_alias_;
Visitor* visitor_;
+ std::optional<MoqtForwardingPreference> forwarding_preference_;
};
} // namespace moqt
diff --git a/quiche/quic/moqt/moqt_track_test.cc b/quiche/quic/moqt/moqt_track_test.cc
index 46fd3c5..36fab35 100644
--- a/quiche/quic/moqt/moqt_track_test.cc
+++ b/quiche/quic/moqt/moqt_track_test.cc
@@ -19,7 +19,8 @@
class LocalTrackTest : public quic::test::QuicTest {
public:
LocalTrackTest()
- : track_(FullTrackName("foo", "bar"), &visitor_, FullSequence(4, 1)) {}
+ : track_(FullTrackName("foo", "bar"), MoqtForwardingPreference::kTrack,
+ &visitor_, FullSequence(4, 1)) {}
LocalTrack track_;
MockLocalTrackVisitor visitor_;
};
@@ -34,6 +35,7 @@
track_.SentSequence(FullSequence(4, 1));
EXPECT_EQ(track_.next_sequence(), FullSequence(4, 2));
EXPECT_FALSE(track_.HasSubscriber());
+ EXPECT_EQ(track_.forwarding_preference(), MoqtForwardingPreference::kTrack);
}
TEST_F(LocalTrackTest, SetTrackAlias) {
@@ -43,7 +45,7 @@
}
TEST_F(LocalTrackTest, AddGetDeleteWindow) {
- track_.AddWindow(SubscribeWindow(0, 4, 1));
+ track_.AddWindow(0, 4, 1);
EXPECT_EQ(track_.GetWindow(0)->subscribe_id(), 0);
EXPECT_EQ(track_.GetWindow(1), nullptr);
track_.DeleteWindow(0);
@@ -51,7 +53,7 @@
}
TEST_F(LocalTrackTest, ShouldSend) {
- track_.AddWindow(SubscribeWindow(0, 4, 1));
+ track_.AddWindow(0, 4, 1);
EXPECT_TRUE(track_.HasSubscriber());
EXPECT_TRUE(track_.ShouldSend(FullSequence(3, 12)).empty());
EXPECT_TRUE(track_.ShouldSend(FullSequence(4, 0)).empty());
@@ -73,6 +75,15 @@
EXPECT_EQ(track_.visitor(), &visitor_);
}
+TEST_F(RemoteTrackTest, UpdateForwardingPreference) {
+ EXPECT_TRUE(
+ track_.CheckForwardingPreference(MoqtForwardingPreference::kObject));
+ EXPECT_TRUE(
+ track_.CheckForwardingPreference(MoqtForwardingPreference::kObject));
+ EXPECT_FALSE(
+ track_.CheckForwardingPreference(MoqtForwardingPreference::kDatagram));
+}
+
// TODO: Write test for GetStreamForSequence.
} // namespace test
diff --git a/quiche/quic/moqt/tools/chat_client_bin.cc b/quiche/quic/moqt/tools/chat_client_bin.cc
index 920fc15..38fd702 100644
--- a/quiche/quic/moqt/tools/chat_client_bin.cc
+++ b/quiche/quic/moqt/tools/chat_client_bin.cc
@@ -107,7 +107,6 @@
}
session_->PublishObject(my_track_name_, next_sequence_.group++,
next_sequence_.object, /*object_send_order=*/0,
- moqt::MoqtForwardingPreference::kObject,
input_message, true);
}
@@ -197,7 +196,8 @@
}
// By not sending a visitor, the application will not fulfill subscriptions
// to previous objects.
- session_->AddLocalTrack(my_track_name_, nullptr);
+ session_->AddLocalTrack(my_track_name_,
+ moqt::MoqtForwardingPreference::kObject, nullptr);
moqt::MoqtOutgoingAnnounceCallback announce_callback =
[&](absl::string_view track_namespace,
std::optional<moqt::MoqtAnnounceErrorReason> reason) {