Implement Key-Value-Pair struct from the MoQT spec.
Update all messages to match the spec as to what parameters can be included. Parse and frame of parameters is in key-value multimap that can be shared with extensions.
Renumber SETUP type to 0x20/0x21. Roll the version to draft-11, although this CL will interop with neither -10 nor -11.
PiperOrigin-RevId: 754055947
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc
index b59bc5f..430616e 100644
--- a/quiche/quic/moqt/moqt_framer.cc
+++ b/quiche/quic/moqt/moqt_framer.cc
@@ -9,20 +9,16 @@
#include <cstdlib>
#include <optional>
#include <string>
-#include <type_traits>
#include <utility>
-#include <variant>
#include <vector>
-#include "absl/container/inlined_vector.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
-#include "quiche/quic/core/quic_data_writer.h"
#include "quiche/quic/core/quic_time.h"
+#include "quiche/quic/core/quic_types.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_priority.h"
-#include "quiche/quic/platform/api/quic_bug_tracker.h"
#include "quiche/common/platform/api/quiche_bug_tracker.h"
#include "quiche/common/platform/api/quiche_logging.h"
#include "quiche/common/quiche_buffer_allocator.h"
@@ -41,135 +37,78 @@
using ::quiche::WireUint8;
using ::quiche::WireVarInt62;
-// Encoding for string parameters as described in
-// https://moq-wg.github.io/moq-transport/draft-ietf-moq-transport.html#name-parameters
-struct StringParameter {
- template <typename Enum>
- StringParameter(Enum type, absl::string_view data)
- : type(static_cast<uint64_t>(type)), data(data) {
- static_assert(std::is_enum_v<Enum>);
- }
-
- uint64_t type;
- absl::string_view data;
-};
-class WireStringParameter {
+class WireKeyVarIntPair {
public:
- using DataType = StringParameter;
+ explicit WireKeyVarIntPair(uint64_t key, uint64_t value)
+ : key_(key), value_(value) {}
- explicit WireStringParameter(const StringParameter& parameter)
- : parameter_(parameter) {}
size_t GetLengthOnWire() {
- return quiche::ComputeLengthOnWire(
- WireVarInt62(parameter_.type),
- WireStringWithVarInt62Length(parameter_.data));
+ return quiche::ComputeLengthOnWire(WireVarInt62(key_),
+ WireVarInt62(value_));
}
absl::Status SerializeIntoWriter(quiche::QuicheDataWriter& writer) {
- return quiche::SerializeIntoWriter(
- writer, WireVarInt62(parameter_.type),
- WireStringWithVarInt62Length(parameter_.data));
+ return quiche::SerializeIntoWriter(writer, WireVarInt62(key_),
+ WireVarInt62(value_));
}
private:
- const StringParameter& parameter_;
+ const uint64_t key_;
+ const uint64_t value_;
};
-// Encoding for integer parameters as described in
-// https://moq-wg.github.io/moq-transport/draft-ietf-moq-transport.html#name-parameters
-struct IntParameter {
- template <typename Enum, typename Param>
- IntParameter(Enum type, Param value)
- : type(static_cast<uint64_t>(type)), value(static_cast<uint64_t>(value)) {
- static_assert(std::is_enum_v<Enum>);
- static_assert(std::is_enum_v<Param> || std::is_unsigned_v<Param>);
- }
-
- uint64_t type;
- uint64_t value;
-};
-class WireIntParameter {
+class WireKeyStringPair {
public:
- using DataType = IntParameter;
-
- explicit WireIntParameter(const IntParameter& parameter)
- : parameter_(parameter) {}
+ explicit WireKeyStringPair(uint64_t key, absl::string_view value)
+ : key_(key), value_(value) {}
size_t GetLengthOnWire() {
- return quiche::ComputeLengthOnWire(
- WireVarInt62(parameter_.type),
- WireVarInt62(NeededVarIntLen(parameter_.value)),
- WireVarInt62(parameter_.value));
+ return quiche::ComputeLengthOnWire(WireVarInt62(key_),
+ WireStringWithVarInt62Length(value_));
}
absl::Status SerializeIntoWriter(quiche::QuicheDataWriter& writer) {
- return quiche::SerializeIntoWriter(
- writer, WireVarInt62(parameter_.type),
- WireVarInt62(NeededVarIntLen(parameter_.value)),
- WireVarInt62(parameter_.value));
+ return quiche::SerializeIntoWriter(writer, WireVarInt62(key_),
+ WireStringWithVarInt62Length(value_));
}
private:
- size_t NeededVarIntLen(const uint64_t value) {
- return static_cast<size_t>(quic::QuicDataWriter::GetVarInt62Len(value));
- }
-
- const IntParameter& parameter_;
+ const uint64_t key_;
+ const absl::string_view value_;
};
-class WireSubscribeParameterList {
+class WireKeyValuePairList {
public:
- explicit WireSubscribeParameterList(const MoqtSubscribeParameters& list)
- : list_(list) {}
+ explicit WireKeyValuePairList(const KeyValuePairList& list) : list_(list) {}
size_t GetLengthOnWire() {
- auto string_parameters = StringParameters();
- auto int_parameters = IntParameters();
- return quiche::ComputeLengthOnWire(
- WireVarInt62(string_parameters.size() + int_parameters.size()),
- WireSpan<WireStringParameter>(string_parameters),
- WireSpan<WireIntParameter>(int_parameters));
+ size_t total = WireVarInt62(list_.size()).GetLengthOnWire();
+ list_.ForEach(
+ [&](uint64_t key, uint64_t value) {
+ total += WireKeyVarIntPair(key, value).GetLengthOnWire();
+ return true;
+ },
+ [&](uint64_t key, absl::string_view value) {
+ total += WireKeyStringPair(key, value).GetLengthOnWire();
+ return true;
+ });
+ return total;
}
absl::Status SerializeIntoWriter(quiche::QuicheDataWriter& writer) {
- auto string_parameters = StringParameters();
- auto int_parameters = IntParameters();
- return quiche::SerializeIntoWriter(
- writer, WireVarInt62(string_parameters.size() + int_parameters.size()),
- WireSpan<WireStringParameter>(string_parameters),
- WireSpan<WireIntParameter>(int_parameters));
+ WireVarInt62(list_.size()).SerializeIntoWriter(writer);
+ list_.ForEach(
+ [&](uint64_t key, uint64_t value) {
+ absl::Status status =
+ WireKeyVarIntPair(key, value).SerializeIntoWriter(writer);
+ return quiche::IsWriterStatusOk(status);
+ },
+ [&](uint64_t key, absl::string_view value) {
+ absl::Status status =
+ WireKeyStringPair(key, value).SerializeIntoWriter(writer);
+ return quiche::IsWriterStatusOk(status);
+ });
+ return absl::OkStatus();
}
private:
- absl::InlinedVector<StringParameter, 1> StringParameters() const {
- absl::InlinedVector<StringParameter, 1> result;
- if (list_.authorization_info.has_value()) {
- result.push_back(
- StringParameter(MoqtTrackRequestParameter::kAuthorizationInfo,
- *list_.authorization_info));
- }
- return result;
- }
- absl::InlinedVector<IntParameter, 3> IntParameters() const {
- absl::InlinedVector<IntParameter, 3> result;
- if (list_.delivery_timeout.has_value()) {
- QUICHE_DCHECK_GE(*list_.delivery_timeout, quic::QuicTimeDelta::Zero());
- result.push_back(IntParameter(
- MoqtTrackRequestParameter::kDeliveryTimeout,
- static_cast<uint64_t>(list_.delivery_timeout->ToMilliseconds())));
- }
- if (list_.max_cache_duration.has_value()) {
- QUICHE_DCHECK_GE(*list_.max_cache_duration, quic::QuicTimeDelta::Zero());
- result.push_back(IntParameter(
- MoqtTrackRequestParameter::kMaxCacheDuration,
- static_cast<uint64_t>(list_.max_cache_duration->ToMilliseconds())));
- }
- if (list_.object_ack_window.has_value()) {
- QUICHE_DCHECK_GE(*list_.object_ack_window, quic::QuicTimeDelta::Zero());
- result.push_back(IntParameter(
- MoqtTrackRequestParameter::kOackWindowSize,
- static_cast<uint64_t>(list_.object_ack_window->ToMicroseconds())));
- }
- return result;
- }
-
- const MoqtSubscribeParameters& list_;
+ const KeyValuePairList& list_;
};
class WireFullTrackName {
@@ -261,18 +200,63 @@
return value << 1;
}
+void SessionParametersToKeyValuePairList(
+ const MoqtSessionParameters& parameters, KeyValuePairList& out) {
+ if (!parameters.using_webtrans &&
+ parameters.perspective == quic::Perspective::IS_CLIENT) {
+ out.insert(SetupParameter::kPath, parameters.path);
+ }
+ if (parameters.max_subscribe_id > 0) {
+ out.insert(SetupParameter::kMaxRequestId, parameters.max_subscribe_id);
+ }
+ if (parameters.max_auth_token_cache_size > 0) {
+ out.insert(SetupParameter::kMaxAuthTokenCacheSize,
+ parameters.max_auth_token_cache_size);
+ }
+ if (parameters.support_object_acks) {
+ out.insert(SetupParameter::kSupportObjectAcks, 1ULL);
+ }
+}
+
+void VersionSpecificParametersToKeyValuePairList(
+ const VersionSpecificParameters& parameters, KeyValuePairList& out) {
+ out.clear();
+ for (const auto& it : parameters.authorization_token) {
+ out.insert(VersionSpecificParameter::kAuthorizationToken, it);
+ }
+ if (!parameters.delivery_timeout.IsInfinite()) {
+ out.insert(
+ VersionSpecificParameter::kDeliveryTimeout,
+ static_cast<uint64_t>(parameters.delivery_timeout.ToMilliseconds()));
+ }
+ if (parameters.authorization_info.has_value()) {
+ out.insert(VersionSpecificParameter::kAuthorizationInfo,
+ *parameters.authorization_info);
+ }
+ if (!parameters.max_cache_duration.IsInfinite()) {
+ out.insert(
+ VersionSpecificParameter::kMaxCacheDuration,
+ static_cast<uint64_t>(parameters.max_cache_duration.ToMilliseconds()));
+ }
+ if (parameters.oack_window_size.has_value()) {
+ out.insert(
+ VersionSpecificParameter::kOackWindowSize,
+ static_cast<uint64_t>(parameters.oack_window_size->ToMicroseconds()));
+ }
+}
+
} // namespace
quiche::QuicheBuffer MoqtFramer::SerializeObjectHeader(
const MoqtObject& message, MoqtDataStreamType message_type,
bool is_first_in_stream) {
if (!ValidateObjectMetadata(message, /*is_datagram=*/false)) {
- QUIC_BUG(quic_bug_serialize_object_header_01)
+ QUICHE_BUG(QUICHE_BUG_serialize_object_header_01)
<< "Object metadata is invalid";
return quiche::QuicheBuffer();
}
if (!message.subgroup_id.has_value()) {
- QUIC_BUG(quic_bug_serialize_object_header_02)
+ QUICHE_BUG(QUICHE_BUG_serialize_object_header_02)
<< "Subgroup ID is not set on data stream";
return quiche::QuicheBuffer();
}
@@ -365,12 +349,12 @@
quiche::QuicheBuffer MoqtFramer::SerializeObjectDatagram(
const MoqtObject& message, absl::string_view payload) {
if (!ValidateObjectMetadata(message, /*is_datagram=*/true)) {
- QUIC_BUG(quic_bug_serialize_object_datagram_01)
+ QUICHE_BUG(QUICHE_BUG_serialize_object_datagram_01)
<< "Object metadata is invalid";
return quiche::QuicheBuffer();
}
if (message.payload_length != payload.length()) {
- QUIC_BUG(quic_bug_serialize_object_datagram_03)
+ QUICHE_BUG(QUICHE_BUG_serialize_object_datagram_03)
<< "Payload length does not match payload";
return quiche::QuicheBuffer();
}
@@ -392,44 +376,34 @@
quiche::QuicheBuffer MoqtFramer::SerializeClientSetup(
const MoqtClientSetup& message) {
- absl::InlinedVector<IntParameter, 1> int_parameters;
- absl::InlinedVector<StringParameter, 1> string_parameters;
- if (message.max_subscribe_id.has_value()) {
- int_parameters.push_back(IntParameter(MoqtSetupParameter::kMaxSubscribeId,
- *message.max_subscribe_id));
- }
- if (message.supports_object_ack) {
- int_parameters.push_back(
- IntParameter(MoqtSetupParameter::kSupportObjectAcks, 1u));
- }
- if (!using_webtrans_ && message.path.has_value()) {
- string_parameters.push_back(
- StringParameter(MoqtSetupParameter::kPath, *message.path));
+ KeyValuePairList parameters;
+ SessionParametersToKeyValuePairList(message.parameters, parameters);
+ if (!ValidateSetupParameters(parameters, using_webtrans_,
+ quic::Perspective::IS_SERVER)) {
+ QUICHE_BUG(QUICHE_BUG_invalid_parameters)
+ << "Serializing invalid MoQT parameters";
+ return quiche::QuicheBuffer();
}
return SerializeControlMessage(
MoqtMessageType::kClientSetup,
WireVarInt62(message.supported_versions.size()),
WireSpan<WireVarInt62, MoqtVersion>(message.supported_versions),
- WireVarInt62(string_parameters.size() + int_parameters.size()),
- WireSpan<WireIntParameter>(int_parameters),
- WireSpan<WireStringParameter>(string_parameters));
+ WireKeyValuePairList(parameters));
}
quiche::QuicheBuffer MoqtFramer::SerializeServerSetup(
const MoqtServerSetup& message) {
- absl::InlinedVector<IntParameter, 1> int_parameters;
- if (message.max_subscribe_id.has_value()) {
- int_parameters.push_back(IntParameter(MoqtSetupParameter::kMaxSubscribeId,
- *message.max_subscribe_id));
- }
- if (message.supports_object_ack) {
- int_parameters.push_back(
- IntParameter(MoqtSetupParameter::kSupportObjectAcks, 1u));
+ KeyValuePairList parameters;
+ SessionParametersToKeyValuePairList(message.parameters, parameters);
+ if (!ValidateSetupParameters(parameters, using_webtrans_,
+ quic::Perspective::IS_CLIENT)) {
+ QUICHE_BUG(QUICHE_BUG_invalid_parameters)
+ << "Serializing invalid MoQT parameters";
+ return quiche::QuicheBuffer();
}
return SerializeControlMessage(MoqtMessageType::kServerSetup,
WireVarInt62(message.selected_version),
- WireVarInt62(int_parameters.size()),
- WireSpan<WireIntParameter>(int_parameters));
+ WireKeyValuePairList(parameters));
}
quiche::QuicheBuffer MoqtFramer::SerializeSubscribe(
@@ -439,6 +413,14 @@
QUICHE_BUG(MoqtFramer_invalid_subscribe) << "Invalid object range";
return quiche::QuicheBuffer();
}
+ KeyValuePairList parameters;
+ VersionSpecificParametersToKeyValuePairList(message.parameters, parameters);
+ if (!ValidateVersionSpecificParameters(parameters,
+ MoqtMessageType::kSubscribe)) {
+ QUICHE_BUG(QUICHE_BUG_invalid_parameters)
+ << "Serializing invalid MoQT parameters";
+ return quiche::QuicheBuffer();
+ }
switch (filter_type) {
case MoqtFilterType::kLatestObject:
return SerializeControlMessage(
@@ -447,7 +429,7 @@
WireFullTrackName(message.full_track_name, true),
WireUint8(message.subscriber_priority),
WireDeliveryOrder(message.group_order), WireVarInt62(filter_type),
- WireSubscribeParameterList(message.parameters));
+ WireKeyValuePairList(parameters));
case MoqtFilterType::kAbsoluteStart:
return SerializeControlMessage(
MoqtMessageType::kSubscribe, WireVarInt62(message.subscribe_id),
@@ -457,7 +439,7 @@
WireDeliveryOrder(message.group_order), WireVarInt62(filter_type),
WireVarInt62(message.start->group),
WireVarInt62(message.start->object),
- WireSubscribeParameterList(message.parameters));
+ WireKeyValuePairList(parameters));
case MoqtFilterType::kAbsoluteRange:
return SerializeControlMessage(
MoqtMessageType::kSubscribe, WireVarInt62(message.subscribe_id),
@@ -467,7 +449,7 @@
WireDeliveryOrder(message.group_order), WireVarInt62(filter_type),
WireVarInt62(message.start->group),
WireVarInt62(message.start->object), WireVarInt62(*message.end_group),
- WireSubscribeParameterList(message.parameters));
+ WireKeyValuePairList(parameters));
default:
QUICHE_BUG(MoqtFramer_end_group_missing) << "Subscribe framing error.";
return quiche::QuicheBuffer();
@@ -476,9 +458,13 @@
quiche::QuicheBuffer MoqtFramer::SerializeSubscribeOk(
const MoqtSubscribeOk& message) {
- if (message.parameters.authorization_info.has_value()) {
- QUICHE_BUG(MoqtFramer_invalid_subscribe_ok)
- << "SUBSCRIBE_OK with delivery timeout";
+ KeyValuePairList parameters;
+ VersionSpecificParametersToKeyValuePairList(message.parameters, parameters);
+ if (!ValidateVersionSpecificParameters(parameters,
+ MoqtMessageType::kSubscribeOk)) {
+ QUICHE_BUG(QUICHE_BUG_invalid_parameters)
+ << "Serializing invalid MoQT parameters";
+ return quiche::QuicheBuffer();
}
if (message.largest_id.has_value()) {
return SerializeControlMessage(
@@ -487,13 +473,13 @@
WireDeliveryOrder(message.group_order), WireUint8(1),
WireVarInt62(message.largest_id->group),
WireVarInt62(message.largest_id->object),
- WireSubscribeParameterList(message.parameters));
+ WireKeyValuePairList(parameters));
}
return SerializeControlMessage(
MoqtMessageType::kSubscribeOk, WireVarInt62(message.subscribe_id),
WireVarInt62(message.expires.ToMilliseconds()),
WireDeliveryOrder(message.group_order), WireUint8(0),
- WireSubscribeParameterList(message.parameters));
+ WireKeyValuePairList(parameters));
}
quiche::QuicheBuffer MoqtFramer::SerializeSubscribeError(
@@ -521,9 +507,13 @@
quiche::QuicheBuffer MoqtFramer::SerializeSubscribeUpdate(
const MoqtSubscribeUpdate& message) {
- if (message.parameters.authorization_info.has_value()) {
- QUICHE_BUG(MoqtFramer_invalid_subscribe_update)
- << "SUBSCRIBE_UPDATE with authorization info";
+ KeyValuePairList parameters;
+ VersionSpecificParametersToKeyValuePairList(message.parameters, parameters);
+ if (!ValidateVersionSpecificParameters(parameters,
+ MoqtMessageType::kSubscribeUpdate)) {
+ QUICHE_BUG(QUICHE_BUG_invalid_parameters)
+ << "Serializing invalid MoQT parameters";
+ return quiche::QuicheBuffer();
}
uint64_t end_group =
message.end_group.has_value() ? *message.end_group + 1 : 0;
@@ -531,18 +521,23 @@
MoqtMessageType::kSubscribeUpdate, WireVarInt62(message.subscribe_id),
WireVarInt62(message.start.group), WireVarInt62(message.start.object),
WireVarInt62(end_group), WireUint8(message.subscriber_priority),
- WireSubscribeParameterList(message.parameters));
+ WireKeyValuePairList(parameters));
}
quiche::QuicheBuffer MoqtFramer::SerializeAnnounce(
const MoqtAnnounce& message) {
- if (message.parameters.delivery_timeout.has_value()) {
- QUICHE_BUG(MoqtFramer_invalid_announce) << "ANNOUNCE with delivery timeout";
+ KeyValuePairList parameters;
+ VersionSpecificParametersToKeyValuePairList(message.parameters, parameters);
+ if (!ValidateVersionSpecificParameters(parameters,
+ MoqtMessageType::kAnnounce)) {
+ QUICHE_BUG(QUICHE_BUG_invalid_parameters)
+ << "Serializing invalid MoQT parameters";
+ return quiche::QuicheBuffer();
}
return SerializeControlMessage(
MoqtMessageType::kAnnounce,
WireFullTrackName(message.track_namespace, false),
- WireSubscribeParameterList(message.parameters));
+ WireKeyValuePairList(parameters));
}
quiche::QuicheBuffer MoqtFramer::SerializeAnnounceOk(
@@ -572,9 +567,18 @@
quiche::QuicheBuffer MoqtFramer::SerializeTrackStatusRequest(
const MoqtTrackStatusRequest& message) {
+ KeyValuePairList parameters;
+ VersionSpecificParametersToKeyValuePairList(message.parameters, parameters);
+ if (!ValidateVersionSpecificParameters(
+ parameters, MoqtMessageType::kTrackStatusRequest)) {
+ QUICHE_BUG(QUICHE_BUG_invalid_parameters)
+ << "Serializing invalid MoQT parameters";
+ return quiche::QuicheBuffer();
+ }
return SerializeControlMessage(
MoqtMessageType::kTrackStatusRequest,
- WireFullTrackName(message.full_track_name, true));
+ WireFullTrackName(message.full_track_name, true),
+ WireKeyValuePairList(parameters));
}
quiche::QuicheBuffer MoqtFramer::SerializeUnannounce(
@@ -586,11 +590,19 @@
quiche::QuicheBuffer MoqtFramer::SerializeTrackStatus(
const MoqtTrackStatus& message) {
+ KeyValuePairList parameters;
+ VersionSpecificParametersToKeyValuePairList(message.parameters, parameters);
+ if (!ValidateVersionSpecificParameters(parameters,
+ MoqtMessageType::kTrackStatus)) {
+ QUICHE_BUG(QUICHE_BUG_invalid_parameters)
+ << "Serializing invalid MoQT parameters";
+ return quiche::QuicheBuffer();
+ }
return SerializeControlMessage(
MoqtMessageType::kTrackStatus,
WireFullTrackName(message.full_track_name, true),
WireVarInt62(message.status_code), WireVarInt62(message.last_group),
- WireVarInt62(message.last_object));
+ WireVarInt62(message.last_object), WireKeyValuePairList(parameters));
}
quiche::QuicheBuffer MoqtFramer::SerializeGoAway(const MoqtGoAway& message) {
@@ -601,10 +613,18 @@
quiche::QuicheBuffer MoqtFramer::SerializeSubscribeAnnounces(
const MoqtSubscribeAnnounces& message) {
+ KeyValuePairList parameters;
+ VersionSpecificParametersToKeyValuePairList(message.parameters, parameters);
+ if (!ValidateVersionSpecificParameters(
+ parameters, MoqtMessageType::kSubscribeAnnounces)) {
+ QUICHE_BUG(QUICHE_BUG_invalid_parameters)
+ << "Serializing invalid MoQT parameters";
+ return quiche::QuicheBuffer();
+ }
return SerializeControlMessage(
MoqtMessageType::kSubscribeAnnounces,
WireFullTrackName(message.track_namespace, false),
- WireSubscribeParameterList(message.parameters));
+ WireKeyValuePairList(parameters));
}
quiche::QuicheBuffer MoqtFramer::SerializeSubscribeAnnouncesOk(
@@ -645,6 +665,13 @@
QUICHE_BUG(MoqtFramer_invalid_fetch) << "Invalid FETCH object range";
return quiche::QuicheBuffer();
}
+ KeyValuePairList parameters;
+ VersionSpecificParametersToKeyValuePairList(message.parameters, parameters);
+ if (!ValidateVersionSpecificParameters(parameters, MoqtMessageType::kFetch)) {
+ QUICHE_BUG(QUICHE_BUG_invalid_parameters)
+ << "Serializing invalid MoQT parameters";
+ return quiche::QuicheBuffer();
+ }
if (message.joining_fetch.has_value()) {
return SerializeControlMessage(
MoqtMessageType::kFetch, WireVarInt62(message.fetch_id),
@@ -653,7 +680,7 @@
WireVarInt62(FetchType::kJoining),
WireVarInt62(message.joining_fetch->joining_subscribe_id),
WireVarInt62(message.joining_fetch->preceding_group_offset),
- WireSubscribeParameterList(message.parameters));
+ WireKeyValuePairList(parameters));
}
return SerializeControlMessage(
MoqtMessageType::kFetch, WireVarInt62(message.fetch_id),
@@ -666,7 +693,7 @@
WireVarInt62(message.end_group),
WireVarInt62(message.end_object.has_value() ? *message.end_object + 1
: 0),
- WireSubscribeParameterList(message.parameters));
+ WireKeyValuePairList(parameters));
}
quiche::QuicheBuffer MoqtFramer::SerializeFetchCancel(
@@ -676,12 +703,20 @@
}
quiche::QuicheBuffer MoqtFramer::SerializeFetchOk(const MoqtFetchOk& message) {
- return SerializeControlMessage(
- MoqtMessageType::kFetchOk, WireVarInt62(message.subscribe_id),
- WireDeliveryOrder(message.group_order),
- WireVarInt62(message.largest_id.group),
- WireVarInt62(message.largest_id.object),
- WireSubscribeParameterList(message.parameters));
+ KeyValuePairList parameters;
+ VersionSpecificParametersToKeyValuePairList(message.parameters, parameters);
+ if (!ValidateVersionSpecificParameters(parameters,
+ MoqtMessageType::kFetchOk)) {
+ QUICHE_BUG(QUICHE_BUG_invalid_parameters)
+ << "Serializing invalid MoQT parameters";
+ return quiche::QuicheBuffer();
+ }
+ return SerializeControlMessage(MoqtMessageType::kFetchOk,
+ WireVarInt62(message.subscribe_id),
+ WireDeliveryOrder(message.group_order),
+ WireVarInt62(message.largest_id.group),
+ WireVarInt62(message.largest_id.object),
+ WireKeyValuePairList(parameters));
}
quiche::QuicheBuffer MoqtFramer::SerializeFetchError(
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc
index 9895416..690418b 100644
--- a/quiche/quic/moqt/moqt_framer_test.cc
+++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -415,8 +415,7 @@
/*group_order=*/std::nullopt,
start,
end_group,
- MoqtSubscribeParameters{"bar", std::nullopt, std::nullopt,
- std::nullopt},
+ VersionSpecificParameters("bar"),
};
quiche::QuicheBuffer buffer;
MoqtFilterType expected_filter_type = GetFilterType(subscribe);
@@ -444,7 +443,7 @@
/*group_order=*/std::nullopt,
/*start=*/Location(4, 3),
/*end_group=*/3,
- MoqtSubscribeParameters{"bar", std::nullopt, std::nullopt, std::nullopt},
+ VersionSpecificParameters("bar"),
};
quiche::QuicheBuffer buffer;
EXPECT_QUIC_BUG(buffer = framer_.SerializeSubscribe(subscribe),
@@ -463,7 +462,7 @@
/*end_group=*/1,
/*end_object=*/1,
/*parameters=*/
- MoqtSubscribeParameters{"baz", std::nullopt, std::nullopt, std::nullopt},
+ VersionSpecificParameters("baz"),
};
quiche::QuicheBuffer buffer;
EXPECT_QUIC_BUG(buffer = framer_.SerializeFetch(fetch),
@@ -482,8 +481,7 @@
/*start=*/Location(4, 3),
/*end_group=*/4,
/*subscriber_priority=*/0xaa,
- MoqtSubscribeParameters{std::nullopt, std::nullopt, std::nullopt,
- std::nullopt},
+ VersionSpecificParameters(),
};
quiche::QuicheBuffer buffer;
buffer = framer_.SerializeSubscribeUpdate(subscribe_update);
@@ -498,8 +496,7 @@
/*start=*/Location(4, 3),
/*end_group=*/4,
/*subscriber_priority=*/0xaa,
- MoqtSubscribeParameters{std::nullopt, std::nullopt, std::nullopt,
- std::nullopt},
+ VersionSpecificParameters(),
};
quiche::QuicheBuffer buffer;
buffer = framer_.SerializeSubscribeUpdate(subscribe_update);
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index 96b36f3..73a67d6 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -78,7 +78,7 @@
std::optional<absl::string_view>()))
.WillOnce([&]() { received_ok = true; });
client_->session()->SubscribeCurrentObject(track_name, visitor,
- MoqtSubscribeParameters());
+ VersionSpecificParameters());
bool success =
test_harness_.RunUntilWithDefaultTimeout([&]() { return received_ok; });
EXPECT_TRUE(success);
@@ -144,7 +144,8 @@
std::optional<MoqtAnnounceErrorReason> error_message)>
announce_callback;
client_->session()->Announce(FullTrackName{"foo"},
- announce_callback.AsStdFunction());
+ announce_callback.AsStdFunction(),
+ VersionSpecificParameters());
bool matches = false;
EXPECT_CALL(announce_callback, Call(_, _))
.WillOnce([&](FullTrackName track_namespace,
@@ -179,7 +180,8 @@
std::optional<MoqtAnnounceErrorReason> error_message)>
announce_callback;
client_->session()->Announce(FullTrackName{"foo"},
- announce_callback.AsStdFunction());
+ announce_callback.AsStdFunction(),
+ VersionSpecificParameters());
bool matches = false;
EXPECT_CALL(announce_callback, Call(_, _))
.WillOnce([&](FullTrackName track_namespace,
@@ -219,7 +221,8 @@
std::optional<MoqtAnnounceErrorReason> error_message)>
announce_callback;
client_->session()->Announce(FullTrackName{"foo"},
- announce_callback.AsStdFunction());
+ announce_callback.AsStdFunction(),
+ VersionSpecificParameters());
bool matches = false;
EXPECT_CALL(announce_callback, Call(_, _))
.WillOnce([&](FullTrackName track_namespace,
@@ -229,7 +232,7 @@
track_name.AddElement("/catalog");
EXPECT_FALSE(error.has_value());
server_->session()->SubscribeCurrentObject(track_name, &server_visitor,
- MoqtSubscribeParameters());
+ VersionSpecificParameters());
});
EXPECT_CALL(server_visitor, OnReply(_, _, _)).WillOnce([&]() {
matches = true;
@@ -253,7 +256,7 @@
track_name.AddElement("data");
server_->session()->SubscribeAbsolute(
track_name, /*start_group=*/0, /*start_object=*/0, &server_visitor,
- MoqtSubscribeParameters());
+ VersionSpecificParameters());
return std::optional<MoqtAnnounceErrorReason>();
});
@@ -268,7 +271,8 @@
});
client_->session()->Announce(
FullTrackName{"test"},
- [](FullTrackName, std::optional<MoqtAnnounceErrorReason>) {});
+ [](FullTrackName, std::optional<MoqtAnnounceErrorReason>) {},
+ VersionSpecificParameters());
bool success = test_harness_.RunUntilWithDefaultTimeout(
[&]() { return received_subscribe_ok; });
EXPECT_TRUE(success);
@@ -316,7 +320,7 @@
queue->AddObject(MemSliceFromString("object 3"), /*key=*/false);
client_->session()->SubscribeCurrentObject(FullTrackName("test", name),
&client_visitor,
- MoqtSubscribeParameters());
+ VersionSpecificParameters());
std::optional<Location> largest_id;
EXPECT_CALL(client_visitor, OnReply)
.WillOnce([&](const FullTrackName& /*name*/, std::optional<Location> id,
@@ -405,7 +409,7 @@
full_track_name,
[&](std::unique_ptr<MoqtFetchTask> task) { fetch = std::move(task); },
Location{0, 0}, 99, std::nullopt, 128, std::nullopt,
- MoqtSubscribeParameters()));
+ VersionSpecificParameters()));
// Run until we get FETCH_OK.
bool success = test_harness_.RunUntilWithDefaultTimeout(
[&]() { return fetch != nullptr; });
@@ -444,7 +448,8 @@
std::optional<MoqtAnnounceErrorReason> error_message)>
announce_callback;
client_->session()->Announce(FullTrackName{"foo"},
- announce_callback.AsStdFunction());
+ announce_callback.AsStdFunction(),
+ VersionSpecificParameters());
bool matches = false;
EXPECT_CALL(announce_callback, Call(_, _))
.WillOnce([&](FullTrackName track_namespace,
@@ -479,7 +484,7 @@
EXPECT_CALL(client_visitor, OnReply(full_track_name, _, expected_reason))
.WillOnce([&]() { received_ok = true; });
client_->session()->SubscribeAbsolute(full_track_name, 0, 0, &client_visitor,
- MoqtSubscribeParameters());
+ VersionSpecificParameters());
bool success =
test_harness_.RunUntilWithDefaultTimeout([&]() { return received_ok; });
EXPECT_TRUE(success);
@@ -505,7 +510,7 @@
EXPECT_CALL(client_visitor, OnReply(full_track_name, _, expected_reason))
.WillOnce([&]() { received_ok = true; });
client_->session()->SubscribeCurrentObject(full_track_name, &client_visitor,
- MoqtSubscribeParameters());
+ VersionSpecificParameters());
bool success =
test_harness_.RunUntilWithDefaultTimeout([&]() { return received_ok; });
EXPECT_TRUE(success);
@@ -531,7 +536,7 @@
EXPECT_CALL(client_visitor, OnReply(full_track_name, _, expected_reason))
.WillOnce([&]() { received_ok = true; });
client_->session()->SubscribeCurrentObject(full_track_name, &client_visitor,
- MoqtSubscribeParameters());
+ VersionSpecificParameters());
bool success =
test_harness_.RunUntilWithDefaultTimeout([&]() { return received_ok; });
EXPECT_TRUE(success);
@@ -546,7 +551,7 @@
EXPECT_CALL(client_visitor, OnReply(full_track_name, _, expected_reason))
.WillOnce([&]() { received_ok = true; });
client_->session()->SubscribeCurrentObject(full_track_name, &client_visitor,
- MoqtSubscribeParameters());
+ VersionSpecificParameters());
bool success =
test_harness_.RunUntilWithDefaultTimeout([&]() { return received_ok; });
EXPECT_TRUE(success);
@@ -579,7 +584,7 @@
// Reject this subscribe because there already is one.
EXPECT_FALSE(client_->session()->SubscribeCurrentObject(
- full_track_name, &client_visitor, MoqtSubscribeParameters()));
+ full_track_name, &client_visitor, VersionSpecificParameters()));
queue->RemoveAllSubscriptions(); // Induce a SUBSCRIBE_DONE.
bool subscribe_done = false;
EXPECT_CALL(client_visitor, OnSubscribeDone).WillOnce([&]() {
@@ -591,7 +596,7 @@
// Subscription is deleted; the client session should not immediately reject
// a new attempt.
EXPECT_TRUE(client_->session()->SubscribeCurrentObject(
- full_track_name, &client_visitor, MoqtSubscribeParameters()));
+ full_track_name, &client_visitor, VersionSpecificParameters()));
}
TEST_F(MoqtIntegrationTest, ObjectAcks) {
@@ -629,8 +634,8 @@
ack_function(100, 200, quic::QuicTimeDelta::FromMicroseconds(456));
});
- MoqtSubscribeParameters parameters;
- parameters.object_ack_window = quic::QuicTimeDelta::FromMilliseconds(100);
+ VersionSpecificParameters parameters;
+ parameters.oack_window_size = quic::QuicTimeDelta::FromMilliseconds(100);
client_->session()->SubscribeCurrentObject(full_track_name, &client_visitor,
parameters);
EXPECT_CALL(monitoring, OnObjectAckSupportKnown(true));
@@ -665,7 +670,7 @@
bool received_ok = false;
EXPECT_CALL(client_visitor, OnReply(full_track_name, _, expected_reason))
.WillOnce([&]() { received_ok = true; });
- MoqtSubscribeParameters parameters;
+ VersionSpecificParameters parameters;
// Set delivery timeout to ~ 1 RTT: any loss is fatal.
parameters.delivery_timeout = quic::QuicTimeDelta::FromMilliseconds(100);
client_->session()->SubscribeCurrentObject(full_track_name, &client_visitor,
@@ -715,7 +720,7 @@
bool received_ok = false;
EXPECT_CALL(client_visitor, OnReply(full_track_name, _, expected_reason))
.WillOnce([&]() { received_ok = true; });
- MoqtSubscribeParameters parameters;
+ VersionSpecificParameters parameters;
// Set delivery timeout to ~ 1 RTT: any loss is fatal.
parameters.delivery_timeout = quic::QuicTimeDelta::FromMilliseconds(100);
client_->session()->SubscribeCurrentObject(full_track_name, &client_visitor,
diff --git a/quiche/quic/moqt/moqt_messages.cc b/quiche/quic/moqt/moqt_messages.cc
index f2e9f7e..eba04a3 100644
--- a/quiche/quic/moqt/moqt_messages.cc
+++ b/quiche/quic/moqt/moqt_messages.cc
@@ -4,23 +4,86 @@
#include "quiche/quic/moqt/moqt_messages.h"
+#include <array>
+#include <cstddef>
#include <cstdint>
+#include <optional>
#include <string>
+#include <utility>
#include <vector>
#include "absl/algorithm/container.h"
+#include "absl/container/btree_map.h"
#include "absl/status/status.h"
#include "absl/strings/escaping.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"
#include "absl/types/span.h"
+#include "quiche/quic/core/quic_types.h"
#include "quiche/quic/platform/api/quic_bug_tracker.h"
#include "quiche/common/platform/api/quiche_bug_tracker.h"
#include "quiche/web_transport/web_transport.h"
namespace moqt {
+void KeyValuePairList::insert(uint64_t key, absl::string_view value) {
+ if (key % 2 == 0) {
+ QUICHE_BUG(key_value_pair_string_is_even) << "Key value pair of wrong type";
+ return;
+ }
+ string_map_.emplace(key, value);
+}
+
+void KeyValuePairList::insert(uint64_t key, uint64_t value) {
+ if (key % 2 == 1) {
+ QUICHE_BUG(key_value_pair_int_is_odd) << "Key value pair of wrong type";
+ return;
+ }
+ integer_map_.emplace(key, value);
+}
+
+size_t KeyValuePairList::count(uint64_t key) const {
+ if (key % 2 == 0) {
+ return integer_map_.count(key);
+ }
+ return string_map_.count(key);
+}
+
+bool KeyValuePairList::contains(uint64_t key) const {
+ if (key % 2 == 0) {
+ return integer_map_.contains(key);
+ }
+ return string_map_.contains(key);
+}
+
+std::vector<uint64_t> KeyValuePairList::GetIntegers(uint64_t key) const {
+ if (key % 2 == 1) {
+ QUICHE_BUG(key_value_pair_int_is_odd) << "Key value pair of wrong type";
+ return {};
+ }
+ std::vector<uint64_t> result;
+ auto [range_start, range_end] = integer_map_.equal_range(key);
+ for (auto& it = range_start; it != range_end; ++it) {
+ result.push_back(it->second);
+ }
+ return result;
+}
+
+std::vector<absl::string_view> KeyValuePairList::GetStrings(
+ uint64_t key) const {
+ if (key % 2 == 0) {
+ QUICHE_BUG(key_value_pair_string_is_even) << "Key value pair of wrong type";
+ return {};
+ }
+ std::vector<absl::string_view> result;
+ auto [range_start, range_end] = string_map_.equal_range(key);
+ for (auto& it = range_start; it != range_end; ++it) {
+ result.push_back(it->second);
+ }
+ return result;
+}
+
MoqtObjectStatus IntegerToObjectStatus(uint64_t integer) {
if (integer >=
static_cast<uint64_t>(MoqtObjectStatus::kInvalidObjectStatus)) {
@@ -45,6 +108,71 @@
return MoqtFilterType::kLatestObject;
}
+bool ValidateSetupParameters(const KeyValuePairList& parameters, bool webtrans,
+ quic::Perspective perspective) {
+ if (parameters.count(SetupParameter::kPath) > 1 ||
+ parameters.count(SetupParameter::kMaxRequestId) > 1 ||
+ parameters.count(SetupParameter::kMaxAuthTokenCacheSize) > 1 ||
+ parameters.count(SetupParameter::kSupportObjectAcks) > 1) {
+ return false;
+ }
+ if ((webtrans || perspective == quic::Perspective::IS_CLIENT) ==
+ parameters.contains(SetupParameter::kPath)) {
+ // Only non-webtrans servers should receive kPath.
+ return false;
+ }
+ if (!parameters.contains(SetupParameter::kSupportObjectAcks)) {
+ return true;
+ }
+ std::vector<uint64_t> support_object_acks =
+ parameters.GetIntegers(SetupParameter::kSupportObjectAcks);
+ QUICHE_DCHECK(support_object_acks.size() == 1);
+ if (support_object_acks.front() > 1) {
+ return false;
+ }
+ return true;
+}
+
+const std::array<MoqtMessageType, 5> kAllowsAuthorization = {
+ MoqtMessageType::kSubscribe, MoqtMessageType::kTrackStatusRequest,
+ MoqtMessageType::kFetch, MoqtMessageType::kSubscribeAnnounces,
+ MoqtMessageType::kAnnounce};
+const std::array<MoqtMessageType, 4> kAllowsDeliveryTimeout = {
+ MoqtMessageType::kSubscribe, MoqtMessageType::kSubscribeOk,
+ MoqtMessageType::kSubscribeUpdate, MoqtMessageType::kTrackStatus};
+const std::array<MoqtMessageType, 3> kAllowsMaxCacheDuration = {
+ MoqtMessageType::kSubscribeOk, MoqtMessageType::kTrackStatus,
+ MoqtMessageType::kFetchOk};
+bool ValidateVersionSpecificParameters(const KeyValuePairList& parameters,
+ MoqtMessageType message_type) {
+ size_t authorization_token =
+ parameters.count(VersionSpecificParameter::kAuthorizationToken);
+ size_t delivery_timeout =
+ parameters.count(VersionSpecificParameter::kDeliveryTimeout);
+ size_t authorization_info =
+ parameters.count(VersionSpecificParameter::kAuthorizationInfo);
+ size_t max_cache_duration =
+ parameters.count(VersionSpecificParameter::kMaxCacheDuration);
+ if (delivery_timeout > 1 || authorization_info > 1 ||
+ max_cache_duration > 1) {
+ // Disallowed duplicate.
+ return false;
+ }
+ if ((authorization_token > 0 || authorization_info > 0) &&
+ !absl::c_linear_search(kAllowsAuthorization, message_type)) {
+ return false;
+ }
+ if (delivery_timeout > 0 &&
+ !absl::c_linear_search(kAllowsDeliveryTimeout, message_type)) {
+ return false;
+ }
+ if (max_cache_duration > 0 &&
+ !absl::c_linear_search(kAllowsMaxCacheDuration, message_type)) {
+ return false;
+ }
+ return true;
+}
+
std::string MoqtMessageTypeToString(const MoqtMessageType message_type) {
switch (message_type) {
case MoqtMessageType::kClientSetup:
@@ -52,7 +180,7 @@
case MoqtMessageType::kServerSetup:
return "SERVER_SETUP";
case MoqtMessageType::kSubscribe:
- return "SUBSCRIBE_REQUEST";
+ return "SUBSCRIBE";
case MoqtMessageType::kSubscribeOk:
return "SUBSCRIBE_OK";
case MoqtMessageType::kSubscribeError:
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index cbcd4d6..41809c7 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -13,9 +13,9 @@
#include <optional>
#include <string>
#include <utility>
-#include <variant>
#include <vector>
+#include "absl/container/btree_map.h"
#include "absl/container/inlined_vector.h"
#include "absl/status/status.h"
#include "absl/strings/str_format.h"
@@ -27,6 +27,7 @@
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/common/platform/api/quiche_bug_tracker.h"
#include "quiche/common/platform/api/quiche_export.h"
+#include "quiche/common/quiche_callbacks.h"
#include "quiche/web_transport/web_transport.h"
namespace moqt {
@@ -36,32 +37,52 @@
}
enum class MoqtVersion : uint64_t {
- kDraft10 = 0xff00000a,
+ kDraft11 = 0xff00000b,
kUnrecognizedVersionForTests = 0xfe0000ff,
};
-inline constexpr MoqtVersion kDefaultMoqtVersion = MoqtVersion::kDraft10;
+inline constexpr MoqtVersion kDefaultMoqtVersion = MoqtVersion::kDraft11;
inline constexpr uint64_t kDefaultInitialMaxSubscribeId = 100;
+// TODO(martinduke): Implement an auth token cache.
+inline constexpr uint64_t kDefaultMaxAuthTokenCacheSize = 0;
inline constexpr uint64_t kMinNamespaceElements = 1;
inline constexpr uint64_t kMaxNamespaceElements = 32;
struct QUICHE_EXPORT MoqtSessionParameters {
// TODO: support multiple versions.
- // TODO: support roles other than PubSub.
-
+ MoqtSessionParameters() = default;
explicit MoqtSessionParameters(quic::Perspective perspective)
: perspective(perspective), using_webtrans(true) {}
MoqtSessionParameters(quic::Perspective perspective, std::string path)
: perspective(perspective),
using_webtrans(false),
path(std::move(path)) {}
+ MoqtSessionParameters(quic::Perspective perspective, std::string path,
+ uint64_t max_subscribe_id)
+ : perspective(perspective),
+ using_webtrans(true),
+ path(std::move(path)),
+ max_subscribe_id(max_subscribe_id) {}
+ MoqtSessionParameters(quic::Perspective perspective,
+ uint64_t max_subscribe_id)
+ : perspective(perspective), max_subscribe_id(max_subscribe_id) {}
+ bool operator==(const MoqtSessionParameters& other) {
+ return version == other.version &&
+ deliver_partial_objects == other.deliver_partial_objects &&
+ perspective == other.perspective &&
+ using_webtrans == other.using_webtrans && path == other.path &&
+ max_subscribe_id == other.max_subscribe_id &&
+ max_auth_token_cache_size == other.max_auth_token_cache_size &&
+ support_object_acks == other.support_object_acks;
+ }
MoqtVersion version = kDefaultMoqtVersion;
- quic::Perspective perspective;
- bool using_webtrans;
- std::string path;
- uint64_t max_subscribe_id = kDefaultInitialMaxSubscribeId;
bool deliver_partial_objects = false;
+ quic::Perspective perspective = quic::Perspective::IS_SERVER;
+ bool using_webtrans = true;
+ std::string path = "";
+ uint64_t max_subscribe_id = kDefaultInitialMaxSubscribeId;
+ uint64_t max_auth_token_cache_size = kDefaultMaxAuthTokenCacheSize;
bool support_object_acks = false;
};
@@ -108,8 +129,8 @@
kFetchOk = 0x18,
kFetchError = 0x19,
kSubscribesBlocked = 0x1a,
- kClientSetup = 0x40,
- kServerSetup = 0x41,
+ kClientSetup = 0x20,
+ kServerSetup = 0x21,
// QUICHE-specific extensions.
@@ -139,23 +160,54 @@
0x01;
inline constexpr webtransport::StreamErrorCode kResetCodeTimedOut = 0x02;
-enum class QUICHE_EXPORT MoqtSetupParameter : uint64_t {
- kRole = 0x0,
+enum class QUICHE_EXPORT SetupParameter : uint64_t {
kPath = 0x1,
- kMaxSubscribeId = 0x2,
+ kMaxRequestId = 0x2,
+ kMaxAuthTokenCacheSize = 0x4,
// QUICHE-specific extensions.
// Indicates support for OACK messages.
- kSupportObjectAcks = 0xbbf1439,
+ kSupportObjectAcks = 0xbbf1438,
};
-enum class QUICHE_EXPORT MoqtTrackRequestParameter : uint64_t {
- kAuthorizationInfo = 0x2,
- kDeliveryTimeout = 0x3,
+enum class QUICHE_EXPORT VersionSpecificParameter : uint64_t {
+ kAuthorizationToken = 0x1,
+ kDeliveryTimeout = 0x2,
+ kAuthorizationInfo = 0x3,
kMaxCacheDuration = 0x4,
// QUICHE-specific extensions.
- kOackWindowSize = 0xbbf1439,
+ kOackWindowSize = 0xbbf1438,
+};
+
+struct VersionSpecificParameters {
+ VersionSpecificParameters() = default;
+ // Likely parameter combinations.
+ VersionSpecificParameters(quic::QuicTimeDelta delivery_timeout,
+ quic::QuicTimeDelta max_cache_duration)
+ : delivery_timeout(delivery_timeout),
+ max_cache_duration(max_cache_duration) {}
+ explicit VersionSpecificParameters(std::string authorization_info)
+ : authorization_info(authorization_info) {}
+ VersionSpecificParameters(quic::QuicTimeDelta delivery_timeout,
+ std::string authorization_info)
+ : delivery_timeout(delivery_timeout),
+ authorization_info(authorization_info) {}
+
+ // TODO(martinduke): Turn auth_token into structured data.
+ std::vector<std::string> authorization_token;
+ quic::QuicTimeDelta delivery_timeout = quic::QuicTimeDelta::Infinite();
+ std::optional<std::string> authorization_info;
+ quic::QuicTimeDelta max_cache_duration = quic::QuicTimeDelta::Infinite();
+ std::optional<quic::QuicTimeDelta> oack_window_size;
+
+ bool operator==(const VersionSpecificParameters& other) const {
+ return authorization_token == other.authorization_token &&
+ delivery_timeout == other.delivery_timeout &&
+ authorization_info == other.authorization_info &&
+ max_cache_duration == other.max_cache_duration &&
+ oack_window_size == other.oack_window_size;
+ }
};
// Used for SUBSCRIBE_ERROR, ANNOUNCE_ERROR, ANNOUNCE_CANCEL,
@@ -306,17 +358,97 @@
return H::combine(std::move(h), m.group, m.object);
}
+// Encodes a list of key-value pairs common to both parameters and extensions.
+// If the key is odd, it is a length-prefixed string (which may encode further
+// item-specific structure). If the key is even, it is a varint.
+// This class does not interpret the semantic meaning of the keys and values,
+// although it does accept various uint64_t-based enums to reduce the burden of
+// casting on the caller.
+class KeyValuePairList {
+ public:
+ KeyValuePairList() = default;
+ size_t size() const { return integer_map_.size() + string_map_.size(); }
+ void insert(VersionSpecificParameter key, uint64_t value) {
+ insert(static_cast<uint64_t>(key), value);
+ }
+ void insert(SetupParameter key, uint64_t value) {
+ insert(static_cast<uint64_t>(key), value);
+ }
+ void insert(VersionSpecificParameter key, absl::string_view value) {
+ insert(static_cast<uint64_t>(key), value);
+ }
+ void insert(SetupParameter key, absl::string_view value) {
+ insert(static_cast<uint64_t>(key), value);
+ }
+ void insert(uint64_t key, absl::string_view value);
+ void insert(uint64_t key, uint64_t value);
+ size_t count(VersionSpecificParameter key) const {
+ return count(static_cast<uint64_t>(key));
+ }
+ size_t count(SetupParameter key) const {
+ return count(static_cast<uint64_t>(key));
+ }
+ bool contains(VersionSpecificParameter key) const {
+ return contains(static_cast<uint64_t>(key));
+ }
+ bool contains(SetupParameter key) const {
+ return contains(static_cast<uint64_t>(key));
+ }
+ // If either of these callbacks returns false, ForEach will return early.
+ using IntCallback = quiche::UnretainedCallback<bool(uint64_t, uint64_t)>;
+ using StringCallback =
+ quiche::UnretainedCallback<bool(uint64_t, absl::string_view)>;
+ // Iterates through the whole list, and executes int_callback for each integer
+ // value and string_callback for each string value.
+ bool ForEach(IntCallback int_callback, StringCallback string_callback) const {
+ for (const auto& [key, value] : integer_map_) {
+ if (!int_callback(key, value)) {
+ return false;
+ }
+ }
+ for (const auto& [key, value] : string_map_) {
+ if (!string_callback(key, value)) {
+ return false;
+ }
+ }
+ return true;
+ }
+ std::vector<uint64_t> GetIntegers(VersionSpecificParameter key) const {
+ return GetIntegers(static_cast<uint64_t>(key));
+ }
+ std::vector<uint64_t> GetIntegers(SetupParameter key) const {
+ return GetIntegers(static_cast<uint64_t>(key));
+ }
+ std::vector<absl::string_view> GetStrings(
+ VersionSpecificParameter key) const {
+ return GetStrings(static_cast<uint64_t>(key));
+ }
+ std::vector<absl::string_view> GetStrings(SetupParameter key) const {
+ return GetStrings(static_cast<uint64_t>(key));
+ }
+ void clear() {
+ integer_map_.clear();
+ string_map_.clear();
+ }
+
+ private:
+ size_t count(uint64_t key) const;
+ bool contains(uint64_t key) const;
+ std::vector<uint64_t> GetIntegers(uint64_t key) const;
+ std::vector<absl::string_view> GetStrings(uint64_t key) const;
+ absl::btree_multimap<uint64_t, uint64_t> integer_map_;
+ absl::btree_multimap<uint64_t, std::string> string_map_;
+};
+
+// TODO(martinduke): Collapse both Setup messages into MoqtSessionParameters.
struct QUICHE_EXPORT MoqtClientSetup {
std::vector<MoqtVersion> supported_versions;
- std::optional<std::string> path;
- std::optional<uint64_t> max_subscribe_id;
- bool supports_object_ack = false;
+ MoqtSessionParameters parameters;
};
struct QUICHE_EXPORT MoqtServerSetup {
MoqtVersion selected_version;
- std::optional<uint64_t> max_subscribe_id;
- bool supports_object_ack = false;
+ MoqtSessionParameters parameters;
};
// These codes do not appear on the wire.
@@ -357,25 +489,6 @@
kAbsoluteRange = 0x4,
};
-struct QUICHE_EXPORT MoqtSubscribeParameters {
- std::optional<std::string> authorization_info;
- std::optional<quic::QuicTimeDelta> delivery_timeout;
- std::optional<quic::QuicTimeDelta> max_cache_duration;
-
- // If present, indicates that OBJECT_ACK messages will be sent in response to
- // the objects on the stream. The actual value is informational, and it
- // communicates how many frames the subscriber is willing to buffer, in
- // microseconds.
- std::optional<quic::QuicTimeDelta> object_ack_window;
-
- bool operator==(const MoqtSubscribeParameters& other) const {
- return authorization_info == other.authorization_info &&
- delivery_timeout == other.delivery_timeout &&
- max_cache_duration == other.max_cache_duration &&
- object_ack_window == other.object_ack_window;
- }
-};
-
struct QUICHE_EXPORT MoqtSubscribe {
uint64_t subscribe_id;
uint64_t track_alias;
@@ -391,8 +504,7 @@
std::optional<Location> start;
std::optional<uint64_t> end_group;
// If the mode is kNone, the these are std::nullopt.
-
- MoqtSubscribeParameters parameters;
+ VersionSpecificParameters parameters;
};
// Deduce the filter type from the combination of group and object IDs. Returns
@@ -406,7 +518,7 @@
MoqtDeliveryOrder group_order;
// If ContextExists on the wire is zero, largest_id has no value.
std::optional<Location> largest_id;
- MoqtSubscribeParameters parameters;
+ VersionSpecificParameters parameters;
};
struct QUICHE_EXPORT MoqtSubscribeError {
@@ -442,12 +554,12 @@
Location start;
std::optional<uint64_t> end_group;
MoqtPriority subscriber_priority;
- MoqtSubscribeParameters parameters;
+ VersionSpecificParameters parameters;
};
struct QUICHE_EXPORT MoqtAnnounce {
FullTrackName track_namespace;
- MoqtSubscribeParameters parameters;
+ VersionSpecificParameters parameters;
};
struct QUICHE_EXPORT MoqtAnnounceOk {
@@ -490,6 +602,7 @@
MoqtTrackStatusCode status_code;
uint64_t last_group;
uint64_t last_object;
+ VersionSpecificParameters parameters;
};
struct QUICHE_EXPORT MoqtAnnounceCancel {
@@ -500,6 +613,7 @@
struct QUICHE_EXPORT MoqtTrackStatusRequest {
FullTrackName full_track_name;
+ VersionSpecificParameters parameters;
};
struct QUICHE_EXPORT MoqtGoAway {
@@ -508,7 +622,7 @@
struct QUICHE_EXPORT MoqtSubscribeAnnounces {
FullTrackName track_namespace;
- MoqtSubscribeParameters parameters;
+ VersionSpecificParameters parameters;
};
struct QUICHE_EXPORT MoqtSubscribeAnnouncesOk {
@@ -553,7 +667,7 @@
Location start_object; // subgroup is ignored
uint64_t end_group;
std::optional<uint64_t> end_object;
- MoqtSubscribeParameters parameters;
+ VersionSpecificParameters parameters;
};
struct QUICHE_EXPORT MoqtFetchCancel {
@@ -564,7 +678,7 @@
uint64_t subscribe_id;
MoqtDeliveryOrder group_order;
Location largest_id; // subgroup is ignored
- MoqtSubscribeParameters parameters;
+ VersionSpecificParameters parameters;
};
struct QUICHE_EXPORT MoqtFetchError {
@@ -588,6 +702,16 @@
quic::QuicTimeDelta delta_from_deadline = quic::QuicTimeDelta::Zero();
};
+// Returns false if duplicates are present for a known parameter where the spec
+// forbids duplicates. |perspective| is the consumer of the message, not the
+// sender.
+bool ValidateSetupParameters(const KeyValuePairList& parameters, bool webtrans,
+ quic::Perspective perspective);
+// Returns false if the parameters contain a protocol violation, or a
+// parameter cannot be in |message type|.
+bool ValidateVersionSpecificParameters(const KeyValuePairList& parameters,
+ MoqtMessageType message_type);
+
std::string MoqtMessageTypeToString(MoqtMessageType message_type);
std::string MoqtDataStreamTypeToString(MoqtDataStreamType type);
std::string MoqtDatagramTypeToString(MoqtDatagramType type);
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc
index 1dbe07e..390ceec 100644
--- a/quiche/quic/moqt/moqt_parser.cc
+++ b/quiche/quic/moqt/moqt_parser.cc
@@ -20,6 +20,7 @@
#include "absl/types/span.h"
#include "quiche/quic/core/quic_data_reader.h"
#include "quiche/quic/core/quic_time.h"
+#include "quiche/quic/core/quic_types.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/common/platform/api/quiche_bug_tracker.h"
@@ -101,6 +102,108 @@
return result;
}
+// Reads from |reader| to list. Returns false if there is a read error.
+bool ParseKeyValuePairList(quic::QuicDataReader& reader,
+ KeyValuePairList& list) {
+ list.clear();
+ uint64_t num_params;
+ if (!reader.ReadVarInt62(&num_params)) {
+ return false;
+ }
+ for (uint64_t i = 0; i < num_params; ++i) {
+ uint64_t type;
+ if (!reader.ReadVarInt62(&type)) {
+ return false;
+ }
+ if (type % 2 == 1) {
+ absl::string_view bytes;
+ if (!reader.ReadStringPieceVarInt62(&bytes)) {
+ return false;
+ }
+ list.insert(type, bytes);
+ continue;
+ }
+ uint64_t value;
+ if (!reader.ReadVarInt62(&value)) {
+ return false;
+ }
+ list.insert(type, value);
+ }
+ return true;
+}
+
+void KeyValuePairListToMoqtSessionParameters(const KeyValuePairList& parameters,
+ MoqtSessionParameters& out) {
+ parameters.ForEach(
+ [&](uint64_t key, uint64_t value) {
+ SetupParameter parameter = static_cast<SetupParameter>(key);
+ switch (parameter) {
+ case SetupParameter::kMaxRequestId:
+ out.max_subscribe_id = value;
+ break;
+ case SetupParameter::kMaxAuthTokenCacheSize:
+ out.max_auth_token_cache_size = value;
+ break;
+ case SetupParameter::kSupportObjectAcks:
+ out.support_object_acks = (value == 1);
+ break;
+ default:
+ break;
+ }
+ return true;
+ },
+ [&](uint64_t key, absl::string_view value) {
+ SetupParameter parameter = static_cast<SetupParameter>(key);
+ switch (parameter) {
+ case SetupParameter::kPath:
+ out.path = value;
+ break;
+ default:
+ break;
+ }
+ return true;
+ });
+}
+
+void KeyValuePairListToVersionSpecificParameters(
+ const KeyValuePairList& parameters, VersionSpecificParameters& out) {
+ parameters.ForEach(
+ [&](uint64_t key, uint64_t value) {
+ VersionSpecificParameter parameter =
+ static_cast<VersionSpecificParameter>(key);
+ switch (parameter) {
+ case VersionSpecificParameter::kDeliveryTimeout:
+ out.delivery_timeout = quic::QuicTimeDelta::FromMilliseconds(value);
+ break;
+ case VersionSpecificParameter::kMaxCacheDuration:
+ out.max_cache_duration =
+ quic::QuicTimeDelta::FromMilliseconds(value);
+ break;
+ case VersionSpecificParameter::kOackWindowSize:
+ out.oack_window_size = quic::QuicTimeDelta::FromMicroseconds(value);
+ break;
+ default:
+ break;
+ }
+ return true;
+ },
+ [&](uint64_t key, absl::string_view value) {
+ VersionSpecificParameter parameter =
+ static_cast<VersionSpecificParameter>(key);
+ switch (parameter) {
+ case VersionSpecificParameter::kAuthorizationToken:
+ out.authorization_token.push_back(std::string(value));
+ break;
+ case VersionSpecificParameter::kAuthorizationInfo:
+ out.authorization_info = value;
+ break;
+ default:
+ break;
+ }
+ return true;
+ });
+}
+
} // namespace
void MoqtControlParser::ReadAndDispatchMessages() {
@@ -275,6 +378,8 @@
size_t MoqtControlParser::ProcessClientSetup(quic::QuicDataReader& reader) {
MoqtClientSetup setup;
+ setup.parameters.using_webtrans = uses_web_transport_;
+ setup.parameters.perspective = quic::Perspective::IS_CLIENT;
uint64_t number_of_supported_versions;
if (!reader.ReadVarInt62(&number_of_supported_versions)) {
return 0;
@@ -286,131 +391,59 @@
}
setup.supported_versions.push_back(static_cast<MoqtVersion>(version));
}
- uint64_t num_params;
- if (!reader.ReadVarInt62(&num_params)) {
+ KeyValuePairList parameters;
+ if (!ParseKeyValuePairList(reader, parameters)) {
return 0;
}
- // Parse parameters
- for (uint64_t i = 0; i < num_params; ++i) {
- uint64_t type;
- absl::string_view value;
- if (!ReadParameter(reader, type, value)) {
- return 0;
- }
- auto key = static_cast<MoqtSetupParameter>(type);
- switch (key) {
- case MoqtSetupParameter::kPath:
- if (uses_web_transport_) {
- ParseError(
- "WebTransport connection is using PATH parameter in SETUP");
- return 0;
- }
- if (setup.path.has_value()) {
- ParseError("PATH parameter appears twice in CLIENT_SETUP");
- return 0;
- }
- setup.path = value;
- break;
- case MoqtSetupParameter::kMaxSubscribeId:
- if (setup.max_subscribe_id.has_value()) {
- ParseError("MAX_SUBSCRIBE_ID parameter appears twice in SETUP");
- return 0;
- }
- uint64_t max_id;
- if (!StringViewToVarInt(value, max_id)) {
- ParseError("MAX_SUBSCRIBE_ID parameter is not a valid varint");
- return 0;
- }
- setup.max_subscribe_id = max_id;
- break;
- case MoqtSetupParameter::kSupportObjectAcks:
- uint64_t flag;
- if (!StringViewToVarInt(value, flag) || flag > 1) {
- ParseError("Invalid kSupportObjectAcks value");
- return 0;
- }
- setup.supports_object_ack = static_cast<bool>(flag);
- break;
- default:
- // Skip over the parameter.
- break;
- }
- }
- if (!uses_web_transport_ && !setup.path.has_value()) {
- ParseError("PATH SETUP parameter missing from Client message over QUIC");
+ if (!ValidateSetupParameters(parameters, uses_web_transport_,
+ quic::Perspective::IS_SERVER)) {
+ ParseError("Client SETUP contains invalid parameters");
return 0;
}
+ KeyValuePairListToMoqtSessionParameters(parameters, setup.parameters);
+ // TODO(martinduke): Validate construction of the PATH (Sec 8.3.2.1)
visitor_.OnClientSetupMessage(setup);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtControlParser::ProcessServerSetup(quic::QuicDataReader& reader) {
MoqtServerSetup setup;
+ setup.parameters.using_webtrans = uses_web_transport_;
+ setup.parameters.perspective = quic::Perspective::IS_SERVER;
uint64_t version;
if (!reader.ReadVarInt62(&version)) {
return 0;
}
setup.selected_version = static_cast<MoqtVersion>(version);
- uint64_t num_params;
- if (!reader.ReadVarInt62(&num_params)) {
+ KeyValuePairList parameters;
+ if (!ParseKeyValuePairList(reader, parameters)) {
return 0;
}
- // Parse parameters
- for (uint64_t i = 0; i < num_params; ++i) {
- uint64_t type;
- absl::string_view value;
- if (!ReadParameter(reader, type, value)) {
- return 0;
- }
- auto key = static_cast<MoqtSetupParameter>(type);
- switch (key) {
- case MoqtSetupParameter::kPath:
- ParseError("PATH parameter in SERVER_SETUP");
- return 0;
- case MoqtSetupParameter::kMaxSubscribeId:
- if (setup.max_subscribe_id.has_value()) {
- ParseError("MAX_SUBSCRIBE_ID parameter appears twice in SETUP");
- return 0;
- }
- uint64_t max_id;
- if (!StringViewToVarInt(value, max_id)) {
- ParseError("MAX_SUBSCRIBE_ID parameter is not a valid varint");
- return 0;
- }
- setup.max_subscribe_id = max_id;
- break;
- case MoqtSetupParameter::kSupportObjectAcks:
- uint64_t flag;
- if (!StringViewToVarInt(value, flag) || flag > 1) {
- ParseError("Invalid kSupportObjectAcks value");
- return 0;
- }
- setup.supports_object_ack = static_cast<bool>(flag);
- break;
- default:
- // Skip over the parameter.
- break;
- }
+ if (!ValidateSetupParameters(parameters, uses_web_transport_,
+ quic::Perspective::IS_CLIENT)) {
+ ParseError("Server SETUP contains invalid parameters");
+ return 0;
}
+ KeyValuePairListToMoqtSessionParameters(parameters, setup.parameters);
visitor_.OnServerSetupMessage(setup);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtControlParser::ProcessSubscribe(quic::QuicDataReader& reader) {
- MoqtSubscribe subscribe_request;
+ MoqtSubscribe subscribe;
uint64_t filter, group, object;
uint8_t group_order;
absl::string_view track_name;
- if (!reader.ReadVarInt62(&subscribe_request.subscribe_id) ||
- !reader.ReadVarInt62(&subscribe_request.track_alias) ||
- !ReadTrackNamespace(reader, subscribe_request.full_track_name) ||
+ if (!reader.ReadVarInt62(&subscribe.subscribe_id) ||
+ !reader.ReadVarInt62(&subscribe.track_alias) ||
+ !ReadTrackNamespace(reader, subscribe.full_track_name) ||
!reader.ReadStringPieceVarInt62(&track_name) ||
- !reader.ReadUInt8(&subscribe_request.subscriber_priority) ||
+ !reader.ReadUInt8(&subscribe.subscriber_priority) ||
!reader.ReadUInt8(&group_order) || !reader.ReadVarInt62(&filter)) {
return 0;
}
- subscribe_request.full_track_name.AddElement(track_name);
- if (!ParseDeliveryOrder(group_order, subscribe_request.group_order)) {
+ subscribe.full_track_name.AddElement(track_name);
+ if (!ParseDeliveryOrder(group_order, subscribe.group_order)) {
ParseError("Invalid group order value in SUBSCRIBE message");
return 0;
}
@@ -423,15 +456,15 @@
if (!reader.ReadVarInt62(&group) || !reader.ReadVarInt62(&object)) {
return 0;
}
- subscribe_request.start = Location(group, object);
+ subscribe.start = Location(group, object);
if (filter_type == MoqtFilterType::kAbsoluteStart) {
break;
}
if (!reader.ReadVarInt62(&group)) {
return 0;
}
- subscribe_request.end_group = group;
- if (*subscribe_request.end_group < subscribe_request.start->group) {
+ subscribe.end_group = group;
+ if (*subscribe.end_group < subscribe.start->group) {
ParseError("End group is less than start group");
return 0;
}
@@ -440,10 +473,18 @@
ParseError("Invalid filter type");
return 0;
}
- if (!ReadSubscribeParameters(reader, subscribe_request.parameters)) {
+ KeyValuePairList parameters;
+ if (!ParseKeyValuePairList(reader, parameters)) {
return 0;
}
- visitor_.OnSubscribeMessage(subscribe_request);
+ // TODO(martinduke): Parse kAuthorizationToken.
+ if (!ValidateVersionSpecificParameters(parameters,
+ MoqtMessageType::kSubscribe)) {
+ ParseError("SUBSCRIBE contains invalid parameters");
+ return 0;
+ }
+ KeyValuePairListToVersionSpecificParameters(parameters, subscribe.parameters);
+ visitor_.OnSubscribeMessage(subscribe);
return reader.PreviouslyReadPayload().length();
}
@@ -474,13 +515,17 @@
return 0;
}
}
- if (!ReadSubscribeParameters(reader, subscribe_ok.parameters)) {
+ KeyValuePairList parameters;
+ if (!ParseKeyValuePairList(reader, parameters)) {
return 0;
}
- if (subscribe_ok.parameters.authorization_info.has_value()) {
- ParseError("SUBSCRIBE_OK has authorization info");
+ if (!ValidateVersionSpecificParameters(parameters,
+ MoqtMessageType::kSubscribeOk)) {
+ ParseError("SUBSCRIBE_OK contains invalid parameters");
return 0;
}
+ KeyValuePairListToVersionSpecificParameters(parameters,
+ subscribe_ok.parameters);
visitor_.OnSubscribeOkMessage(subscribe_ok);
return reader.PreviouslyReadPayload().length();
}
@@ -531,9 +576,17 @@
!reader.ReadUInt8(&subscribe_update.subscriber_priority)) {
return 0;
}
- if (!ReadSubscribeParameters(reader, subscribe_update.parameters)) {
+ KeyValuePairList parameters;
+ if (!ParseKeyValuePairList(reader, parameters)) {
return 0;
}
+ if (!ValidateVersionSpecificParameters(parameters,
+ MoqtMessageType::kSubscribeUpdate)) {
+ ParseError("SUBSCRIBE_UPDATE contains invalid parameters");
+ return 0;
+ }
+ KeyValuePairListToVersionSpecificParameters(parameters,
+ subscribe_update.parameters);
subscribe_update.start = Location(start_group, start_object);
if (end_group > 0) {
subscribe_update.end_group = end_group - 1;
@@ -542,10 +595,6 @@
return 0;
}
}
- if (subscribe_update.parameters.authorization_info.has_value()) {
- ParseError("SUBSCRIBE_UPDATE has authorization info");
- return 0;
- }
visitor_.OnSubscribeUpdateMessage(subscribe_update);
return reader.PreviouslyReadPayload().length();
}
@@ -555,13 +604,16 @@
if (!ReadTrackNamespace(reader, announce.track_namespace)) {
return 0;
}
- if (!ReadSubscribeParameters(reader, announce.parameters)) {
+ KeyValuePairList parameters;
+ if (!ParseKeyValuePairList(reader, parameters)) {
return 0;
}
- if (announce.parameters.delivery_timeout.has_value()) {
- ParseError("ANNOUNCE has delivery timeout");
+ if (!ValidateVersionSpecificParameters(parameters,
+ MoqtMessageType::kAnnounce)) {
+ ParseError("ANNOUNCE contains invalid parameters");
return 0;
}
+ KeyValuePairListToVersionSpecificParameters(parameters, announce.parameters);
visitor_.OnAnnounceMessage(announce);
return reader.PreviouslyReadPayload().length();
}
@@ -616,6 +668,17 @@
return 0;
}
track_status_request.full_track_name.AddElement(name);
+ KeyValuePairList parameters;
+ if (!ParseKeyValuePairList(reader, parameters)) {
+ return 0;
+ }
+ if (!ValidateVersionSpecificParameters(
+ parameters, MoqtMessageType::kTrackStatusRequest)) {
+ ParseError("TRACK_STATUS_REQUEST message contains invalid parameters");
+ return 0;
+ }
+ KeyValuePairListToVersionSpecificParameters(parameters,
+ track_status_request.parameters);
visitor_.OnTrackStatusRequestMessage(track_status_request);
return reader.PreviouslyReadPayload().length();
}
@@ -646,6 +709,17 @@
return 0;
}
track_status.status_code = static_cast<MoqtTrackStatusCode>(value);
+ KeyValuePairList parameters;
+ if (!ParseKeyValuePairList(reader, parameters)) {
+ return 0;
+ }
+ if (!ValidateVersionSpecificParameters(parameters,
+ MoqtMessageType::kTrackStatus)) {
+ ParseError("TRACK_STATUS message contains invalid parameters");
+ return 0;
+ }
+ KeyValuePairListToVersionSpecificParameters(parameters,
+ track_status.parameters);
visitor_.OnTrackStatusMessage(track_status);
return reader.PreviouslyReadPayload().length();
}
@@ -661,14 +735,22 @@
size_t MoqtControlParser::ProcessSubscribeAnnounces(
quic::QuicDataReader& reader) {
- MoqtSubscribeAnnounces subscribe_namespace;
- if (!ReadTrackNamespace(reader, subscribe_namespace.track_namespace)) {
+ MoqtSubscribeAnnounces subscribe_announces;
+ if (!ReadTrackNamespace(reader, subscribe_announces.track_namespace)) {
return 0;
}
- if (!ReadSubscribeParameters(reader, subscribe_namespace.parameters)) {
+ KeyValuePairList parameters;
+ if (!ParseKeyValuePairList(reader, parameters)) {
return 0;
}
- visitor_.OnSubscribeAnnouncesMessage(subscribe_namespace);
+ if (!ValidateVersionSpecificParameters(
+ parameters, MoqtMessageType::kSubscribeAnnounces)) {
+ ParseError("SUBSCRIBE_ANNOUNCES message contains invalid parameters");
+ return 0;
+ }
+ KeyValuePairListToVersionSpecificParameters(parameters,
+ subscribe_announces.parameters);
+ visitor_.OnSubscribeAnnouncesMessage(subscribe_announces);
return reader.PreviouslyReadPayload().length();
}
@@ -770,9 +852,15 @@
ParseError("Invalid FETCH type");
return 0;
}
- if (!ReadSubscribeParameters(reader, fetch.parameters)) {
+ KeyValuePairList parameters;
+ if (!ParseKeyValuePairList(reader, parameters)) {
return 0;
}
+ if (!ValidateVersionSpecificParameters(parameters, MoqtMessageType::kFetch)) {
+ ParseError("FETCH message contains invalid parameters");
+ return 0;
+ }
+ KeyValuePairListToVersionSpecificParameters(parameters, fetch.parameters);
visitor_.OnFetchMessage(fetch);
return reader.PreviouslyReadPayload().length();
}
@@ -789,18 +877,25 @@
size_t MoqtControlParser::ProcessFetchOk(quic::QuicDataReader& reader) {
MoqtFetchOk fetch_ok;
uint8_t group_order;
+ KeyValuePairList parameters;
if (!reader.ReadVarInt62(&fetch_ok.subscribe_id) ||
!reader.ReadUInt8(&group_order) ||
!reader.ReadVarInt62(&fetch_ok.largest_id.group) ||
!reader.ReadVarInt62(&fetch_ok.largest_id.object) ||
- !ReadSubscribeParameters(reader, fetch_ok.parameters)) {
+ !ParseKeyValuePairList(reader, parameters)) {
return 0;
}
if (group_order != 0x01 && group_order != 0x02) {
ParseError("Invalid group order value in FETCH_OK");
return 0;
}
+ if (!ValidateVersionSpecificParameters(parameters,
+ MoqtMessageType::kFetchOk)) {
+ ParseError("FETCH_OK message contains invalid parameters");
+ return 0;
+ }
fetch_ok.group_order = static_cast<MoqtDeliveryOrder>(group_order);
+ KeyValuePairListToVersionSpecificParameters(parameters, fetch_ok.parameters);
visitor_.OnFetchOkMessage(fetch_ok);
return reader.PreviouslyReadPayload().length();
}
@@ -857,109 +952,6 @@
visitor_.OnParsingError(error_code, reason);
}
-bool MoqtControlParser::ReadVarIntPieceVarInt62(quic::QuicDataReader& reader,
- uint64_t& result) {
- uint64_t length;
- if (!reader.ReadVarInt62(&length)) {
- return false;
- }
- uint64_t actual_length = static_cast<uint64_t>(reader.PeekVarInt62Length());
- if (length != actual_length) {
- ParseError("Parameter VarInt has length field mismatch");
- return false;
- }
- if (!reader.ReadVarInt62(&result)) {
- return false;
- }
- return true;
-}
-
-bool MoqtControlParser::ReadParameter(quic::QuicDataReader& reader,
- uint64_t& type,
- absl::string_view& value) {
- if (!reader.ReadVarInt62(&type)) {
- return false;
- }
- return reader.ReadStringPieceVarInt62(&value);
-}
-
-bool MoqtControlParser::ReadSubscribeParameters(
- quic::QuicDataReader& reader, MoqtSubscribeParameters& params) {
- uint64_t num_params;
- if (!reader.ReadVarInt62(&num_params)) {
- return false;
- }
- for (uint64_t i = 0; i < num_params; ++i) {
- uint64_t type;
- absl::string_view value;
- if (!ReadParameter(reader, type, value)) {
- return false;
- }
- uint64_t raw_value;
- auto key = static_cast<MoqtTrackRequestParameter>(type);
- switch (key) {
- case MoqtTrackRequestParameter::kAuthorizationInfo:
- if (params.authorization_info.has_value()) {
- ParseError("AUTHORIZATION_INFO parameter appears twice");
- return false;
- }
- params.authorization_info = value;
- break;
- case moqt::MoqtTrackRequestParameter::kDeliveryTimeout:
- if (params.delivery_timeout.has_value()) {
- ParseError("DELIVERY_TIMEOUT parameter appears twice");
- return false;
- }
- if (!StringViewToVarInt(value, raw_value)) {
- return false;
- }
- params.delivery_timeout =
- quic::QuicTimeDelta::FromMilliseconds(raw_value);
- break;
- case moqt::MoqtTrackRequestParameter::kMaxCacheDuration:
- if (params.max_cache_duration.has_value()) {
- ParseError("MAX_CACHE_DURATION parameter appears twice");
- return false;
- }
- if (!StringViewToVarInt(value, raw_value)) {
- return false;
- }
- params.max_cache_duration =
- quic::QuicTimeDelta::FromMilliseconds(raw_value);
- break;
- case MoqtTrackRequestParameter::kOackWindowSize: {
- if (params.object_ack_window.has_value()) {
- ParseError("OACK_WINDOW_SIZE parameter appears twice in SUBSCRIBE");
- return false;
- }
- if (!StringViewToVarInt(value, raw_value)) {
- ParseError("OACK_WINDOW_SIZE parameter is not a valid varint");
- return false;
- }
- params.object_ack_window =
- quic::QuicTimeDelta::FromMicroseconds(raw_value);
- break;
- }
- default:
- // Skip over the parameter.
- break;
- }
- }
- return true;
-}
-
-bool MoqtControlParser::StringViewToVarInt(absl::string_view& sv,
- uint64_t& vi) {
- quic::QuicDataReader reader(sv);
- if (static_cast<size_t>(reader.PeekVarInt62Length()) != sv.length()) {
- ParseError(MoqtError::kParameterLengthMismatch,
- "Parameter length does not match varint encoding");
- return false;
- }
- reader.ReadVarInt62(&vi);
- return true;
-}
-
bool MoqtControlParser::ReadTrackNamespace(quic::QuicDataReader& reader,
FullTrackName& full_track_name) {
QUICHE_DCHECK(full_track_name.empty());
diff --git a/quiche/quic/moqt/moqt_parser.h b/quiche/quic/moqt/moqt_parser.h
index 37c759e..137dd87 100644
--- a/quiche/quic/moqt/moqt_parser.h
+++ b/quiche/quic/moqt/moqt_parser.h
@@ -138,24 +138,6 @@
void ParseError(absl::string_view reason);
void ParseError(MoqtError error, absl::string_view reason);
- // Reads an integer whose length is specified by a preceding VarInt62 and
- // returns it in |result|. Returns false if parsing fails.
- bool ReadVarIntPieceVarInt62(quic::QuicDataReader& reader, uint64_t& result);
- // Read a parameter and return the value as a string_view. Returns false if
- // |reader| does not have enough data.
- bool ReadParameter(quic::QuicDataReader& reader, uint64_t& type,
- absl::string_view& value);
- // Reads MoqtSubscribeParameter from one of the message types that supports
- // it. The cursor in |reader| should point to the "number of parameters"
- // field in the message. The cursor will move to the end of the parameters.
- // Returns false if it could not parse the full message, in which case the
- // cursor in |reader| should not be used.
- bool ReadSubscribeParameters(quic::QuicDataReader& reader,
- MoqtSubscribeParameters& params);
- // Convert a string view to a varint. Throws an error and returns false if the
- // string_view is not exactly the right length.
- bool StringViewToVarInt(absl::string_view& sv, uint64_t& vi);
-
// Parses a message that a track namespace but not name. The last element of
// |full_track_name| will be set to the empty string. Returns false if it
// could not parse the full namespace field.
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc
index a2270db..6ba8a22 100644
--- a/quiche/quic/moqt/moqt_parser_test.cc
+++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -559,21 +559,21 @@
EXPECT_FALSE(visitor_.parsing_error_.has_value());
}
-TEST_F(MoqtMessageSpecificTest, ClientSetupMaxSubscribeIdAppearsTwice) {
+TEST_F(MoqtMessageSpecificTest, ClientSetupMaxRequestIdAppearsTwice) {
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char setup[] = {
- 0x40, 0x40, 0x0f, 0x02, 0x01, 0x02, // versions
- 0x03, // 3 params
- 0x01, 0x03, 0x66, 0x6f, 0x6f, // path = "foo"
- 0x02, 0x01, 0x32, // max_subscribe_id = 50
- 0x02, 0x01, 0x32, // max_subscribe_id = 50
+ 0x20, 0x0d, 0x02, 0x01, 0x02, // versions
+ 0x03, // 3 params
+ 0x01, 0x03, 0x66, 0x6f, 0x6f, // path = "foo"
+ 0x02, 0x32, // max_request_id = 50
+ 0x02, 0x32, // max_request_id = 50
};
stream.Receive(absl::string_view(setup, sizeof(setup)), false);
parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
EXPECT_EQ(visitor_.parsing_error_,
- "MAX_SUBSCRIBE_ID parameter appears twice in SETUP");
+ "Client SETUP contains invalid parameters");
EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
}
@@ -581,7 +581,7 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char setup[] = {
- 0x40, 0x41, 0x07,
+ 0x21, 0x07,
0x01, // version = 1
0x01, // 1 param
0x01, 0x03, 0x66, 0x6f, 0x6f, // path = "foo"
@@ -589,7 +589,8 @@
stream.Receive(absl::string_view(setup, sizeof(setup)), false);
parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
- EXPECT_EQ(visitor_.parsing_error_, "PATH parameter in SERVER_SETUP");
+ EXPECT_EQ(visitor_.parsing_error_,
+ "Server SETUP contains invalid parameters");
EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
}
@@ -597,16 +598,16 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char setup[] = {
- 0x40, 0x40, 0x0e, 0x02, 0x01, 0x02, // versions = 1, 2
- 0x02, // 2 params
- 0x01, 0x03, 0x66, 0x6f, 0x6f, // path = "foo"
- 0x01, 0x03, 0x66, 0x6f, 0x6f, // path = "foo"
+ 0x20, 0x0e, 0x02, 0x01, 0x02, // versions = 1, 2
+ 0x02, // 2 params
+ 0x01, 0x03, 0x66, 0x6f, 0x6f, // path = "foo"
+ 0x01, 0x03, 0x66, 0x6f, 0x6f, // path = "foo"
};
stream.Receive(absl::string_view(setup, sizeof(setup)), false);
parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
EXPECT_EQ(visitor_.parsing_error_,
- "PATH parameter appears twice in CLIENT_SETUP");
+ "Client SETUP contains invalid parameters");
EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
}
@@ -614,15 +615,15 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kWebTrans, &stream, visitor_);
char setup[] = {
- 0x40, 0x40, 0x09, 0x02, 0x01, 0x02, // versions = 1, 2
- 0x01, // 1 param
- 0x01, 0x03, 0x66, 0x6f, 0x6f, // path = "foo"
+ 0x20, 0x09, 0x02, 0x01, 0x02, // versions = 1, 2
+ 0x01, // 1 param
+ 0x01, 0x03, 0x66, 0x6f, 0x6f, // path = "foo"
};
stream.Receive(absl::string_view(setup, sizeof(setup)), false);
parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
EXPECT_EQ(visitor_.parsing_error_,
- "WebTransport connection is using PATH parameter in SETUP");
+ "Client SETUP contains invalid parameters");
EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
}
@@ -630,14 +631,14 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char setup[] = {
- 0x40, 0x40, 0x04, 0x02, 0x01, 0x02, // versions = 1, 2
- 0x00, // no param
+ 0x20, 0x04, 0x02, 0x01, 0x02, // versions = 1, 2
+ 0x00, // no param
};
stream.Receive(absl::string_view(setup, sizeof(setup)), false);
parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
EXPECT_EQ(visitor_.parsing_error_,
- "PATH SETUP parameter missing from Client message over QUIC");
+ "Client SETUP contains invalid parameters");
EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
}
@@ -645,17 +646,17 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char setup[] = {
- 0x40, 0x40, 0x0f, 0x02, 0x01, 0x02, // versions = 1, 2
- 0x03, // 4 params
- 0x01, 0x03, 0x66, 0x6f, 0x6f, // path = "foo"
- 0x02, 0x01, 0x32, // max_subscribe_id = 50
- 0x02, 0x01, 0x32, // max_subscribe_id = 50
+ 0x20, 0x0d, 0x02, 0x01, 0x02, // versions = 1, 2
+ 0x03, // 4 params
+ 0x01, 0x03, 0x66, 0x6f, 0x6f, // path = "foo"
+ 0x02, 0x32, // max_subscribe_id = 50
+ 0x02, 0x32, // max_subscribe_id = 50
};
stream.Receive(absl::string_view(setup, sizeof(setup)), false);
parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
EXPECT_EQ(visitor_.parsing_error_,
- "MAX_SUBSCRIBE_ID parameter appears twice in SETUP");
+ "Client SETUP contains invalid parameters");
EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
}
@@ -669,14 +670,13 @@
0x20, 0x02, // priority = 0x20 descending
0x02, // filter_type = kLatestObject
0x02, // two params
- 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
- 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
+ 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
+ 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
};
stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false);
parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
- EXPECT_EQ(visitor_.parsing_error_,
- "AUTHORIZATION_INFO parameter appears twice");
+ EXPECT_EQ(visitor_.parsing_error_, "SUBSCRIBE contains invalid parameters");
EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
}
@@ -684,99 +684,58 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
- 0x03, 0x18, 0x01, 0x02, 0x01,
+ 0x03, 0x16, 0x01, 0x02, 0x01,
0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
0x20, 0x02, // priority = 0x20 descending
0x02, // filter_type = kLatestObject
0x02, // two params
- 0x03, 0x02, 0x67, 0x10, // delivery_timeout = 10000
- 0x03, 0x02, 0x67, 0x10, // delivery_timeout = 10000
+ 0x02, 0x67, 0x10, // delivery_timeout = 10000
+ 0x02, 0x67, 0x10, // delivery_timeout = 10000
};
stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false);
parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
- EXPECT_EQ(visitor_.parsing_error_,
- "DELIVERY_TIMEOUT parameter appears twice");
+ EXPECT_EQ(visitor_.parsing_error_, "SUBSCRIBE contains invalid parameters");
EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
}
-TEST_F(MoqtMessageSpecificTest, SubscribeDeliveryTimeoutMalformed) {
- webtransport::test::InMemoryStream stream(/*stream_id=*/0);
- MoqtControlParser parser(kRawQuic, &stream, visitor_);
- char subscribe[] = {
- 0x03, 0x14, 0x01, 0x02, 0x01,
- 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
- 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
- 0x20, 0x02, // priority = 0x20 descending
- 0x02, // filter_type = kLatestObject
- 0x01, // one param
- 0x03, 0x01, 0x67, 0x10, // delivery_timeout = 10000
- };
- stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false);
- parser.ReadAndDispatchMessages();
- EXPECT_EQ(visitor_.messages_received_, 0);
- EXPECT_EQ(visitor_.parsing_error_,
- "Parameter length does not match varint encoding");
- EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kParameterLengthMismatch);
-}
-
TEST_F(MoqtMessageSpecificTest, SubscribeMaxCacheDurationTwice) {
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
- 0x03, 0x18, 0x01, 0x02, 0x01,
+ 0x03, 0x16, 0x01, 0x02, 0x01,
0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
0x20, 0x02, // priority = 0x20 descending
0x02, // filter_type = kLatestObject
0x02, // two params
- 0x04, 0x02, 0x67, 0x10, // max_cache_duration = 10000
- 0x04, 0x02, 0x67, 0x10, // max_cache_duration = 10000
+ 0x04, 0x67, 0x10, // max_cache_duration = 10000
+ 0x04, 0x67, 0x10, // max_cache_duration = 10000
};
stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false);
parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
- EXPECT_EQ(visitor_.parsing_error_,
- "MAX_CACHE_DURATION parameter appears twice");
+ EXPECT_EQ(visitor_.parsing_error_, "SUBSCRIBE contains invalid parameters");
EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
}
-TEST_F(MoqtMessageSpecificTest, SubscribeMaxCacheDurationMalformed) {
- webtransport::test::InMemoryStream stream(/*stream_id=*/0);
- MoqtControlParser parser(kRawQuic, &stream, visitor_);
- char subscribe[] = {
- 0x03, 0x14, 0x01, 0x02, 0x01,
- 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
- 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
- 0x20, 0x02, // priority = 0x20 descending
- 0x02, // filter_type = kLatestObject
- 0x01, // one param
- 0x04, 0x01, 0x67, 0x10, // max_cache_duration = 10000
- };
- stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false);
- parser.ReadAndDispatchMessages();
- EXPECT_EQ(visitor_.messages_received_, 0);
- EXPECT_EQ(visitor_.parsing_error_,
- "Parameter length does not match varint encoding");
- EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kParameterLengthMismatch);
-}
-
TEST_F(MoqtMessageSpecificTest, SubscribeOkHasAuthorizationInfo) {
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kWebTrans, &stream, visitor_);
char subscribe_ok[] = {
- 0x04, 0x10, 0x01, 0x03, // subscribe_id = 1, expires = 3
+ 0x04, 0x0f, 0x01, 0x03, // subscribe_id = 1, expires = 3
0x02, 0x01, // group_order = 2, content exists
0x0c, 0x14, // largest_group_id = 12, largest_object_id = 20,
0x02, // 2 parameters
- 0x03, 0x02, 0x67, 0x10, // delivery_timeout = 10000
- 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
+ 0x02, 0x67, 0x10, // delivery_timeout = 10000
+ 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
};
stream.Receive(absl::string_view(subscribe_ok, sizeof(subscribe_ok)), false);
parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
- EXPECT_EQ(visitor_.parsing_error_, "SUBSCRIBE_OK has authorization info");
+ EXPECT_EQ(visitor_.parsing_error_,
+ "SUBSCRIBE_OK contains invalid parameters");
EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
}
@@ -787,13 +746,14 @@
0x02, 0x0b, 0x02, 0x03, 0x01, 0x05, // start and end sequences
0xaa, // priority = 0xaa
0x01, // 1 parameter
- 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
+ 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
};
stream.Receive(absl::string_view(subscribe_update, sizeof(subscribe_update)),
false);
parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
- EXPECT_EQ(visitor_.parsing_error_, "SUBSCRIBE_UPDATE has authorization info");
+ EXPECT_EQ(visitor_.parsing_error_,
+ "SUBSCRIBE_UPDATE contains invalid parameters");
EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
}
@@ -803,14 +763,13 @@
char announce[] = {
0x06, 0x10, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x02, // 2 params
- 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
- 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
+ 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
+ 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
};
stream.Receive(absl::string_view(announce, sizeof(announce)), false);
parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
- EXPECT_EQ(visitor_.parsing_error_,
- "AUTHORIZATION_INFO parameter appears twice");
+ EXPECT_EQ(visitor_.parsing_error_, "ANNOUNCE contains invalid parameters");
EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
}
@@ -818,15 +777,15 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kWebTrans, &stream, visitor_);
char announce[] = {
- 0x06, 0x0f, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
+ 0x06, 0x0e, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x02, // 2 params
- 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
- 0x03, 0x02, 0x67, 0x10, // delivery_timeout = 10000
+ 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
+ 0x02, 0x67, 0x10, // delivery_timeout = 10000
};
stream.Receive(absl::string_view(announce, sizeof(announce)), false);
parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
- EXPECT_EQ(visitor_.parsing_error_, "ANNOUNCE has delivery timeout");
+ EXPECT_EQ(visitor_.parsing_error_, "ANNOUNCE contains invalid parameters");
EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
}
@@ -931,7 +890,7 @@
0x20, 0x02, // priority = 0x20, group order descending
0x02, // filter_type = kLatestObject
0x01, // 1 parameter
- 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
+ 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
};
stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false);
parser.ReadAndDispatchMessages();
@@ -953,7 +912,7 @@
0x20, 0x08, // priority = 0x20 ???
0x01, // filter_type = kLatestGroup
0x01, // 1 parameter
- 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
+ 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
};
stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false);
parser.ReadAndDispatchMessages();
@@ -973,7 +932,7 @@
0x04, // start_group = 4
0x01, // start_object = 1
0x01, // 1 parameter
- 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
+ 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
};
stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false);
parser.ReadAndDispatchMessages();
@@ -999,7 +958,7 @@
0x01, // start_object = 1
0x07, // end_group = 7
0x01, // 1 parameter
- 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
+ 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
};
stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false);
parser.ReadAndDispatchMessages();
@@ -1025,7 +984,7 @@
0x01, // start_object = 1
0x03, // end_group = 3
0x01, // 1 parameter
- 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
+ 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
};
stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false);
parser.ReadAndDispatchMessages();
@@ -1070,10 +1029,10 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe_update[] = {
- 0x02, 0x0b, 0x02, 0x03, 0x01, 0x03, // start and end sequences
+ 0x02, 0x08, 0x02, 0x03, 0x01, 0x03, // start and end sequences
0x20, // priority
0x01, // 1 parameter
- 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
+ 0x02, 0x20, // delivery_timeout = 32 ms
};
stream.Receive(absl::string_view(subscribe_update, sizeof(subscribe_update)),
false);
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 88bedaf..967d81a 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -34,6 +34,8 @@
#include "quiche/quic/moqt/moqt_parser.h"
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/moqt_publisher.h"
+#include "quiche/quic/moqt/moqt_session_callbacks.h"
+#include "quiche/quic/moqt/moqt_session_interface.h"
#include "quiche/quic/moqt/moqt_subscribe_windows.h"
#include "quiche/quic/moqt/moqt_track.h"
#include "quiche/quic/platform/api/quic_logging.h"
@@ -148,12 +150,8 @@
control_stream_ = control_stream->GetStreamId();
MoqtClientSetup setup = MoqtClientSetup{
.supported_versions = std::vector<MoqtVersion>{parameters_.version},
- .max_subscribe_id = parameters_.max_subscribe_id,
- .supports_object_ack = parameters_.support_object_acks,
+ .parameters = parameters_,
};
- if (!parameters_.using_webtrans) {
- setup.path = parameters_.path;
- }
SendControlMessage(framer_.SerializeClientSetup(setup));
QUIC_DLOG(INFO) << ENDPOINT << "Send the SETUP message";
}
@@ -243,7 +241,7 @@
bool MoqtSession::SubscribeAnnounces(
FullTrackName track_namespace,
MoqtOutgoingSubscribeAnnouncesCallback callback,
- MoqtSubscribeParameters parameters) {
+ VersionSpecificParameters parameters) {
if (received_goaway_ || sent_goaway_) {
QUIC_DLOG(INFO) << ENDPOINT
<< "Tried to send SUBSCRIBE_ANNOUNCES after GOAWAY";
@@ -251,9 +249,7 @@
}
MoqtSubscribeAnnounces message;
message.track_namespace = track_namespace;
- // Cannot contain a delivery timeout.
- parameters.delivery_timeout = std::nullopt;
- message.parameters = std::move(parameters);
+ message.parameters = parameters;
SendControlMessage(framer_.SerializeSubscribeAnnounces(message));
QUIC_DLOG(INFO) << ENDPOINT << "Sent SUBSCRIBE_ANNOUNCES message for "
<< message.track_namespace;
@@ -275,7 +271,8 @@
}
void MoqtSession::Announce(FullTrackName track_namespace,
- MoqtOutgoingAnnounceCallback announce_callback) {
+ MoqtOutgoingAnnounceCallback announce_callback,
+ VersionSpecificParameters parameters) {
if (outgoing_announces_.contains(track_namespace)) {
std::move(announce_callback)(
track_namespace,
@@ -290,6 +287,7 @@
}
MoqtAnnounce message;
message.track_namespace = track_namespace;
+ message.parameters = parameters;
SendControlMessage(framer_.SerializeAnnounce(message));
QUIC_DLOG(INFO) << ENDPOINT << "Sent ANNOUNCE message for "
<< message.track_namespace;
@@ -323,14 +321,14 @@
bool MoqtSession::SubscribeAbsolute(const FullTrackName& name,
uint64_t start_group, uint64_t start_object,
SubscribeRemoteTrack::Visitor* visitor,
- MoqtSubscribeParameters parameters) {
+ VersionSpecificParameters parameters) {
MoqtSubscribe message;
message.full_track_name = name;
message.subscriber_priority = kDefaultSubscriberPriority;
message.group_order = std::nullopt;
message.start = Location(start_group, start_object);
message.end_group = std::nullopt;
- message.parameters = std::move(parameters);
+ message.parameters = parameters;
return Subscribe(message, visitor);
}
@@ -338,7 +336,7 @@
uint64_t start_group, uint64_t start_object,
uint64_t end_group,
SubscribeRemoteTrack::Visitor* visitor,
- MoqtSubscribeParameters parameters) {
+ VersionSpecificParameters parameters) {
if (end_group < start_group) {
QUIC_DLOG(ERROR) << "Subscription end is before beginning";
return false;
@@ -349,20 +347,20 @@
message.group_order = std::nullopt;
message.start = Location(start_group, start_object);
message.end_group = end_group;
- message.parameters = std::move(parameters);
+ message.parameters = parameters;
return Subscribe(message, visitor);
}
bool MoqtSession::SubscribeCurrentObject(const FullTrackName& name,
SubscribeRemoteTrack::Visitor* visitor,
- MoqtSubscribeParameters parameters) {
+ VersionSpecificParameters parameters) {
MoqtSubscribe message;
message.full_track_name = name;
message.subscriber_priority = kDefaultSubscriberPriority;
message.group_order = std::nullopt;
message.start = std::nullopt;
message.end_group = std::nullopt;
- message.parameters = std::move(parameters);
+ message.parameters = parameters;
return Subscribe(message, visitor);
}
@@ -383,8 +381,7 @@
uint64_t end_group, std::optional<uint64_t> end_object,
MoqtPriority priority,
std::optional<MoqtDeliveryOrder> delivery_order,
- MoqtSubscribeParameters parameters) {
- // TODO(martinduke): support authorization info
+ VersionSpecificParameters parameters) {
if (next_subscribe_id_ >= peer_max_subscribe_id_) {
QUIC_DLOG(INFO) << ENDPOINT << "Tried to send FETCH with ID "
<< next_subscribe_id_
@@ -396,8 +393,6 @@
QUIC_DLOG(INFO) << ENDPOINT << "Tried to send FETCH after GOAWAY";
return false;
}
- // Cannot have a delivery timeout.
- parameters.delivery_timeout = std::nullopt;
MoqtFetch message;
message.full_track_name = name;
message.fetch_id = next_subscribe_id_++;
@@ -407,7 +402,6 @@
message.subscriber_priority = priority;
message.group_order = delivery_order;
message.parameters = parameters;
- message.parameters.object_ack_window = std::nullopt;
SendControlMessage(framer_.SerializeFetch(message));
QUIC_DLOG(INFO) << ENDPOINT << "Sent FETCH message for "
<< message.full_track_name;
@@ -419,7 +413,7 @@
bool MoqtSession::JoiningFetch(const FullTrackName& name,
SubscribeRemoteTrack::Visitor* visitor,
uint64_t num_previous_groups,
- MoqtSubscribeParameters parameters) {
+ VersionSpecificParameters parameters) {
return JoiningFetch(
name, visitor,
[this,
@@ -444,7 +438,7 @@
uint64_t num_previous_groups,
MoqtPriority priority,
std::optional<MoqtDeliveryOrder> delivery_order,
- MoqtSubscribeParameters parameters) {
+ VersionSpecificParameters parameters) {
if ((next_subscribe_id_ + 1) >= peer_max_subscribe_id_) {
QUIC_DLOG(INFO) << ENDPOINT << "Tried to send JOINING_FETCH with ID "
<< (next_subscribe_id_ + 1)
@@ -633,23 +627,15 @@
visitor->OnCanAckObjects(absl::bind_front(&MoqtSession::SendObjectAck, this,
message.subscribe_id));
} else {
- QUICHE_DLOG_IF(WARNING, message.parameters.object_ack_window.has_value())
+ QUICHE_DLOG_IF(WARNING, message.parameters.oack_window_size.has_value())
<< "Attempting to set object_ack_window on a connection that does not "
"support it.";
- message.parameters.object_ack_window = std::nullopt;
- }
- if (message.parameters.delivery_timeout.has_value() &&
- message.parameters.delivery_timeout->IsInfinite()) {
- // Cannot encode an infinite delivery timeout.
- message.parameters.delivery_timeout = std::nullopt;
+ message.parameters.oack_window_size = std::nullopt;
}
SendControlMessage(framer_.SerializeSubscribe(message));
QUIC_DLOG(INFO) << ENDPOINT << "Sent SUBSCRIBE message for "
<< message.full_track_name;
- auto track = std::make_unique<SubscribeRemoteTrack>(
- message, visitor,
- message.parameters.delivery_timeout.value_or(
- quic::QuicTimeDelta::Infinite()));
+ auto track = std::make_unique<SubscribeRemoteTrack>(message, visitor);
subscribe_by_name_.emplace(message.full_track_name, track.get());
subscribe_by_alias_.emplace(message.track_alias, track.get());
upstream_by_id_.emplace(message.subscribe_id, std::move(track));
@@ -856,20 +842,17 @@
absl::Hex(session_->parameters_.version)));
return;
}
- session_->peer_supports_object_ack_ = message.supports_object_ack;
+ session_->peer_supports_object_ack_ = message.parameters.support_object_acks;
QUICHE_DLOG(INFO) << ENDPOINT << "Received the SETUP message";
if (session_->parameters_.perspective == Perspective::IS_SERVER) {
MoqtServerSetup response;
+ response.parameters = session_->parameters_;
response.selected_version = session_->parameters_.version;
- response.max_subscribe_id = session_->parameters_.max_subscribe_id;
- response.supports_object_ack = session_->parameters_.support_object_acks;
SendOrBufferMessage(session_->framer_.SerializeServerSetup(response));
QUIC_DLOG(INFO) << ENDPOINT << "Sent the SETUP message";
}
// TODO: handle path.
- if (message.max_subscribe_id.has_value()) {
- session_->peer_max_subscribe_id_ = *message.max_subscribe_id;
- }
+ session_->peer_max_subscribe_id_ = message.parameters.max_subscribe_id;
std::move(session_->callbacks_.session_established_callback)();
}
@@ -887,12 +870,10 @@
absl::Hex(session_->parameters_.version)));
return;
}
- session_->peer_supports_object_ack_ = message.supports_object_ack;
+ session_->peer_supports_object_ack_ = message.parameters.support_object_acks;
QUIC_DLOG(INFO) << ENDPOINT << "Received the SETUP message";
// TODO: handle path.
- if (message.max_subscribe_id.has_value()) {
- session_->peer_max_subscribe_id_ = *message.max_subscribe_id;
- }
+ session_->peer_max_subscribe_id_ = message.parameters.max_subscribe_id;
std::move(session_->callbacks_.session_established_callback)();
}
@@ -960,9 +941,7 @@
MoqtTrackPublisher* track_publisher_ptr = track_publisher->get();
auto subscription = std::make_unique<MoqtSession::PublishedSubscription>(
session_, *std::move(track_publisher), message, monitoring);
- subscription->set_delivery_timeout(
- message.parameters.delivery_timeout.value_or(
- quic::QuicTimeDelta::Infinite()));
+ subscription->set_delivery_timeout(message.parameters.delivery_timeout);
MoqtSession::PublishedSubscription* subscription_ptr = subscription.get();
auto [it, success] = session_->published_subscriptions_.emplace(
message.subscribe_id, std::move(subscription));
@@ -1087,9 +1066,7 @@
}
it->second->Update(message.start, message.end_group,
message.subscriber_priority);
- if (message.parameters.delivery_timeout.has_value()) {
- it->second->set_delivery_timeout(*message.parameters.delivery_timeout);
- }
+ it->second->set_delivery_timeout(message.parameters.delivery_timeout);
}
void MoqtSession::ControlStream::OnAnnounceMessage(
@@ -1692,7 +1669,7 @@
monitoring_interface_(monitoring_interface) {
if (monitoring_interface_ != nullptr) {
monitoring_interface_->OnObjectAckSupportKnown(
- subscribe.parameters.object_ack_window.has_value());
+ subscribe.parameters.oack_window_size.has_value());
}
QUIC_DLOG(INFO) << ENDPOINT << "Created subscription for "
<< subscribe.full_track_name;
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index da5742e..f1e8c81 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -91,17 +91,17 @@
quic::Perspective perspective() const { return parameters_.perspective; }
// Returns true if message was sent.
- bool SubscribeAnnounces(
- FullTrackName track_namespace,
- MoqtOutgoingSubscribeAnnouncesCallback callback,
- MoqtSubscribeParameters parameters = MoqtSubscribeParameters());
+ bool SubscribeAnnounces(FullTrackName track_namespace,
+ MoqtOutgoingSubscribeAnnouncesCallback callback,
+ VersionSpecificParameters parameters);
bool UnsubscribeAnnounces(FullTrackName track_namespace);
// Send an ANNOUNCE message for |track_namespace|, and call
// |announce_callback| when the response arrives. Will fail immediately if
// there is already an unresolved ANNOUNCE for that namespace.
void Announce(FullTrackName track_namespace,
- MoqtOutgoingAnnounceCallback announce_callback);
+ MoqtOutgoingAnnounceCallback announce_callback,
+ VersionSpecificParameters parameters);
// Returns true if message was sent, false if there is no ANNOUNCE to cancel.
bool Unannounce(FullTrackName track_namespace);
// Allows the subscriber to declare it will not subscribe to |track_namespace|
@@ -116,15 +116,15 @@
bool SubscribeAbsolute(const FullTrackName& name, uint64_t start_group,
uint64_t start_object,
SubscribeRemoteTrack::Visitor* visitor,
- MoqtSubscribeParameters parameters) override;
+ VersionSpecificParameters parameters) override;
// Subscribe from (start_group, start_object) to the end of end_group.
bool SubscribeAbsolute(const FullTrackName& name, uint64_t start_group,
uint64_t start_object, uint64_t end_group,
SubscribeRemoteTrack::Visitor* visitor,
- MoqtSubscribeParameters parameters) override;
+ VersionSpecificParameters parameters) override;
bool SubscribeCurrentObject(const FullTrackName& name,
SubscribeRemoteTrack::Visitor* visitor,
- MoqtSubscribeParameters parameters) override;
+ VersionSpecificParameters parameters) override;
// Returns false if the subscription is not found. The session immediately
// destroys all subscription state.
void Unsubscribe(const FullTrackName& name);
@@ -136,7 +136,7 @@
Location start, uint64_t end_group,
std::optional<uint64_t> end_object, MoqtPriority priority,
std::optional<MoqtDeliveryOrder> delivery_order,
- MoqtSubscribeParameters parameters) override;
+ VersionSpecificParameters parameters) override;
// Sends both a SUBSCRIBE and a joining FETCH, beginning |num_previous_groups|
// groups before the current group. The Fetch will not be flow controlled,
// instead using |visitor| to deliver fetched objects when they arrive. Gaps
@@ -146,7 +146,7 @@
bool JoiningFetch(const FullTrackName& name,
SubscribeRemoteTrack::Visitor* visitor,
uint64_t num_previous_groups,
- MoqtSubscribeParameters parameters) override;
+ VersionSpecificParameters parameters) override;
// Sends both a SUBSCRIBE and a joining FETCH, beginning |num_previous_groups|
// groups before the current group. The application provides |callback| to
// fully control acceptance of Fetched objects.
@@ -155,7 +155,7 @@
FetchResponseCallback callback,
uint64_t num_previous_groups, MoqtPriority priority,
std::optional<MoqtDeliveryOrder> delivery_order,
- MoqtSubscribeParameters parameters) override;
+ VersionSpecificParameters parameters) override;
// Send a GOAWAY message to the peer. |new_session_uri| must be empty if
// called by the client.
diff --git a/quiche/quic/moqt/moqt_session_interface.h b/quiche/quic/moqt/moqt_session_interface.h
index 1fd77eb..cdd197e 100644
--- a/quiche/quic/moqt/moqt_session_interface.h
+++ b/quiche/quic/moqt/moqt_session_interface.h
@@ -49,18 +49,18 @@
virtual bool SubscribeAbsolute(const FullTrackName& name,
uint64_t start_group, uint64_t start_object,
SubscribeRemoteTrack::Visitor* visitor,
- MoqtSubscribeParameters parameters) = 0;
+ VersionSpecificParameters parameters) = 0;
// Subscribe from (start_group, start_object) to the end of end_group.
virtual bool SubscribeAbsolute(const FullTrackName& name,
uint64_t start_group, uint64_t start_object,
uint64_t end_group,
SubscribeRemoteTrack::Visitor* visitor,
- MoqtSubscribeParameters parameters) = 0;
+ VersionSpecificParameters parameters) = 0;
// Subscribe to all objects that are larger than the current Largest
// Group/Object ID.
virtual bool SubscribeCurrentObject(const FullTrackName& name,
SubscribeRemoteTrack::Visitor* visitor,
- MoqtSubscribeParameters parameters) = 0;
+ VersionSpecificParameters parameters) = 0;
// Sends an UNSUBSCRIBE message and removes all of the state related to the
// subscription. Returns false if the subscription is not found.
@@ -74,7 +74,7 @@
Location start, uint64_t end_group,
std::optional<uint64_t> end_object, MoqtPriority priority,
std::optional<MoqtDeliveryOrder> delivery_order,
- MoqtSubscribeParameters parameters) = 0;
+ VersionSpecificParameters parameters) = 0;
// Sends both a SUBSCRIBE and a joining FETCH, beginning `num_previous_groups`
// groups before the current group. The Fetch will not be flow controlled,
@@ -85,7 +85,7 @@
virtual bool JoiningFetch(const FullTrackName& name,
SubscribeRemoteTrack::Visitor* visitor,
uint64_t num_previous_groups,
- MoqtSubscribeParameters parameters) = 0;
+ VersionSpecificParameters parameters) = 0;
// Sends both a SUBSCRIBE and a joining FETCH, beginning `num_previous_groups`
// groups before the current group. `callback` acts the same way as the
@@ -95,7 +95,13 @@
FetchResponseCallback callback,
uint64_t num_previous_groups, MoqtPriority priority,
std::optional<MoqtDeliveryOrder> delivery_order,
- MoqtSubscribeParameters parameters) = 0;
+ VersionSpecificParameters parameters) = 0;
+
+ // TODO: Add SubscribeAnnounces, UnsubscribeAnnounces method.
+ // TODO: Add Announce, Unannounce method.
+ // TODO: Add AnnounceCancel method.
+ // TODO: Add TrackStatusRequest method.
+ // TODO: Add SubscribeUpdate, SubscribeDone method.
};
} // namespace moqt
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index 73d5fc8..efe079e 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -66,7 +66,7 @@
/*group_order=*/std::nullopt,
/*start=*/Location(0, 0),
/*end_group=*/std::nullopt,
- /*parameters=*/MoqtSubscribeParameters(),
+ /*parameters=*/VersionSpecificParameters(),
};
return subscribe;
}
@@ -81,7 +81,7 @@
/*start=*/Location(0, 0),
/*end_group=*/1,
/*end_object=*/std::nullopt,
- /*parameters=*/MoqtSubscribeParameters(),
+ /*parameters=*/VersionSpecificParameters(),
};
return fetch;
}
@@ -160,7 +160,7 @@
(*track_status == MoqtTrackStatusCode::kInProgress)
? std::make_optional(publisher->GetLargestSequence())
: std::optional<Location>(),
- /*parameters=*/MoqtSubscribeParameters(),
+ /*parameters=*/VersionSpecificParameters(),
};
EXPECT_CALL(mock_stream_, Writev(SerializedControlMessage(expected_ok), _));
control_parser->OnSubscribeMessage(subscribe);
@@ -272,7 +272,7 @@
MoqtSessionPeer::CreateControlStream(&server_session, &mock_stream_);
MoqtClientSetup setup = {
/*supported_versions=*/{kDefaultMoqtVersion},
- /*path=*/std::nullopt,
+ MoqtSessionParameters(quic::Perspective::IS_CLIENT),
};
EXPECT_CALL(mock_stream_,
Writev(ControlMessageOfType(MoqtMessageType::kServerSetup), _));
@@ -361,7 +361,8 @@
EXPECT_CALL(mock_stream_,
Writev(ControlMessageOfType(MoqtMessageType::kAnnounce), _));
session_.Announce(FullTrackName{"foo"},
- announce_resolved_callback.AsStdFunction());
+ announce_resolved_callback.AsStdFunction(),
+ VersionSpecificParameters());
MoqtAnnounceOk ok = {
/*track_namespace=*/FullTrackName{"foo"},
@@ -403,7 +404,8 @@
EXPECT_CALL(mock_stream_,
Writev(ControlMessageOfType(MoqtMessageType::kAnnounce), _));
session_.Announce(FullTrackName{"foo"},
- announce_resolved_callback.AsStdFunction());
+ announce_resolved_callback.AsStdFunction(),
+ VersionSpecificParameters());
MoqtAnnounceOk ok = {
/*track_namespace=*/FullTrackName{"foo"},
@@ -435,7 +437,8 @@
EXPECT_CALL(mock_stream_,
Writev(ControlMessageOfType(MoqtMessageType::kAnnounce), _));
session_.Announce(FullTrackName{"foo"},
- announce_resolved_callback.AsStdFunction());
+ announce_resolved_callback.AsStdFunction(),
+ VersionSpecificParameters());
MoqtAnnounceError error = {
/*track_namespace=*/FullTrackName{"foo"},
@@ -605,18 +608,18 @@
Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
EXPECT_TRUE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"),
&remote_track_visitor,
- MoqtSubscribeParameters()));
+ VersionSpecificParameters()));
EXPECT_CALL(
mock_stream_,
Writev(ControlMessageOfType(MoqtMessageType::kSubscribesBlocked), _))
.Times(1);
EXPECT_FALSE(session_.SubscribeCurrentObject(FullTrackName("foo2", "bar2"),
&remote_track_visitor,
- MoqtSubscribeParameters()));
+ VersionSpecificParameters()));
// Second time does not send SUBSCRIBES_BLOCKED.
EXPECT_FALSE(session_.SubscribeCurrentObject(FullTrackName("foo2", "bar2"),
&remote_track_visitor,
- MoqtSubscribeParameters()));
+ VersionSpecificParameters()));
}
TEST_F(MoqtSessionTest, SubscribeDuplicateTrackName) {
@@ -629,10 +632,10 @@
Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
EXPECT_TRUE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"),
&remote_track_visitor,
- MoqtSubscribeParameters()));
+ VersionSpecificParameters()));
EXPECT_FALSE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"),
&remote_track_visitor,
- MoqtSubscribeParameters()));
+ VersionSpecificParameters()));
}
TEST_F(MoqtSessionTest, SubscribeWithOk) {
@@ -644,7 +647,7 @@
Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
session_.SubscribeCurrentObject(FullTrackName("foo", "bar"),
&remote_track_visitor,
- MoqtSubscribeParameters());
+ VersionSpecificParameters());
MoqtSubscribeOk ok = {
/*subscribe_id=*/0,
@@ -673,7 +676,7 @@
Writev(ControlMessageOfType(MoqtMessageType::kSubscribesBlocked), _));
EXPECT_FALSE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"),
&remote_track_visitor,
- MoqtSubscribeParameters()));
+ VersionSpecificParameters()));
MoqtMaxSubscribeId max_subscribe_id = {
/*max_subscribe_id=*/kDefaultInitialMaxSubscribeId + 1,
};
@@ -683,7 +686,7 @@
Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
EXPECT_TRUE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"),
&remote_track_visitor,
- MoqtSubscribeParameters()));
+ VersionSpecificParameters()));
}
TEST_F(MoqtSessionTest, LowerMaxSubscribeIdIsAnError) {
@@ -723,7 +726,7 @@
Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
session_.SubscribeCurrentObject(FullTrackName("foo", "bar"),
&remote_track_visitor,
- MoqtSubscribeParameters());
+ VersionSpecificParameters());
MoqtSubscribeError error = {
/*subscribe_id=*/0,
@@ -842,7 +845,8 @@
EXPECT_EQ(track_namespace, ftn);
EXPECT_FALSE(error.has_value());
EXPECT_EQ(reason, "");
- });
+ },
+ VersionSpecificParameters());
MoqtSubscribeAnnouncesOk ok = {
/*track_namespace=*/track_namespace,
};
@@ -873,7 +877,8 @@
ASSERT_TRUE(error.has_value());
EXPECT_EQ(*error, SubscribeErrorCode::kInvalidRange);
EXPECT_EQ(reason, "deadbeef");
- });
+ },
+ VersionSpecificParameters());
MoqtSubscribeAnnouncesError error = {
/*track_namespace=*/track_namespace,
/*error_code=*/SubscribeErrorCode::kInvalidRange,
@@ -1637,7 +1642,7 @@
MoqtSessionPeer::CreateControlStream(&server_session, &mock_stream_);
MoqtClientSetup setup = {
/*supported_versions*/ {kDefaultMoqtVersion},
- /*path=*/std::nullopt,
+ MoqtSessionParameters(),
};
EXPECT_CALL(mock_stream_,
Writev(ControlMessageOfType(MoqtMessageType::kServerSetup), _));
@@ -2344,7 +2349,7 @@
Writev(SerializedControlMessage(expected_fetch), _));
EXPECT_TRUE(session_.JoiningFetch(
expected_subscribe.full_track_name, &remote_track_visitor, nullptr, 1,
- 0x80, MoqtDeliveryOrder::kAscending, MoqtSubscribeParameters()));
+ 0x80, MoqtDeliveryOrder::kAscending, VersionSpecificParameters()));
}
TEST_F(MoqtSessionTest, SendJoiningFetchNoFlowControl) {
@@ -2359,16 +2364,16 @@
Writev(ControlMessageOfType(MoqtMessageType::kFetch), _));
EXPECT_TRUE(session_.JoiningFetch(FullTrackName("foo", "bar"),
&remote_track_visitor, 0,
- MoqtSubscribeParameters()));
+ VersionSpecificParameters()));
EXPECT_CALL(remote_track_visitor, OnReply).Times(1);
stream_input->OnSubscribeOkMessage(
MoqtSubscribeOk(0, quic::QuicTimeDelta::FromMilliseconds(0),
MoqtDeliveryOrder::kAscending, Location(2, 0),
- MoqtSubscribeParameters()));
+ VersionSpecificParameters()));
stream_input->OnFetchOkMessage(MoqtFetchOk(1, MoqtDeliveryOrder::kAscending,
Location(2, 0),
- MoqtSubscribeParameters()));
+ VersionSpecificParameters()));
// Packet arrives on FETCH stream.
MoqtObject object = {
/*fetch_id=*/1,
@@ -2397,7 +2402,7 @@
FullTrackName track_namespace = FullTrackName{"foo"};
MoqtSubscribeAnnounces announces = {
track_namespace,
- /*parameters=*/MoqtSubscribeParameters(),
+ /*parameters=*/VersionSpecificParameters(),
};
webtransport::test::MockStream control_stream;
std::unique_ptr<MoqtControlParserVisitor> stream_input =
@@ -2422,7 +2427,7 @@
FullTrackName track_namespace = FullTrackName{"foo"};
MoqtSubscribeAnnounces announces = {
track_namespace,
- /*parameters=*/MoqtSubscribeParameters(),
+ /*parameters=*/VersionSpecificParameters(),
};
webtransport::test::MockStream control_stream;
std::unique_ptr<MoqtControlParserVisitor> stream_input =
@@ -2448,12 +2453,12 @@
fetch_task = std::move(task);
},
Location(0, 0), 4, std::nullopt, 128, std::nullopt,
- MoqtSubscribeParameters());
+ VersionSpecificParameters());
MoqtFetchOk ok = {
/*subscribe_id=*/0,
/*group_order=*/MoqtDeliveryOrder::kAscending,
/*largest_id=*/Location(3, 25),
- MoqtSubscribeParameters(),
+ VersionSpecificParameters(),
};
stream_input->OnFetchOkMessage(ok);
ASSERT_NE(fetch_task, nullptr);
@@ -2478,7 +2483,7 @@
fetch_task = std::move(task);
},
Location(0, 0), 4, std::nullopt, 128, std::nullopt,
- MoqtSubscribeParameters());
+ VersionSpecificParameters());
MoqtFetchError error = {
/*subscribe_id=*/0,
/*error_code=*/SubscribeErrorCode::kUnauthorized,
@@ -2513,7 +2518,7 @@
});
},
Location(0, 0), 4, std::nullopt, 128, std::nullopt,
- MoqtSubscribeParameters());
+ VersionSpecificParameters());
// Build queue of packets to arrive.
std::queue<quiche::QuicheBuffer> headers;
std::queue<std::string> payloads;
@@ -2553,7 +2558,7 @@
/*subscribe_id=*/0,
/*group_order=*/MoqtDeliveryOrder::kAscending,
/*largest_id=*/Location(3, 25),
- MoqtSubscribeParameters(),
+ VersionSpecificParameters(),
};
stream_input->OnFetchOkMessage(ok);
ASSERT_NE(fetch_task, nullptr);
@@ -2583,7 +2588,7 @@
[&]() { objects_available = true; });
},
Location(0, 0), 4, std::nullopt, 128, std::nullopt,
- MoqtSubscribeParameters());
+ VersionSpecificParameters());
// Build queue of packets to arrive.
std::queue<quiche::QuicheBuffer> headers;
std::queue<std::string> payloads;
@@ -2623,7 +2628,7 @@
/*subscribe_id=*/0,
/*group_order=*/MoqtDeliveryOrder::kAscending,
/*largest_id=*/Location(3, 25),
- MoqtSubscribeParameters(),
+ VersionSpecificParameters(),
};
stream_input->OnFetchOkMessage(ok);
ASSERT_NE(fetch_task, nullptr);
@@ -2959,19 +2964,22 @@
MockSubscribeRemoteTrackVisitor remote_track_visitor;
EXPECT_FALSE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"),
&remote_track_visitor,
- MoqtSubscribeParameters()));
+ VersionSpecificParameters()));
EXPECT_FALSE(session_.SubscribeAnnounces(
- FullTrackName{"foo"}, +[](FullTrackName /*track_namespace*/,
- std::optional<SubscribeErrorCode> /*error*/,
- absl::string_view /*reason*/) {}));
+ FullTrackName{"foo"},
+ +[](FullTrackName /*track_namespace*/,
+ std::optional<SubscribeErrorCode> /*error*/,
+ absl::string_view /*reason*/) {},
+ VersionSpecificParameters()));
session_.Announce(
FullTrackName{"foo"},
+[](FullTrackName /*track_namespace*/,
- std::optional<MoqtAnnounceErrorReason> /*error*/) {});
+ std::optional<MoqtAnnounceErrorReason> /*error*/) {},
+ VersionSpecificParameters());
EXPECT_FALSE(session_.Fetch(
FullTrackName{"foo", "bar"},
+[](std::unique_ptr<MoqtFetchTask> /*fetch_task*/) {}, Location(0, 0), 5,
- std::nullopt, 127, std::nullopt, MoqtSubscribeParameters()));
+ std::nullopt, 127, std::nullopt, VersionSpecificParameters()));
// Error on additional GOAWAY.
EXPECT_CALL(mock_session_,
CloseSession(static_cast<uint64_t>(MoqtError::kProtocolViolation),
@@ -3000,7 +3008,7 @@
EXPECT_CALL(mock_stream_,
Writev(ControlMessageOfType(MoqtMessageType::kAnnounceError), _));
stream_input->OnAnnounceMessage(
- MoqtAnnounce(FullTrackName("foo", "bar"), MoqtSubscribeParameters()));
+ MoqtAnnounce(FullTrackName("foo", "bar"), VersionSpecificParameters()));
EXPECT_CALL(mock_stream_,
Writev(ControlMessageOfType(MoqtMessageType::kFetchError), _));
MoqtFetch fetch = DefaultFetch();
@@ -3016,19 +3024,22 @@
MockSubscribeRemoteTrackVisitor remote_track_visitor;
EXPECT_FALSE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"),
&remote_track_visitor,
- MoqtSubscribeParameters()));
+ VersionSpecificParameters()));
EXPECT_FALSE(session_.SubscribeAnnounces(
- FullTrackName{"foo"}, +[](FullTrackName /*track_namespace*/,
- std::optional<SubscribeErrorCode> /*error*/,
- absl::string_view /*reason*/) {}));
+ FullTrackName{"foo"},
+ +[](FullTrackName /*track_namespace*/,
+ std::optional<SubscribeErrorCode> /*error*/,
+ absl::string_view /*reason*/) {},
+ VersionSpecificParameters()));
session_.Announce(
FullTrackName{"foo"},
+[](FullTrackName /*track_namespace*/,
- std::optional<MoqtAnnounceErrorReason> /*error*/) {});
+ std::optional<MoqtAnnounceErrorReason> /*error*/) {},
+ VersionSpecificParameters());
EXPECT_FALSE(session_.Fetch(
FullTrackName{"foo", "bar"},
+[](std::unique_ptr<MoqtFetchTask> /*fetch_task*/) {}, Location(0, 0), 5,
- std::nullopt, 127, std::nullopt, MoqtSubscribeParameters()));
+ std::nullopt, 127, std::nullopt, VersionSpecificParameters()));
session_.GoAway("");
// GoAway timer fires.
auto* goaway_alarm = static_cast<quic::test::MockAlarmFactory::TestAlarm*>(
@@ -3085,13 +3096,13 @@
Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
EXPECT_TRUE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"),
&remote_track_visitor,
- MoqtSubscribeParameters()));
+ VersionSpecificParameters()));
MoqtSubscribeOk ok = {
/*subscribe_id=*/0,
/*expires=*/quic::QuicTimeDelta::FromMilliseconds(10000),
/*group_order=*/MoqtDeliveryOrder::kAscending,
/*largest_id=*/std::nullopt,
- /*parameters=*/MoqtSubscribeParameters(),
+ /*parameters=*/VersionSpecificParameters(),
};
stream_input->OnSubscribeOkMessage(ok);
constexpr uint64_t kNumStreams = 3;
@@ -3143,13 +3154,13 @@
Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
EXPECT_TRUE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"),
&remote_track_visitor,
- MoqtSubscribeParameters()));
+ VersionSpecificParameters()));
MoqtSubscribeOk ok = {
/*subscribe_id=*/0,
/*expires=*/quic::QuicTimeDelta::FromMilliseconds(10000),
/*group_order=*/MoqtDeliveryOrder::kAscending,
/*largest_id=*/std::nullopt,
- /*parameters=*/MoqtSubscribeParameters(),
+ /*parameters=*/VersionSpecificParameters(),
};
stream_input->OnSubscribeOkMessage(ok);
constexpr uint64_t kNumStreams = 3;
@@ -3198,13 +3209,13 @@
Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
EXPECT_TRUE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"),
&remote_track_visitor,
- MoqtSubscribeParameters()));
+ VersionSpecificParameters()));
MoqtSubscribeOk ok = {
/*subscribe_id=*/0,
/*expires=*/quic::QuicTimeDelta::FromMilliseconds(10000),
/*group_order=*/MoqtDeliveryOrder::kAscending,
/*largest_id=*/std::nullopt,
- /*parameters=*/MoqtSubscribeParameters(),
+ /*parameters=*/VersionSpecificParameters(),
};
stream_input->OnSubscribeOkMessage(ok);
constexpr uint64_t kNumStreams = 3;
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h
index 2581248..b323979 100644
--- a/quiche/quic/moqt/moqt_track.h
+++ b/quiche/quic/moqt/moqt_track.h
@@ -115,15 +115,13 @@
bool end_of_message) = 0;
virtual void OnSubscribeDone(FullTrackName full_track_name) = 0;
};
- SubscribeRemoteTrack(
- const MoqtSubscribe& subscribe, Visitor* visitor,
- quic::QuicTimeDelta delivery_timeout = quic::QuicTimeDelta::Infinite())
+ SubscribeRemoteTrack(const MoqtSubscribe& subscribe, Visitor* visitor)
: RemoteTrack(subscribe.full_track_name, subscribe.subscribe_id,
SubscribeWindow(subscribe.start.value_or(Location()),
subscribe.end_group)),
track_alias_(subscribe.track_alias),
visitor_(visitor),
- delivery_timeout_(delivery_timeout),
+ delivery_timeout_(subscribe.parameters.delivery_timeout),
subscribe_(std::make_unique<MoqtSubscribe>(subscribe)) {}
~SubscribeRemoteTrack() override {
if (subscribe_done_alarm_ != nullptr) {
diff --git a/quiche/quic/moqt/moqt_track_test.cc b/quiche/quic/moqt/moqt_track_test.cc
index 910c433..8b9703e 100644
--- a/quiche/quic/moqt/moqt_track_test.cc
+++ b/quiche/quic/moqt/moqt_track_test.cc
@@ -56,7 +56,7 @@
/*group_order=*/std::nullopt,
/*start=*/Location(2, 0),
std::nullopt,
- MoqtSubscribeParameters(),
+ VersionSpecificParameters(),
};
SubscribeRemoteTrack track_;
};
@@ -107,7 +107,7 @@
/*start_object=*/Location(1, 1),
/*end_group=*/3,
/*end_object=*/100,
- /*parameters=*/MoqtSubscribeParameters(),
+ VersionSpecificParameters(),
};
// The pointer held by the application.
UpstreamFetch fetch_;
diff --git a/quiche/quic/moqt/test_tools/mock_moqt_session.cc b/quiche/quic/moqt/test_tools/mock_moqt_session.cc
index 565b1db..5753aa8 100644
--- a/quiche/quic/moqt/test_tools/mock_moqt_session.cc
+++ b/quiche/quic/moqt/test_tools/mock_moqt_session.cc
@@ -127,14 +127,14 @@
ON_CALL(*this, SubscribeCurrentObject)
.WillByDefault([this](const FullTrackName& name,
SubscribeRemoteTrack::Visitor* visitor,
- MoqtSubscribeParameters) {
+ VersionSpecificParameters) {
return Subscribe(name, visitor, SubscribeWindow());
});
ON_CALL(*this, SubscribeAbsolute(_, _, _, _, _))
.WillByDefault([this](const FullTrackName& name, uint64_t start_group,
uint64_t start_object,
SubscribeRemoteTrack::Visitor* visitor,
- MoqtSubscribeParameters) {
+ VersionSpecificParameters) {
return Subscribe(
name, visitor,
SubscribeWindow(Location(start_group, start_object)));
@@ -143,7 +143,7 @@
.WillByDefault([this](const FullTrackName& name, uint64_t start_group,
uint64_t start_object, uint64_t end_group,
SubscribeRemoteTrack::Visitor* visitor,
- MoqtSubscribeParameters) {
+ VersionSpecificParameters) {
return Subscribe(
name, visitor,
SubscribeWindow(Location(start_group, start_object), end_group));
@@ -158,7 +158,7 @@
Location start, uint64_t end_group,
std::optional<uint64_t> end_object, MoqtPriority priority,
std::optional<MoqtDeliveryOrder> delivery_order,
- MoqtSubscribeParameters parameters) {
+ VersionSpecificParameters parameters) {
auto track_publisher = publisher_->GetTrack(name);
if (!track_publisher.ok()) {
std::move(callback)(std::make_unique<MoqtFailedFetch>(
@@ -174,7 +174,7 @@
.WillByDefault([this](const FullTrackName& name,
SubscribeRemoteTrack::Visitor* visitor,
uint64_t num_previous_groups,
- MoqtSubscribeParameters parameters) {
+ VersionSpecificParameters parameters) {
return JoiningFetch(
name, visitor,
[name, visitor](std::unique_ptr<MoqtFetchTask> fetch) {
@@ -195,7 +195,7 @@
uint64_t num_previous_groups,
MoqtPriority priority,
std::optional<MoqtDeliveryOrder> delivery_order,
- MoqtSubscribeParameters parameters) {
+ VersionSpecificParameters parameters) {
SubscribeCurrentObject(name, visitor, parameters);
auto track_publisher = publisher_->GetTrack(name);
if (!track_publisher.ok()) {
diff --git a/quiche/quic/moqt/test_tools/mock_moqt_session.h b/quiche/quic/moqt/test_tools/mock_moqt_session.h
index 0deb44d..50405e4 100644
--- a/quiche/quic/moqt/test_tools/mock_moqt_session.h
+++ b/quiche/quic/moqt/test_tools/mock_moqt_session.h
@@ -42,18 +42,18 @@
MOCK_METHOD(bool, SubscribeAbsolute,
(const FullTrackName& name, uint64_t start_group,
uint64_t start_object, SubscribeRemoteTrack::Visitor* visitor,
- MoqtSubscribeParameters parameters),
+ VersionSpecificParameters parameters),
(override));
MOCK_METHOD(bool, SubscribeAbsolute,
(const FullTrackName& name, uint64_t start_group,
uint64_t start_object, uint64_t end_group,
SubscribeRemoteTrack::Visitor* visitor,
- MoqtSubscribeParameters parameters),
+ VersionSpecificParameters parameters),
(override));
MOCK_METHOD(bool, SubscribeCurrentObject,
(const FullTrackName& name,
SubscribeRemoteTrack::Visitor* visitor,
- MoqtSubscribeParameters parameters),
+ VersionSpecificParameters parameters),
(override));
MOCK_METHOD(void, Unsubscribe, (const FullTrackName& name), (override));
MOCK_METHOD(bool, Fetch,
@@ -61,13 +61,13 @@
Location start, uint64_t end_group,
std::optional<uint64_t> end_object, MoqtPriority priority,
std::optional<MoqtDeliveryOrder> delivery_order,
- MoqtSubscribeParameters parameters),
+ VersionSpecificParameters parameters),
(override));
MOCK_METHOD(bool, JoiningFetch,
(const FullTrackName& name,
SubscribeRemoteTrack::Visitor* visitor,
uint64_t num_previous_groups,
- MoqtSubscribeParameters parameters),
+ VersionSpecificParameters parameters),
(override));
MOCK_METHOD(bool, JoiningFetch,
(const FullTrackName& name,
@@ -75,7 +75,7 @@
FetchResponseCallback callback, uint64_t num_previous_groups,
MoqtPriority priority,
std::optional<MoqtDeliveryOrder> delivery_order,
- MoqtSubscribeParameters parameters),
+ VersionSpecificParameters parameters),
(override));
private:
diff --git a/quiche/quic/moqt/test_tools/mock_moqt_session_test.cc b/quiche/quic/moqt/test_tools/mock_moqt_session_test.cc
index f067f31..1df0121 100644
--- a/quiche/quic/moqt/test_tools/mock_moqt_session_test.cc
+++ b/quiche/quic/moqt/test_tools/mock_moqt_session_test.cc
@@ -45,7 +45,7 @@
OnReply(FullTrackName("doesn't", "exist"), Eq(std::nullopt),
Optional(HasSubstr("not found"))));
session_.SubscribeCurrentObject(FullTrackName("doesn't", "exist"), &visitor,
- MoqtSubscribeParameters());
+ VersionSpecificParameters());
}
TEST_F(MockMoqtSessionTest, SubscribeCurrentObject) {
@@ -53,7 +53,7 @@
EXPECT_CALL(visitor,
OnReply(TrackName(), Eq(std::nullopt), Eq(std::nullopt)));
session_.SubscribeCurrentObject(TrackName(), &visitor,
- MoqtSubscribeParameters());
+ VersionSpecificParameters());
EXPECT_CALL(visitor,
OnObjectFragment(TrackName(), Location(0, 0), _, _, "test", _));
track_->AddObject(quic::test::MemSliceFromString("test"), /*key=*/true);
@@ -68,7 +68,7 @@
EXPECT_CALL(visitor,
OnReply(TrackName(), Eq(std::nullopt), Eq(std::nullopt)));
session_.SubscribeAbsolute(TrackName(), 1, 0, 1, &visitor,
- MoqtSubscribeParameters());
+ VersionSpecificParameters());
EXPECT_CALL(visitor, OnObjectFragment(TrackName(), Location(1, 0), _,
MoqtObjectStatus::kNormal, "b", _));
EXPECT_CALL(visitor, OnObjectFragment(TrackName(), Location(1, 1), _,
@@ -89,7 +89,7 @@
[&](std::unique_ptr<MoqtFetchTask> new_fetch) {
fetch = std::move(new_fetch);
},
- Location(0, 1), 0, 2, 0x80, std::nullopt, MoqtSubscribeParameters());
+ Location(0, 1), 0, 2, 0x80, std::nullopt, VersionSpecificParameters());
PublishedObject object;
ASSERT_EQ(fetch->GetNextObject(object), MoqtFetchTask::kSuccess);
EXPECT_EQ(object.payload.AsStringView(), "b");
@@ -113,7 +113,7 @@
MoqtObjectStatus::kEndOfGroup, "", _));
EXPECT_CALL(visitor, OnObjectFragment(TrackName(), Location(3, 0), _,
MoqtObjectStatus::kNormal, "d", _));
- session_.JoiningFetch(TrackName(), &visitor, 2, MoqtSubscribeParameters());
+ session_.JoiningFetch(TrackName(), &visitor, 2, VersionSpecificParameters());
EXPECT_CALL(visitor, OnObjectFragment(TrackName(), Location(3, 1), _,
MoqtObjectStatus::kEndOfGroup, "", _));
EXPECT_CALL(visitor, OnObjectFragment(TrackName(), Location(4, 0), _,
@@ -125,7 +125,7 @@
testing::StrictMock<MockSubscribeRemoteTrackVisitor> visitor;
EXPECT_CALL(visitor,
OnReply(TrackName(), Eq(std::nullopt), Eq(std::nullopt)));
- session_.JoiningFetch(TrackName(), &visitor, 0, MoqtSubscribeParameters());
+ session_.JoiningFetch(TrackName(), &visitor, 0, VersionSpecificParameters());
EXPECT_CALL(visitor,
OnObjectFragment(TrackName(), Location(0, 0), _, _, "test", _));
track_->AddObject(quic::test::MemSliceFromString("test"), /*key=*/true);
diff --git a/quiche/quic/moqt/test_tools/moqt_session_peer.h b/quiche/quic/moqt/test_tools/moqt_session_peer.h
index a1f4b52..611292a 100644
--- a/quiche/quic/moqt/test_tools/moqt_session_peer.h
+++ b/quiche/quic/moqt/test_tools/moqt_session_peer.h
@@ -192,7 +192,7 @@
Location{0, 0},
4,
std::nullopt,
- MoqtSubscribeParameters(),
+ VersionSpecificParameters(),
};
std::unique_ptr<MoqtFetchTask> task;
auto [it, success] = session->upstream_by_id_.try_emplace(
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h
index 180ee7a..63b60ec 100644
--- a/quiche/quic/moqt/test_tools/moqt_test_message.h
+++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -19,6 +19,7 @@
#include "quiche/quic/core/quic_data_reader.h"
#include "quiche/quic/core/quic_data_writer.h"
#include "quiche/quic/core/quic_time.h"
+#include "quiche/quic/core/quic_types.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/platform/api/quic_logging.h"
@@ -373,11 +374,12 @@
class QUICHE_NO_EXPORT ClientSetupMessage : public TestMessageBase {
public:
explicit ClientSetupMessage(bool webtrans) : TestMessageBase() {
+ client_setup_.parameters.using_webtrans = webtrans;
if (webtrans) {
// Should not send PATH.
- client_setup_.path = std::nullopt;
- raw_packet_[2] = 0x07; // adjust payload length (-5)
- raw_packet_[6] = 0x01; // only two parameters
+ client_setup_.parameters.path = "";
+ raw_packet_[1] = 0x06; // adjust payload length (-5)
+ raw_packet_[5] = 0x01; // only one parameter
SetWireImage(raw_packet_, sizeof(raw_packet_) - 5);
} else {
SetWireImage(raw_packet_, sizeof(raw_packet_));
@@ -398,24 +400,18 @@
return false;
}
}
- if (cast.path != client_setup_.path) {
- QUIC_LOG(INFO) << "CLIENT_SETUP path mismatch";
- return false;
- }
- if (cast.max_subscribe_id != client_setup_.max_subscribe_id) {
- QUIC_LOG(INFO) << "CLIENT_SETUP max_subscribe_id mismatch";
+ if (cast.parameters != client_setup_.parameters) {
+ QUIC_LOG(INFO) << "CLIENT_SETUP parameter mismatch";
return false;
}
return true;
}
void ExpandVarints() override {
- if (client_setup_.path.has_value()) {
- ExpandVarintsImpl("--vvvvvvv-vv---");
- // first two bytes are already a 2B varint. Also, don't expand parameter
- // varints because that messes up the parameter length field.
+ if (!client_setup_.parameters.path.empty()) {
+ ExpandVarintsImpl("vvvvvvvvvv---");
} else {
- ExpandVarintsImpl("--vvvvvvv-");
+ ExpandVarintsImpl("vvvvvvvv");
}
}
@@ -424,58 +420,51 @@
}
private:
- uint8_t raw_packet_[15] = {
- 0x40, 0x40, 0x0c, // type
+ uint8_t raw_packet_[13] = {
+ 0x20, 0x0b, // type
0x02, 0x01, 0x02, // versions
- 0x02, // 3 parameters
- 0x02, 0x01, 0x32, // max_subscribe_id = 50
+ 0x02, // 2 parameters
+ 0x02, 0x32, // max_request_id = 50
0x01, 0x03, 0x66, 0x6f, 0x6f, // path = "foo"
};
MoqtClientSetup client_setup_ = {
/*supported_versions=*/std::vector<MoqtVersion>(
{static_cast<MoqtVersion>(1), static_cast<MoqtVersion>(2)}),
- /*path=*/"foo",
- /*max_subscribe_id=*/50,
+ MoqtSessionParameters(quic::Perspective::IS_CLIENT, "foo", 50),
};
};
class QUICHE_NO_EXPORT ServerSetupMessage : public TestMessageBase {
public:
- explicit ServerSetupMessage() : TestMessageBase() {
+ explicit ServerSetupMessage(bool webtrans) : TestMessageBase() {
+ server_setup_.parameters.using_webtrans = webtrans;
SetWireImage(raw_packet_, sizeof(raw_packet_));
}
bool EqualFieldValues(MessageStructuredData& values) const override {
auto cast = std::get<MoqtServerSetup>(values);
- if (cast.selected_version != server_setup_.selected_version) {
- QUIC_LOG(INFO) << "SERVER_SETUP selected version mismatch";
- return false;
- }
- if (cast.max_subscribe_id != server_setup_.max_subscribe_id) {
- QUIC_LOG(INFO) << "SERVER_SETUP max_subscribe_id mismatch";
+ if (cast.parameters != server_setup_.parameters) {
+ QUIC_LOG(INFO) << "SERVER_SETUP parameter mismatch";
return false;
}
return true;
}
- void ExpandVarints() override {
- ExpandVarintsImpl("--vvvvv-"); // first two bytes are already a 2b
- // varint
- }
+ void ExpandVarints() override { ExpandVarintsImpl("vvvvvv"); }
MessageStructuredData structured_data() const override {
return TestMessageBase::MessageStructuredData(server_setup_);
}
private:
- uint8_t raw_packet_[8] = {
- 0x40, 0x41, 0x05, // type
- 0x01, 0x01, // version, two parameters
- 0x02, 0x01, 0x32, // max_subscribe_id = 50
+ uint8_t raw_packet_[6] = {
+ 0x21, 0x04, // type
+ 0x01, 0x01, // version, one parameter
+ 0x02, 0x32, // max_request_id = 50
};
MoqtServerSetup server_setup_ = {
/*selected_version=*/static_cast<MoqtVersion>(1),
- /*max_subscribe_id=*/50,
+ MoqtSessionParameters(quic::Perspective::IS_SERVER, 50),
};
};
@@ -523,7 +512,7 @@
}
void ExpandVarints() override {
- ExpandVarintsImpl("vvvvvv---v------vvvvvv---vv--vv--");
+ ExpandVarintsImpl("vvvvvv---v------vvvvvv---v--");
}
MessageStructuredData structured_data() const override {
@@ -531,8 +520,8 @@
}
private:
- uint8_t raw_packet_[33] = {
- 0x03, 0x1f, 0x01, 0x02, // id and alias
+ uint8_t raw_packet_[28] = {
+ 0x03, 0x1a, 0x01, 0x02, // id and alias
0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
0x20, // subscriber priority = 0x20
@@ -541,10 +530,9 @@
0x04, // start_group = 4 (relative previous)
0x01, // start_object = 1 (absolute)
// No EndGroup or EndObject
- 0x03, // 3 parameters
- 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
- 0x03, 0x02, 0x67, 0x10, // delivery_timeout = 10000 ms
- 0x04, 0x02, 0x67, 0x10, // max_cache_duration = 10000 ms
+ 0x02, // 2 parameters
+ 0x02, 0x67, 0x10, // delivery_timeout = 10000 ms
+ 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
};
MoqtSubscribe subscribe_ = {
@@ -555,10 +543,8 @@
/*group_order=*/MoqtDeliveryOrder::kDescending,
/*start=*/Location(4, 1),
/*end_group=*/std::nullopt,
- /*parameters=*/
- MoqtSubscribeParameters{
- "bar", quic::QuicTimeDelta::FromMilliseconds(10000),
- quic::QuicTimeDelta::FromMilliseconds(10000), std::nullopt},
+ VersionSpecificParameters(quic::QuicTimeDelta::FromMilliseconds(10000),
+ "bar"),
};
};
@@ -593,7 +579,7 @@
return true;
}
- void ExpandVarints() override { ExpandVarintsImpl("vvvv--vvvvv--vv--"); }
+ void ExpandVarints() override { ExpandVarintsImpl("vvvv--vvvv--v--"); }
MessageStructuredData structured_data() const override {
return TestMessageBase::MessageStructuredData(subscribe_ok_);
@@ -610,13 +596,13 @@
}
private:
- uint8_t raw_packet_[17] = {
- 0x04, 0x0f, 0x01, 0x03, // subscribe_id = 1, expires = 3
+ uint8_t raw_packet_[15] = {
+ 0x04, 0x0d, 0x01, 0x03, // subscribe_id = 1, expires = 3
0x02, 0x01, // group_order = 2, content exists
0x0c, 0x14, // largest_group_id = 12, largest_object_id = 20,
0x02, // 2 parameters
- 0x03, 0x02, 0x67, 0x10, // delivery_timeout = 10000
- 0x04, 0x02, 0x67, 0x10, // max_cache_duration = 10000
+ 0x02, 0x67, 0x10, // delivery_timeout = 10000
+ 0x04, 0x67, 0x10, // max_cache_duration = 10000
};
MoqtSubscribeOk subscribe_ok_ = {
@@ -624,10 +610,8 @@
/*expires=*/quic::QuicTimeDelta::FromMilliseconds(3),
/*group_order=*/MoqtDeliveryOrder::kDescending,
/*largest_id=*/Location(12, 20),
- /*parameters=*/
- MoqtSubscribeParameters{
- std::nullopt, quic::QuicTimeDelta::FromMilliseconds(10000),
- quic::QuicTimeDelta::FromMilliseconds(10000), std::nullopt},
+ VersionSpecificParameters(quic::QuicTimeDelta::FromMilliseconds(10000),
+ quic::QuicTimeDelta::FromMilliseconds(10000)),
};
};
@@ -792,19 +776,18 @@
return true;
}
- void ExpandVarints() override { ExpandVarintsImpl("vvvvvv-vvv---"); }
+ void ExpandVarints() override { ExpandVarintsImpl("vvvvvv-vv--"); }
MessageStructuredData structured_data() const override {
return TestMessageBase::MessageStructuredData(subscribe_update_);
}
private:
- uint8_t raw_packet_[16] = {
- 0x02, 0x0e, 0x02, 0x03, 0x01, 0x05, // start and end sequences
+ uint8_t raw_packet_[11] = {
+ 0x02, 0x09, 0x02, 0x03, 0x01, 0x05, // start and end sequences
0xaa, // subscriber_priority
- 0x02, // 2 parameters
- 0x03, 0x02, 0x67, 0x10, // delivery_timeout = 10000
- 0x04, 0x02, 0x67, 0x10, // max_cache_duration = 10000
+ 0x01, // 1 parameter
+ 0x02, 0x67, 0x10, // delivery_timeout = 10000
};
MoqtSubscribeUpdate subscribe_update_ = {
@@ -812,10 +795,8 @@
/*start=*/Location(3, 1),
/*end_group=*/4,
/*subscriber_priority=*/0xaa,
- /*parameters=*/
- MoqtSubscribeParameters{
- std::nullopt, quic::QuicTimeDelta::FromMilliseconds(10000),
- quic::QuicTimeDelta::FromMilliseconds(10000), std::nullopt},
+ VersionSpecificParameters(quic::QuicTimeDelta::FromMilliseconds(10000),
+ quic::QuicTimeDelta::Infinite()),
};
};
@@ -832,32 +813,28 @@
return false;
}
if (cast.parameters != announce_.parameters) {
- QUIC_LOG(INFO) << "ANNOUNCE MESSAGE authorization info mismatch";
+ QUIC_LOG(INFO) << "ANNOUNCE MESSAGE parameter mismatch";
return false;
}
return true;
}
- void ExpandVarints() override { ExpandVarintsImpl("vvvv---vvv---vv--"); }
+ void ExpandVarints() override { ExpandVarintsImpl("vvvv---vvv---"); }
MessageStructuredData structured_data() const override {
return TestMessageBase::MessageStructuredData(announce_);
}
private:
- uint8_t raw_packet_[17] = {
- 0x06, 0x0f, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
- 0x02, // 2 parameters
- 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
- 0x04, 0x02, 0x67, 0x10, // max_cache_duration = 10000ms
+ uint8_t raw_packet_[13] = {
+ 0x06, 0x0b, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
+ 0x01, // 1 parameter
+ 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
};
MoqtAnnounce announce_ = {
/*track_namespace=*/FullTrackName{"foo"},
- /*parameters=*/
- MoqtSubscribeParameters{"bar", std::nullopt,
- quic::QuicTimeDelta::FromMilliseconds(10000),
- std::nullopt},
+ VersionSpecificParameters("bar"),
};
};
@@ -990,23 +967,30 @@
QUIC_LOG(INFO) << "TRACK STATUS REQUEST track name mismatch";
return false;
}
+ if (cast.parameters != track_status_request_.parameters) {
+ QUIC_LOG(INFO) << "TRACK STATUS REQUEST parameter mismatch";
+ return false;
+ }
return true;
}
- void ExpandVarints() override { ExpandVarintsImpl("vvvv---v----"); }
+ void ExpandVarints() override { ExpandVarintsImpl("vvvv---v----vvv---"); }
MessageStructuredData structured_data() const override {
return TestMessageBase::MessageStructuredData(track_status_request_);
}
private:
- uint8_t raw_packet_[12] = {
- 0x0d, 0x0a, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
+ uint8_t raw_packet_[18] = {
+ 0x0d, 0x10, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
+ 0x01, // 1 parameter
+ 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
};
MoqtTrackStatusRequest track_status_request_ = {
/*full_track_name=*/FullTrackName({"foo", "abcd"}),
+ VersionSpecificParameters("bar"),
};
};
@@ -1065,20 +1049,27 @@
QUIC_LOG(INFO) << "TRACK STATUS last object mismatch";
return false;
}
+ if (cast.parameters != track_status_.parameters) {
+ QUIC_LOG(INFO) << "TRACK STATUS parameters mismatch";
+ return false;
+ }
return true;
}
- void ExpandVarints() override { ExpandVarintsImpl("vvvv---v----vvv"); }
+ void ExpandVarints() override { ExpandVarintsImpl("vvvv---v----vvvvv--v--"); }
MessageStructuredData structured_data() const override {
return TestMessageBase::MessageStructuredData(track_status_);
}
private:
- uint8_t raw_packet_[15] = {
- 0x0e, 0x0d, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
+ uint8_t raw_packet_[22] = {
+ 0x0e, 0x14, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
0x00, 0x0c, 0x14, // status, last_group, last_object
+ 0x02, // 2 parameters
+ 0x02, 0x67, 0x10, // Delivery Timeout = 10000
+ 0x04, 0x67, 0x10, // Max Cache Duration = 10000
};
MoqtTrackStatus track_status_ = {
@@ -1086,6 +1077,8 @@
/*status_code=*/MoqtTrackStatusCode::kInProgress,
/*last_group=*/12,
/*last_object=*/20,
+ VersionSpecificParameters(quic::QuicTimeDelta::FromMilliseconds(10000),
+ quic::QuicTimeDelta::FromMilliseconds(10000)),
};
};
@@ -1149,13 +1142,12 @@
uint8_t raw_packet_[13] = {
0x11, 0x0b, 0x01, 0x03, 0x66, 0x6f, 0x6f, // namespace = "foo"
0x01, // 1 parameter
- 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
+ 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
};
MoqtSubscribeAnnounces subscribe_namespace_ = {
/*track_namespace=*/FullTrackName{"foo"},
- /*parameters=*/
- MoqtSubscribeParameters{"bar", std::nullopt, std::nullopt, std::nullopt},
+ VersionSpecificParameters("bar"),
};
};
@@ -1391,7 +1383,7 @@
0x03, 0x62, 0x61, 0x72, // track_name = "bar"
0x01, 0x02, // start_object = 1, 2
0x05, 0x07, // end_object = 5, 6
- 0x01, 0x02, 0x03, 0x62, 0x61, 0x7a, // parameters = "baz"
+ 0x01, 0x03, 0x03, 0x62, 0x61, 0x7a, // parameters = "baz"
};
MoqtFetch fetch_ = {
@@ -1403,8 +1395,7 @@
/*start_object=*/Location{1, 2},
/*end_group=*/5,
/*end_object=*/6,
- /*parameters=*/
- MoqtSubscribeParameters{"baz", std::nullopt, std::nullopt, std::nullopt},
+ VersionSpecificParameters("baz"),
};
};
@@ -1490,7 +1481,7 @@
0x01, // group_order = kAscending
0x02, // type = kJoining
0x02, 0x02, // joining_subscribe_id = 2, 2 groups
- 0x01, 0x02, 0x03, 0x62, 0x61, 0x7a, // parameters = "baz"
+ 0x01, 0x03, 0x03, 0x62, 0x61, 0x7a, // parameters = "baz"
};
MoqtFetch fetch_ = {
@@ -1503,8 +1494,7 @@
/*start_object=*/Location{1, 2},
/*end_group=*/5,
/*end_object=*/6,
- /*parameters=*/
- MoqtSubscribeParameters{"baz", std::nullopt, std::nullopt, std::nullopt},
+ VersionSpecificParameters("baz"),
};
};
@@ -1572,20 +1562,20 @@
}
private:
- uint8_t raw_packet_[12] = {
- 0x18, 0x0a,
- 0x01, // subscribe_id = 1
- 0x01, // group_order = kAscending
- 0x05, 0x04, // largest_object = 5, 4
- 0x01, 0x02, 0x03, 0x62, 0x61, 0x7a, // parameters = "baz"
+ uint8_t raw_packet_[10] = {
+ 0x18, 0x08,
+ 0x01, // subscribe_id = 1
+ 0x01, // group_order = kAscending
+ 0x05, 0x04, // largest_object = 5, 4
+ 0x01, 0x04, 0x67, 0x10, // MaxCacheDuration = 10000
};
MoqtFetchOk fetch_ok_ = {
/*subscribe_id =*/1,
/*group_order=*/MoqtDeliveryOrder::kAscending,
/*start_object=*/Location{5, 4},
- /*parameters=*/
- MoqtSubscribeParameters{"baz", std::nullopt, std::nullopt, std::nullopt},
+ VersionSpecificParameters(quic::QuicTimeDelta::Infinite(),
+ quic::QuicTimeDelta::FromMilliseconds(10000)),
};
};
@@ -1768,7 +1758,7 @@
case MoqtMessageType::kClientSetup:
return std::make_unique<ClientSetupMessage>(is_webtrans);
case MoqtMessageType::kServerSetup:
- return std::make_unique<ServerSetupMessage>();
+ return std::make_unique<ServerSetupMessage>(is_webtrans);
default:
return nullptr;
}
diff --git a/quiche/quic/moqt/tools/chat_client.cc b/quiche/quic/moqt/tools/chat_client.cc
index 87d17ca..e10ae33 100644
--- a/quiche/quic/moqt/tools/chat_client.cc
+++ b/quiche/quic/moqt/tools/chat_client.cc
@@ -69,10 +69,10 @@
"do not subscribe\n";
return std::nullopt;
}
- if (session_->SubscribeCurrentObject(
- *track_name, &remote_track_visitor_,
- MoqtSubscribeParameters{std::string(GetUsername(my_track_name_)),
- std::nullopt, std::nullopt, std::nullopt})) {
+ VersionSpecificParameters parameters(
+ std::string(GetUsername(my_track_name_)));
+ if (session_->SubscribeCurrentObject(*track_name, &remote_track_visitor_,
+ parameters)) {
++subscribes_to_make_;
other_users_.emplace(*track_name);
}
@@ -230,7 +230,7 @@
std::cout << "Announcing " << GetUserNamespace(my_track_name_).ToString()
<< "\n";
session_->Announce(GetUserNamespace(my_track_name_),
- std::move(announce_callback));
+ std::move(announce_callback), VersionSpecificParameters());
// Send SUBSCRIBE_ANNOUNCE. Pop 3 levels of namespace to get to {moq-chat,
// chat-id}
@@ -248,10 +248,11 @@
<< " accepted\n";
return;
};
- session_->SubscribeAnnounces(
- GetChatNamespace(my_track_name_), std::move(subscribe_announces_callback),
- MoqtSubscribeParameters{std::string(GetUsername(my_track_name_)),
- std::nullopt, std::nullopt, std::nullopt});
+ VersionSpecificParameters parameters(
+ std::string(GetUsername(my_track_name_)));
+ session_->SubscribeAnnounces(GetChatNamespace(my_track_name_),
+ std::move(subscribe_announces_callback),
+ parameters);
while (session_is_open_ && is_syncing()) {
RunEventLoop();
diff --git a/quiche/quic/moqt/tools/chat_server.cc b/quiche/quic/moqt/tools/chat_server.cc
index 3fae683..46a4e89 100644
--- a/quiche/quic/moqt/tools/chat_server.cc
+++ b/quiche/quic/moqt/tools/chat_server.cc
@@ -49,8 +49,9 @@
return std::nullopt;
}
std::cout << "Received ANNOUNCE for " << track_namespace.ToString() << "\n";
- session_->SubscribeCurrentObject(
- *track_name_, server_->remote_track_visitor(), MoqtSubscribeParameters());
+ session_->SubscribeCurrentObject(*track_name_,
+ server_->remote_track_visitor(),
+ moqt::VersionSpecificParameters());
server_->AddUser(*track_name_);
return std::nullopt;
}
@@ -113,7 +114,8 @@
GetUserNamespace(track_name),
absl::bind_front(&ChatServer::ChatServerSessionHandler::
OnOutgoingAnnounceReply,
- this));
+ this),
+ moqt::VersionSpecificParameters());
}
return std::optional<MoqtSubscribeErrorReason>();
};
diff --git a/quiche/quic/moqt/tools/chat_server.h b/quiche/quic/moqt/tools/chat_server.h
index 6e0dd7e..0cc404e 100644
--- a/quiche/quic/moqt/tools/chat_server.h
+++ b/quiche/quic/moqt/tools/chat_server.h
@@ -71,7 +71,8 @@
track_namespace,
absl::bind_front(&ChatServer::ChatServerSessionHandler::
OnOutgoingAnnounceReply,
- this));
+ this),
+ VersionSpecificParameters());
return;
}
}
diff --git a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
index 85de291..8a2a6c3 100644
--- a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
+++ b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
@@ -155,7 +155,7 @@
FullTrackName full_track_name = track_namespace;
full_track_name.AddElement(track);
session_->JoiningFetch(full_track_name, &it->second, 0,
- MoqtSubscribeParameters());
+ VersionSpecificParameters());
}
return std::nullopt;
diff --git a/quiche/quic/moqt/tools/moqt_simulator_bin.cc b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
index 59f8ee9..a2f3b67 100644
--- a/quiche/quic/moqt/tools/moqt_simulator_bin.cc
+++ b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
@@ -442,7 +442,7 @@
// server does not yet have an active subscription, so the client has
// some catching up to do.
generator_.Start();
- MoqtSubscribeParameters subscription_parameters;
+ VersionSpecificParameters subscription_parameters;
if (!parameters_.delivery_timeout.IsInfinite()) {
subscription_parameters.delivery_timeout = parameters_.delivery_timeout;
}