blob: e9a093b0f84e07871382476e0bbd6b20138bf9df [file] [log] [blame]
// Copyright (c) 2023 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "quiche/quic/moqt/moqt_parser.h"
#include <algorithm>
#include <array>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <optional>
#include <string>
#include "absl/base/casts.h"
#include "absl/cleanup/cleanup.h"
#include "absl/container/fixed_array.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/types/span.h"
#include "quiche/quic/core/quic_data_reader.h"
#include "quiche/quic/core/quic_time.h"
#include "quiche/quic/core/quic_types.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/common/platform/api/quiche_bug_tracker.h"
#include "quiche/common/platform/api/quiche_logging.h"
#include "quiche/common/quiche_data_reader.h"
#include "quiche/common/quiche_stream.h"
namespace moqt {
namespace {
bool ParseDeliveryOrder(uint8_t raw_value,
std::optional<MoqtDeliveryOrder>& output) {
switch (raw_value) {
case 0x00:
output = std::nullopt;
return true;
case 0x01:
output = MoqtDeliveryOrder::kAscending;
return true;
case 0x02:
output = MoqtDeliveryOrder::kDescending;
return true;
default:
return false;
}
}
uint64_t SignedVarintUnserializedForm(uint64_t value) {
if (value & 0x01) {
return -(value >> 1);
}
return value >> 1;
}
bool IsAllowedStreamType(uint64_t value) {
constexpr std::array kAllowedStreamTypes = {
MoqtDataStreamType::kStreamHeaderSubgroup,
MoqtDataStreamType::kStreamHeaderFetch, MoqtDataStreamType::kPadding};
for (MoqtDataStreamType type : kAllowedStreamTypes) {
if (static_cast<uint64_t>(type) == value) {
return true;
}
}
return false;
}
std::optional<uint64_t> ReadVarInt62FromStream(quiche::ReadStream& stream,
bool& fin_read) {
fin_read = false;
quiche::ReadStream::PeekResult peek_result = stream.PeekNextReadableRegion();
if (peek_result.peeked_data.empty()) {
if (peek_result.fin_next) {
fin_read = stream.SkipBytes(0);
QUICHE_DCHECK(fin_read);
}
return std::nullopt;
}
char first_byte = peek_result.peeked_data[0];
size_t varint_size =
1 << ((absl::bit_cast<uint8_t>(first_byte) & 0b11000000) >> 6);
if (stream.ReadableBytes() < varint_size) {
return std::nullopt;
}
char buffer[8];
absl::Span<char> bytes_to_read =
absl::MakeSpan(buffer).subspan(0, varint_size);
quiche::ReadStream::ReadResult read_result = stream.Read(bytes_to_read);
QUICHE_DCHECK_EQ(read_result.bytes_read, varint_size);
fin_read = read_result.fin;
quiche::QuicheDataReader reader(buffer, read_result.bytes_read);
uint64_t result;
bool success = reader.ReadVarInt62(&result);
QUICHE_DCHECK(success);
QUICHE_DCHECK(reader.IsDoneReading());
return result;
}
// Reads from |reader| to list. Returns false if there is a read error.
bool ParseKeyValuePairList(quic::QuicDataReader& reader,
KeyValuePairList& list) {
list.clear();
uint64_t num_params;
if (!reader.ReadVarInt62(&num_params)) {
return false;
}
for (uint64_t i = 0; i < num_params; ++i) {
uint64_t type;
if (!reader.ReadVarInt62(&type)) {
return false;
}
if (type % 2 == 1) {
absl::string_view bytes;
if (!reader.ReadStringPieceVarInt62(&bytes)) {
return false;
}
list.insert(type, bytes);
continue;
}
uint64_t value;
if (!reader.ReadVarInt62(&value)) {
return false;
}
list.insert(type, value);
}
return true;
}
void KeyValuePairListToMoqtSessionParameters(const KeyValuePairList& parameters,
MoqtSessionParameters& out) {
parameters.ForEach(
[&](uint64_t key, uint64_t value) {
SetupParameter parameter = static_cast<SetupParameter>(key);
switch (parameter) {
case SetupParameter::kMaxRequestId:
out.max_request_id = value;
break;
case SetupParameter::kMaxAuthTokenCacheSize:
out.max_auth_token_cache_size = value;
break;
case SetupParameter::kSupportObjectAcks:
out.support_object_acks = (value == 1);
break;
default:
break;
}
return true;
},
[&](uint64_t key, absl::string_view value) {
SetupParameter parameter = static_cast<SetupParameter>(key);
switch (parameter) {
case SetupParameter::kPath:
out.path = value;
break;
default:
break;
}
return true;
});
}
} // namespace
void MoqtControlParser::ReadAndDispatchMessages() {
if (no_more_data_) {
ParseError("Data after end of stream");
return;
}
if (processing_) {
return;
}
processing_ = true;
auto on_return = absl::MakeCleanup([&] { processing_ = false; });
while (!no_more_data_) {
bool fin_read = false;
// Read the message type.
if (!message_type_.has_value()) {
message_type_ = ReadVarInt62FromStream(stream_, fin_read);
if (fin_read) {
ParseError("FIN on control stream");
return;
}
if (!message_type_.has_value()) {
return;
}
}
QUICHE_DCHECK(message_type_.has_value());
// Read the message length.
if (!message_size_.has_value()) {
if (stream_.ReadableBytes() < 2) {
return;
}
std::array<char, 2> size_bytes;
quiche::ReadStream::ReadResult result =
stream_.Read(absl::MakeSpan(size_bytes));
if (result.bytes_read != 2) {
ParseError(MoqtError::kInternalError,
"Stream returned incorrect ReadableBytes");
return;
}
if (result.fin) {
ParseError("FIN on control stream");
return;
}
message_size_ = static_cast<uint16_t>(size_bytes[0]) << 8 |
static_cast<uint16_t>(size_bytes[1]);
if (*message_size_ > kMaxMessageHeaderSize) {
ParseError(MoqtError::kInternalError,
absl::StrCat("Cannot parse control messages more than ",
kMaxMessageHeaderSize, " bytes"));
return;
}
}
QUICHE_DCHECK(message_size_.has_value());
// Read the message if it's fully received.
//
// CAUTION: if the flow control windows are too low, and
// kMaxMessageHeaderSize is too high, this will cause a deadlock.
if (stream_.ReadableBytes() < *message_size_) {
return;
}
absl::FixedArray<char> message(*message_size_);
quiche::ReadStream::ReadResult result =
stream_.Read(absl::MakeSpan(message));
if (result.bytes_read != *message_size_) {
ParseError("Stream returned incorrect ReadableBytes");
return;
}
if (result.fin) {
ParseError("FIN on control stream");
return;
}
ProcessMessage(absl::string_view(message.data(), message.size()),
static_cast<MoqtMessageType>(*message_type_));
message_type_.reset();
message_size_.reset();
}
}
size_t MoqtControlParser::ProcessMessage(absl::string_view data,
MoqtMessageType message_type) {
quic::QuicDataReader reader(data);
size_t bytes_read;
switch (message_type) {
case MoqtMessageType::kClientSetup:
bytes_read = ProcessClientSetup(reader);
break;
case MoqtMessageType::kServerSetup:
bytes_read = ProcessServerSetup(reader);
break;
case MoqtMessageType::kSubscribe:
bytes_read = ProcessSubscribe(reader);
break;
case MoqtMessageType::kSubscribeOk:
bytes_read = ProcessSubscribeOk(reader);
break;
case MoqtMessageType::kSubscribeError:
bytes_read = ProcessSubscribeError(reader);
break;
case MoqtMessageType::kUnsubscribe:
bytes_read = ProcessUnsubscribe(reader);
break;
case MoqtMessageType::kSubscribeDone:
bytes_read = ProcessSubscribeDone(reader);
break;
case MoqtMessageType::kSubscribeUpdate:
bytes_read = ProcessSubscribeUpdate(reader);
break;
case MoqtMessageType::kAnnounce:
bytes_read = ProcessAnnounce(reader);
break;
case MoqtMessageType::kAnnounceOk:
bytes_read = ProcessAnnounceOk(reader);
break;
case MoqtMessageType::kAnnounceError:
bytes_read = ProcessAnnounceError(reader);
break;
case MoqtMessageType::kAnnounceCancel:
bytes_read = ProcessAnnounceCancel(reader);
break;
case MoqtMessageType::kTrackStatusRequest:
bytes_read = ProcessTrackStatusRequest(reader);
break;
case MoqtMessageType::kUnannounce:
bytes_read = ProcessUnannounce(reader);
break;
case MoqtMessageType::kTrackStatus:
bytes_read = ProcessTrackStatus(reader);
break;
case MoqtMessageType::kGoAway:
bytes_read = ProcessGoAway(reader);
break;
case MoqtMessageType::kSubscribeAnnounces:
bytes_read = ProcessSubscribeAnnounces(reader);
break;
case MoqtMessageType::kSubscribeAnnouncesOk:
bytes_read = ProcessSubscribeAnnouncesOk(reader);
break;
case MoqtMessageType::kSubscribeAnnouncesError:
bytes_read = ProcessSubscribeAnnouncesError(reader);
break;
case MoqtMessageType::kUnsubscribeAnnounces:
bytes_read = ProcessUnsubscribeAnnounces(reader);
break;
case MoqtMessageType::kMaxRequestId:
bytes_read = ProcessMaxRequestId(reader);
break;
case MoqtMessageType::kFetch:
bytes_read = ProcessFetch(reader);
break;
case MoqtMessageType::kFetchCancel:
bytes_read = ProcessFetchCancel(reader);
break;
case MoqtMessageType::kFetchOk:
bytes_read = ProcessFetchOk(reader);
break;
case MoqtMessageType::kFetchError:
bytes_read = ProcessFetchError(reader);
break;
case MoqtMessageType::kRequestsBlocked:
bytes_read = ProcessRequestsBlocked(reader);
break;
case moqt::MoqtMessageType::kObjectAck:
bytes_read = ProcessObjectAck(reader);
break;
default:
ParseError("Unknown message type");
bytes_read = 0;
break;
}
if (bytes_read != data.size() || bytes_read == 0) {
ParseError("Message length does not match payload length");
return 0;
}
return bytes_read;
}
size_t MoqtControlParser::ProcessClientSetup(quic::QuicDataReader& reader) {
MoqtClientSetup setup;
setup.parameters.using_webtrans = uses_web_transport_;
setup.parameters.perspective = quic::Perspective::IS_CLIENT;
uint64_t number_of_supported_versions;
if (!reader.ReadVarInt62(&number_of_supported_versions)) {
return 0;
}
uint64_t version;
for (uint64_t i = 0; i < number_of_supported_versions; ++i) {
if (!reader.ReadVarInt62(&version)) {
return 0;
}
setup.supported_versions.push_back(static_cast<MoqtVersion>(version));
}
KeyValuePairList parameters;
if (!ParseKeyValuePairList(reader, parameters)) {
return 0;
}
MoqtError error = ValidateSetupParameters(parameters, uses_web_transport_,
quic::Perspective::IS_SERVER);
if (error != MoqtError::kNoError) {
ParseError(error, "Client SETUP contains invalid parameters");
return 0;
}
KeyValuePairListToMoqtSessionParameters(parameters, setup.parameters);
// TODO(martinduke): Validate construction of the PATH (Sec 8.3.2.1)
visitor_.OnClientSetupMessage(setup);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtControlParser::ProcessServerSetup(quic::QuicDataReader& reader) {
MoqtServerSetup setup;
setup.parameters.using_webtrans = uses_web_transport_;
setup.parameters.perspective = quic::Perspective::IS_SERVER;
uint64_t version;
if (!reader.ReadVarInt62(&version)) {
return 0;
}
setup.selected_version = static_cast<MoqtVersion>(version);
KeyValuePairList parameters;
if (!ParseKeyValuePairList(reader, parameters)) {
return 0;
}
MoqtError error = ValidateSetupParameters(parameters, uses_web_transport_,
quic::Perspective::IS_CLIENT);
if (error != MoqtError::kNoError) {
ParseError(error, "Server SETUP contains invalid parameters");
return 0;
}
KeyValuePairListToMoqtSessionParameters(parameters, setup.parameters);
visitor_.OnServerSetupMessage(setup);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtControlParser::ProcessSubscribe(quic::QuicDataReader& reader) {
MoqtSubscribe subscribe;
uint64_t filter, group, object;
uint8_t group_order, forward;
absl::string_view track_name;
if (!reader.ReadVarInt62(&subscribe.request_id) ||
!reader.ReadVarInt62(&subscribe.track_alias) ||
!ReadTrackNamespace(reader, subscribe.full_track_name) ||
!reader.ReadStringPieceVarInt62(&track_name) ||
!reader.ReadUInt8(&subscribe.subscriber_priority) ||
!reader.ReadUInt8(&group_order) || !reader.ReadUInt8(&forward) ||
!reader.ReadVarInt62(&filter)) {
return 0;
}
subscribe.full_track_name.AddElement(track_name);
if (!ParseDeliveryOrder(group_order, subscribe.group_order)) {
ParseError("Invalid group order value in SUBSCRIBE");
return 0;
}
if (forward > 1) {
ParseError("Invalid forward value in SUBSCRIBE");
return 0;
}
subscribe.forward = (forward == 1);
subscribe.filter_type = static_cast<MoqtFilterType>(filter);
switch (subscribe.filter_type) {
case MoqtFilterType::kNextGroupStart:
case MoqtFilterType::kLatestObject:
break;
case MoqtFilterType::kAbsoluteStart:
case MoqtFilterType::kAbsoluteRange:
if (!reader.ReadVarInt62(&group) || !reader.ReadVarInt62(&object)) {
return 0;
}
subscribe.start = Location(group, object);
if (subscribe.filter_type == MoqtFilterType::kAbsoluteStart) {
break;
}
if (!reader.ReadVarInt62(&group)) {
return 0;
}
subscribe.end_group = group;
if (*subscribe.end_group < subscribe.start->group) {
ParseError("End group is less than start group");
return 0;
}
break;
default:
ParseError("Invalid filter type");
return 0;
}
KeyValuePairList parameters;
if (!ParseKeyValuePairList(reader, parameters)) {
return 0;
}
if (!ValidateVersionSpecificParameters(parameters,
MoqtMessageType::kSubscribe)) {
ParseError("SUBSCRIBE contains invalid parameters");
return 0;
}
if (!KeyValuePairListToVersionSpecificParameters(parameters,
subscribe.parameters)) {
return 0;
}
visitor_.OnSubscribeMessage(subscribe);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtControlParser::ProcessSubscribeOk(quic::QuicDataReader& reader) {
MoqtSubscribeOk subscribe_ok;
uint64_t milliseconds;
uint8_t group_order;
uint8_t content_exists;
if (!reader.ReadVarInt62(&subscribe_ok.subscribe_id) ||
!reader.ReadVarInt62(&milliseconds) || !reader.ReadUInt8(&group_order) ||
!reader.ReadUInt8(&content_exists)) {
return 0;
}
if (content_exists > 1) {
ParseError("SUBSCRIBE_OK ContentExists has invalid value");
return 0;
}
if (group_order != 0x01 && group_order != 0x02) {
ParseError("Invalid group order value in SUBSCRIBE_OK");
return 0;
}
subscribe_ok.expires = quic::QuicTimeDelta::FromMilliseconds(milliseconds);
subscribe_ok.group_order = static_cast<MoqtDeliveryOrder>(group_order);
if (content_exists) {
subscribe_ok.largest_id = Location();
if (!reader.ReadVarInt62(&subscribe_ok.largest_id->group) ||
!reader.ReadVarInt62(&subscribe_ok.largest_id->object)) {
return 0;
}
}
KeyValuePairList parameters;
if (!ParseKeyValuePairList(reader, parameters)) {
return 0;
}
if (!ValidateVersionSpecificParameters(parameters,
MoqtMessageType::kSubscribeOk)) {
ParseError("SUBSCRIBE_OK contains invalid parameters");
return 0;
}
if (!KeyValuePairListToVersionSpecificParameters(parameters,
subscribe_ok.parameters)) {
return 0;
}
visitor_.OnSubscribeOkMessage(subscribe_ok);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtControlParser::ProcessSubscribeError(quic::QuicDataReader& reader) {
MoqtSubscribeError subscribe_error;
uint64_t error_code;
if (!reader.ReadVarInt62(&subscribe_error.subscribe_id) ||
!reader.ReadVarInt62(&error_code) ||
!reader.ReadStringVarInt62(subscribe_error.reason_phrase) ||
!reader.ReadVarInt62(&subscribe_error.track_alias)) {
return 0;
}
subscribe_error.error_code = static_cast<SubscribeErrorCode>(error_code);
visitor_.OnSubscribeErrorMessage(subscribe_error);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtControlParser::ProcessUnsubscribe(quic::QuicDataReader& reader) {
MoqtUnsubscribe unsubscribe;
if (!reader.ReadVarInt62(&unsubscribe.subscribe_id)) {
return 0;
}
visitor_.OnUnsubscribeMessage(unsubscribe);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtControlParser::ProcessSubscribeDone(quic::QuicDataReader& reader) {
MoqtSubscribeDone subscribe_done;
uint64_t value;
if (!reader.ReadVarInt62(&subscribe_done.subscribe_id) ||
!reader.ReadVarInt62(&value) ||
!reader.ReadVarInt62(&subscribe_done.stream_count) ||
!reader.ReadStringVarInt62(subscribe_done.reason_phrase)) {
return 0;
}
subscribe_done.status_code = static_cast<SubscribeDoneCode>(value);
visitor_.OnSubscribeDoneMessage(subscribe_done);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtControlParser::ProcessSubscribeUpdate(quic::QuicDataReader& reader) {
MoqtSubscribeUpdate subscribe_update;
uint64_t start_group, start_object, end_group;
if (!reader.ReadVarInt62(&subscribe_update.subscribe_id) ||
!reader.ReadVarInt62(&start_group) ||
!reader.ReadVarInt62(&start_object) || !reader.ReadVarInt62(&end_group) ||
!reader.ReadUInt8(&subscribe_update.subscriber_priority)) {
return 0;
}
KeyValuePairList parameters;
if (!ParseKeyValuePairList(reader, parameters)) {
return 0;
}
if (!ValidateVersionSpecificParameters(parameters,
MoqtMessageType::kSubscribeUpdate)) {
ParseError("SUBSCRIBE_UPDATE contains invalid parameters");
return 0;
}
if (!KeyValuePairListToVersionSpecificParameters(
parameters, subscribe_update.parameters)) {
return 0;
}
subscribe_update.start = Location(start_group, start_object);
if (end_group > 0) {
subscribe_update.end_group = end_group - 1;
if (subscribe_update.end_group < start_group) {
ParseError("End group is less than start group");
return 0;
}
}
visitor_.OnSubscribeUpdateMessage(subscribe_update);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtControlParser::ProcessAnnounce(quic::QuicDataReader& reader) {
MoqtAnnounce announce;
if (!ReadTrackNamespace(reader, announce.track_namespace)) {
return 0;
}
KeyValuePairList parameters;
if (!ParseKeyValuePairList(reader, parameters)) {
return 0;
}
if (!ValidateVersionSpecificParameters(parameters,
MoqtMessageType::kAnnounce)) {
ParseError("ANNOUNCE contains invalid parameters");
return 0;
}
if (!KeyValuePairListToVersionSpecificParameters(parameters,
announce.parameters)) {
return 0;
}
visitor_.OnAnnounceMessage(announce);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtControlParser::ProcessAnnounceOk(quic::QuicDataReader& reader) {
MoqtAnnounceOk announce_ok;
if (!ReadTrackNamespace(reader, announce_ok.track_namespace)) {
return 0;
}
visitor_.OnAnnounceOkMessage(announce_ok);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtControlParser::ProcessAnnounceError(quic::QuicDataReader& reader) {
MoqtAnnounceError announce_error;
if (!ReadTrackNamespace(reader, announce_error.track_namespace)) {
return 0;
}
uint64_t error_code;
if (!reader.ReadVarInt62(&error_code) ||
!reader.ReadStringVarInt62(announce_error.reason_phrase)) {
return 0;
}
announce_error.error_code = static_cast<SubscribeErrorCode>(error_code);
visitor_.OnAnnounceErrorMessage(announce_error);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtControlParser::ProcessAnnounceCancel(quic::QuicDataReader& reader) {
MoqtAnnounceCancel announce_cancel;
if (!ReadTrackNamespace(reader, announce_cancel.track_namespace)) {
return 0;
}
uint64_t error_code;
if (!reader.ReadVarInt62(&error_code) ||
!reader.ReadStringVarInt62(announce_cancel.reason_phrase)) {
return 0;
}
announce_cancel.error_code = static_cast<SubscribeErrorCode>(error_code);
visitor_.OnAnnounceCancelMessage(announce_cancel);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtControlParser::ProcessTrackStatusRequest(
quic::QuicDataReader& reader) {
MoqtTrackStatusRequest track_status_request;
if (!ReadTrackNamespace(reader, track_status_request.full_track_name)) {
return 0;
}
absl::string_view name;
if (!reader.ReadStringPieceVarInt62(&name)) {
return 0;
}
track_status_request.full_track_name.AddElement(name);
KeyValuePairList parameters;
if (!ParseKeyValuePairList(reader, parameters)) {
return 0;
}
if (!ValidateVersionSpecificParameters(
parameters, MoqtMessageType::kTrackStatusRequest)) {
ParseError("TRACK_STATUS_REQUEST message contains invalid parameters");
return 0;
}
if (!KeyValuePairListToVersionSpecificParameters(
parameters, track_status_request.parameters)) {
return 0;
}
visitor_.OnTrackStatusRequestMessage(track_status_request);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtControlParser::ProcessUnannounce(quic::QuicDataReader& reader) {
MoqtUnannounce unannounce;
if (!ReadTrackNamespace(reader, unannounce.track_namespace)) {
return 0;
}
visitor_.OnUnannounceMessage(unannounce);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtControlParser::ProcessTrackStatus(quic::QuicDataReader& reader) {
MoqtTrackStatus track_status;
if (!ReadTrackNamespace(reader, track_status.full_track_name)) {
return 0;
}
absl::string_view name;
if (!reader.ReadStringPieceVarInt62(&name)) {
return 0;
}
track_status.full_track_name.AddElement(name);
uint64_t value;
if (!reader.ReadVarInt62(&value) ||
!reader.ReadVarInt62(&track_status.last_group) ||
!reader.ReadVarInt62(&track_status.last_object)) {
return 0;
}
track_status.status_code = static_cast<MoqtTrackStatusCode>(value);
KeyValuePairList parameters;
if (!ParseKeyValuePairList(reader, parameters)) {
return 0;
}
if (!ValidateVersionSpecificParameters(parameters,
MoqtMessageType::kTrackStatus)) {
ParseError("TRACK_STATUS message contains invalid parameters");
return 0;
}
if (!KeyValuePairListToVersionSpecificParameters(parameters,
track_status.parameters)) {
return 0;
}
visitor_.OnTrackStatusMessage(track_status);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtControlParser::ProcessGoAway(quic::QuicDataReader& reader) {
MoqtGoAway goaway;
if (!reader.ReadStringVarInt62(goaway.new_session_uri)) {
return 0;
}
visitor_.OnGoAwayMessage(goaway);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtControlParser::ProcessSubscribeAnnounces(
quic::QuicDataReader& reader) {
MoqtSubscribeAnnounces subscribe_announces;
if (!ReadTrackNamespace(reader, subscribe_announces.track_namespace)) {
return 0;
}
KeyValuePairList parameters;
if (!ParseKeyValuePairList(reader, parameters)) {
return 0;
}
if (!ValidateVersionSpecificParameters(
parameters, MoqtMessageType::kSubscribeAnnounces)) {
ParseError("SUBSCRIBE_ANNOUNCES message contains invalid parameters");
return 0;
}
if (!KeyValuePairListToVersionSpecificParameters(
parameters, subscribe_announces.parameters)) {
return 0;
}
visitor_.OnSubscribeAnnouncesMessage(subscribe_announces);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtControlParser::ProcessSubscribeAnnouncesOk(
quic::QuicDataReader& reader) {
MoqtSubscribeAnnouncesOk subscribe_namespace_ok;
if (!ReadTrackNamespace(reader, subscribe_namespace_ok.track_namespace)) {
return 0;
}
visitor_.OnSubscribeAnnouncesOkMessage(subscribe_namespace_ok);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtControlParser::ProcessSubscribeAnnouncesError(
quic::QuicDataReader& reader) {
MoqtSubscribeAnnouncesError subscribe_namespace_error;
uint64_t error_code;
if (!ReadTrackNamespace(reader, subscribe_namespace_error.track_namespace) ||
!reader.ReadVarInt62(&error_code) ||
!reader.ReadStringVarInt62(subscribe_namespace_error.reason_phrase)) {
return 0;
}
subscribe_namespace_error.error_code =
static_cast<SubscribeErrorCode>(error_code);
visitor_.OnSubscribeAnnouncesErrorMessage(subscribe_namespace_error);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtControlParser::ProcessUnsubscribeAnnounces(
quic::QuicDataReader& reader) {
MoqtUnsubscribeAnnounces unsubscribe_namespace;
if (!ReadTrackNamespace(reader, unsubscribe_namespace.track_namespace)) {
return 0;
}
visitor_.OnUnsubscribeAnnouncesMessage(unsubscribe_namespace);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtControlParser::ProcessMaxRequestId(quic::QuicDataReader& reader) {
MoqtMaxRequestId max_request_id;
if (!reader.ReadVarInt62(&max_request_id.max_request_id)) {
return 0;
}
visitor_.OnMaxRequestIdMessage(max_request_id);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtControlParser::ProcessFetch(quic::QuicDataReader& reader) {
MoqtFetch fetch;
absl::string_view track_name;
uint8_t group_order;
uint64_t end_object;
uint64_t type;
if (!reader.ReadVarInt62(&fetch.fetch_id) ||
!reader.ReadUInt8(&fetch.subscriber_priority) ||
!reader.ReadUInt8(&group_order) || !reader.ReadVarInt62(&type)) {
return 0;
}
if (!ParseDeliveryOrder(group_order, fetch.group_order)) {
ParseError("Invalid group order value in FETCH message");
return 0;
}
switch (static_cast<FetchType>(type)) {
case FetchType::kJoining: {
uint64_t joining_subscribe_id;
uint64_t preceding_group_offset;
if (!reader.ReadVarInt62(&joining_subscribe_id) ||
!reader.ReadVarInt62(&preceding_group_offset)) {
return 0;
}
fetch.joining_fetch =
JoiningFetch{joining_subscribe_id, preceding_group_offset};
break;
}
case FetchType::kStandalone: {
fetch.joining_fetch = std::nullopt;
if (!ReadTrackNamespace(reader, fetch.full_track_name) ||
!reader.ReadStringPieceVarInt62(&track_name) ||
!reader.ReadVarInt62(&fetch.start_object.group) ||
!reader.ReadVarInt62(&fetch.start_object.object) ||
!reader.ReadVarInt62(&fetch.end_group) ||
!reader.ReadVarInt62(&end_object)) {
return 0;
}
// Elements that have to be translated from the literal value.
fetch.full_track_name.AddElement(track_name);
fetch.end_object =
end_object == 0 ? std::optional<uint64_t>() : (end_object - 1);
if (fetch.end_group < fetch.start_object.group ||
(fetch.end_group == fetch.start_object.group &&
fetch.end_object.has_value() &&
*fetch.end_object < fetch.start_object.object)) {
ParseError("End object comes before start object in FETCH");
return 0;
}
break;
}
default:
ParseError("Invalid FETCH type");
return 0;
}
KeyValuePairList parameters;
if (!ParseKeyValuePairList(reader, parameters)) {
return 0;
}
if (!ValidateVersionSpecificParameters(parameters, MoqtMessageType::kFetch)) {
ParseError("FETCH message contains invalid parameters");
return 0;
}
if (!KeyValuePairListToVersionSpecificParameters(parameters,
fetch.parameters)) {
return 0;
};
visitor_.OnFetchMessage(fetch);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtControlParser::ProcessFetchCancel(quic::QuicDataReader& reader) {
MoqtFetchCancel fetch_cancel;
if (!reader.ReadVarInt62(&fetch_cancel.subscribe_id)) {
return 0;
}
visitor_.OnFetchCancelMessage(fetch_cancel);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtControlParser::ProcessFetchOk(quic::QuicDataReader& reader) {
MoqtFetchOk fetch_ok;
uint8_t group_order;
KeyValuePairList parameters;
if (!reader.ReadVarInt62(&fetch_ok.subscribe_id) ||
!reader.ReadUInt8(&group_order) ||
!reader.ReadVarInt62(&fetch_ok.largest_id.group) ||
!reader.ReadVarInt62(&fetch_ok.largest_id.object) ||
!ParseKeyValuePairList(reader, parameters)) {
return 0;
}
if (group_order != 0x01 && group_order != 0x02) {
ParseError("Invalid group order value in FETCH_OK");
return 0;
}
if (!ValidateVersionSpecificParameters(parameters,
MoqtMessageType::kFetchOk)) {
ParseError("FETCH_OK message contains invalid parameters");
return 0;
}
fetch_ok.group_order = static_cast<MoqtDeliveryOrder>(group_order);
if (!KeyValuePairListToVersionSpecificParameters(parameters,
fetch_ok.parameters)) {
return 0;
}
visitor_.OnFetchOkMessage(fetch_ok);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtControlParser::ProcessFetchError(quic::QuicDataReader& reader) {
MoqtFetchError fetch_error;
uint64_t error_code;
if (!reader.ReadVarInt62(&fetch_error.subscribe_id) ||
!reader.ReadVarInt62(&error_code) ||
!reader.ReadStringVarInt62(fetch_error.reason_phrase)) {
return 0;
}
fetch_error.error_code = static_cast<SubscribeErrorCode>(error_code);
visitor_.OnFetchErrorMessage(fetch_error);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtControlParser::ProcessRequestsBlocked(quic::QuicDataReader& reader) {
MoqtRequestsBlocked requests_blocked;
if (!reader.ReadVarInt62(&requests_blocked.max_request_id)) {
return 0;
}
visitor_.OnRequestsBlockedMessage(requests_blocked);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtControlParser::ProcessObjectAck(quic::QuicDataReader& reader) {
MoqtObjectAck object_ack;
uint64_t raw_delta;
if (!reader.ReadVarInt62(&object_ack.subscribe_id) ||
!reader.ReadVarInt62(&object_ack.group_id) ||
!reader.ReadVarInt62(&object_ack.object_id) ||
!reader.ReadVarInt62(&raw_delta)) {
return 0;
}
object_ack.delta_from_deadline = quic::QuicTimeDelta::FromMicroseconds(
SignedVarintUnserializedForm(raw_delta));
visitor_.OnObjectAckMessage(object_ack);
return reader.PreviouslyReadPayload().length();
}
void MoqtControlParser::ParseError(absl::string_view reason) {
ParseError(MoqtError::kProtocolViolation, reason);
}
void MoqtControlParser::ParseError(MoqtError error_code,
absl::string_view reason) {
if (parsing_error_) {
return; // Don't send multiple parse errors.
}
no_more_data_ = true;
parsing_error_ = true;
visitor_.OnParsingError(error_code, reason);
}
bool MoqtControlParser::ReadTrackNamespace(quic::QuicDataReader& reader,
FullTrackName& full_track_name) {
QUICHE_DCHECK(full_track_name.empty());
uint64_t num_elements;
if (!reader.ReadVarInt62(&num_elements)) {
return false;
}
if (num_elements == 0 || num_elements > kMaxNamespaceElements) {
ParseError(MoqtError::kProtocolViolation,
"Invalid number of namespace elements");
return false;
}
for (uint64_t i = 0; i < num_elements; ++i) {
absl::string_view element;
if (!reader.ReadStringPieceVarInt62(&element)) {
return false;
}
full_track_name.AddElement(element);
}
return true;
}
// Returns false if there is a protocol violation.
bool MoqtControlParser::KeyValuePairListToVersionSpecificParameters(
const KeyValuePairList& parameters, VersionSpecificParameters& out) {
return parameters.ForEach(
[&](uint64_t key, uint64_t value) {
VersionSpecificParameter parameter =
static_cast<VersionSpecificParameter>(key);
switch (parameter) {
case VersionSpecificParameter::kDeliveryTimeout:
out.delivery_timeout = quic::QuicTimeDelta::FromMilliseconds(value);
break;
case VersionSpecificParameter::kMaxCacheDuration:
out.max_cache_duration =
quic::QuicTimeDelta::FromMilliseconds(value);
break;
case VersionSpecificParameter::kOackWindowSize:
out.oack_window_size = quic::QuicTimeDelta::FromMicroseconds(value);
break;
default:
break;
}
return true;
},
[&](uint64_t key, absl::string_view value) {
VersionSpecificParameter parameter =
static_cast<VersionSpecificParameter>(key);
switch (parameter) {
case VersionSpecificParameter::kAuthorizationToken:
if (!ParseAuthTokenParameter(value, out)) {
return false;
}
break;
default:
break;
}
return true;
});
}
bool MoqtControlParser::ParseAuthTokenParameter(
absl::string_view field, VersionSpecificParameters& out) {
quic::QuicDataReader reader(field);
AuthTokenType token_type;
absl::string_view token;
uint64_t value;
if (!reader.ReadVarInt62(&value) || value > AuthTokenAliasType::kMaxValue) {
ParseError(MoqtError::kKeyValueFormattingError,
"Invalid Authorization Token Alias type");
return false;
}
AuthTokenAliasType alias_type = static_cast<AuthTokenAliasType>(value);
switch (alias_type) {
case AuthTokenAliasType::kUseValue:
if (!reader.ReadVarInt62(&value)) {
ParseError(MoqtError::kKeyValueFormattingError,
"Malformed Authorization Token Parameter");
return false;
}
if (value > AuthTokenType::kMaxAuthTokenType) {
ParseError(MoqtError::kKeyValueFormattingError,
"Invalid Authorization Token Type");
return false;
}
token_type = static_cast<AuthTokenType>(value);
token = reader.PeekRemainingPayload();
break;
case AuthTokenAliasType::kUseAlias:
if (!reader.ReadVarInt62(&value)) {
ParseError(MoqtError::kKeyValueFormattingError,
"Malformed Authorization Token Parameter");
return false;
}
// TODO: Implement support for cache_size > 0
ParseError(MoqtError::kKeyValueFormattingError,
"Unknown Auth Token Alias");
return false;
case AuthTokenAliasType::kRegister:
if (!reader.ReadVarInt62(&value)) {
ParseError(MoqtError::kKeyValueFormattingError,
"Malformed Authorization Token Parameter");
return false;
}
if (!reader.ReadVarInt62(&value)) {
ParseError(MoqtError::kKeyValueFormattingError,
"Malformed Authorization Token Parameter");
return false;
}
token_type = static_cast<AuthTokenType>(value);
token = reader.PeekRemainingPayload();
if (auth_token_cache_size_ + sizeof(uint64_t) + token.length() >
max_auth_token_cache_size_) {
ParseError(MoqtError::kAuthTokenCacheOverflow,
"Too many authorization token tags");
return false;
}
break;
// TODO: Add to the cache.
// TODO: Check if the alias is already in use.
QUICHE_NOTREACHED();
break;
case AuthTokenAliasType::kDelete:
if (!reader.ReadVarInt62(&value)) {
ParseError(MoqtError::kKeyValueFormattingError,
"Malformed Authorization Token Parameter");
return false;
}
// TODO: Implement support for cache_size > 0
ParseError(MoqtError::kKeyValueFormattingError,
"Unknown Auth Token Alias");
return false;
}
// Validate cache operations.
out.authorization_token.push_back(AuthToken(token_type, token));
return true;
}
void MoqtDataParser::ParseError(absl::string_view reason) {
if (parsing_error_) {
return; // Don't send multiple parse errors.
}
next_input_ = kFailed;
no_more_data_ = true;
parsing_error_ = true;
visitor_.OnParsingError(MoqtError::kProtocolViolation, reason);
}
std::optional<absl::string_view> ParseDatagram(absl::string_view data,
MoqtObject& object_metadata) {
uint64_t type_raw, object_status_raw;
absl::string_view extensions;
quic::QuicDataReader reader(data);
if (!reader.ReadVarInt62(&type_raw) ||
!reader.ReadVarInt62(&object_metadata.track_alias) ||
!reader.ReadVarInt62(&object_metadata.group_id) ||
!reader.ReadVarInt62(&object_metadata.object_id) ||
!reader.ReadUInt8(&object_metadata.publisher_priority) ||
!reader.ReadStringPieceVarInt62(&extensions)) {
return std::nullopt;
}
object_metadata.extension_headers = std::string(extensions);
if (static_cast<MoqtDatagramType>(type_raw) ==
MoqtDatagramType::kObjectStatus) {
object_metadata.payload_length = 0;
if (!reader.ReadVarInt62(&object_status_raw)) {
return std::nullopt;
}
object_metadata.object_status = IntegerToObjectStatus(object_status_raw);
return "";
}
absl::string_view payload;
if (!reader.ReadStringPieceVarInt62(&payload)) {
return std::nullopt;
}
object_metadata.object_status = MoqtObjectStatus::kNormal;
object_metadata.payload_length = payload.length();
return payload;
}
void MoqtDataParser::ReadDataUntil(StopCondition stop_condition) {
if (processing_) {
QUICHE_BUG(MoqtDataParser_reentry)
<< "Calling ProcessData() when ProcessData() is already in progress.";
return;
}
processing_ = true;
auto on_return = absl::MakeCleanup([&] { processing_ = false; });
State last_state = state();
for (;;) {
ParseNextItemFromStream();
if (state() == last_state || no_more_data_ || stop_condition()) {
break;
}
last_state = state();
}
}
std::optional<uint64_t> MoqtDataParser::ReadVarInt62NoFin() {
bool fin_read = false;
std::optional<uint64_t> result = ReadVarInt62FromStream(stream_, fin_read);
if (fin_read) {
ParseError("Unexpected FIN received in the middle of a header");
return std::nullopt;
}
return result;
}
std::optional<uint8_t> MoqtDataParser::ReadUint8NoFin() {
char buffer[1];
quiche::ReadStream::ReadResult read_result =
stream_.Read(absl::MakeSpan(buffer));
if (read_result.fin) {
ParseError("Unexpected FIN received in the middle of a header");
return std::nullopt;
}
if (read_result.bytes_read == 0) {
return std::nullopt;
}
return absl::bit_cast<uint8_t>(buffer[0]);
}
void MoqtDataParser::AdvanceParserState() {
QUICHE_DCHECK(type_ == MoqtDataStreamType::kStreamHeaderSubgroup ||
type_ == MoqtDataStreamType::kStreamHeaderFetch);
const bool is_fetch = type_ == MoqtDataStreamType::kStreamHeaderFetch;
switch (next_input_) {
// The state table is factored into a separate function (rather than
// inlined) in order to separate the order of elements from the way they are
// parsed.
case kStreamType:
next_input_ = kTrackAlias;
break;
case kTrackAlias:
next_input_ = kGroupId;
break;
case kGroupId:
next_input_ = kSubgroupId;
break;
case kSubgroupId:
next_input_ = is_fetch ? kObjectId : kPublisherPriority;
break;
case kPublisherPriority:
next_input_ = is_fetch ? kExtensionSize : kObjectId;
break;
case kObjectId:
next_input_ = is_fetch ? kPublisherPriority : kExtensionSize;
break;
case kExtensionBody:
next_input_ = kObjectPayloadLength;
break;
case kStatus:
case kData:
next_input_ = is_fetch ? kGroupId : kObjectId;
break;
case kExtensionSize: // Either kExtensionBody or
// kObjectPayloadLength.
case kObjectPayloadLength: // Either kStatus or kData depending on length.
case kPadding: // Handled separately.
case kFailed: // Should cause parsing to cease.
QUICHE_NOTREACHED();
break;
}
}
void MoqtDataParser::ParseNextItemFromStream() {
if (CheckForFinWithoutData()) {
return;
}
switch (next_input_) {
case kStreamType: {
std::optional<uint64_t> value_read = ReadVarInt62NoFin();
if (value_read.has_value()) {
if (!IsAllowedStreamType(*value_read)) {
ParseError("Invalid stream type supplied");
return;
}
type_ = static_cast<MoqtDataStreamType>(*value_read);
switch (*type_) {
case MoqtDataStreamType::kStreamHeaderSubgroup:
case MoqtDataStreamType::kStreamHeaderFetch:
AdvanceParserState();
break;
case MoqtDataStreamType::kPadding:
next_input_ = kPadding;
break;
}
}
return;
}
case kTrackAlias: {
std::optional<uint64_t> value_read = ReadVarInt62NoFin();
if (value_read.has_value()) {
metadata_.track_alias = *value_read;
AdvanceParserState();
}
return;
}
case kGroupId: {
std::optional<uint64_t> value_read = ReadVarInt62NoFin();
if (value_read.has_value()) {
metadata_.group_id = *value_read;
AdvanceParserState();
}
return;
}
case kSubgroupId: {
std::optional<uint64_t> value_read = ReadVarInt62NoFin();
if (value_read.has_value()) {
metadata_.subgroup_id = *value_read;
AdvanceParserState();
}
return;
}
case kPublisherPriority: {
std::optional<uint8_t> value_read = ReadUint8NoFin();
if (value_read.has_value()) {
metadata_.publisher_priority = *value_read;
AdvanceParserState();
}
return;
}
case kObjectId: {
std::optional<uint64_t> value_read = ReadVarInt62NoFin();
if (value_read.has_value()) {
metadata_.object_id = *value_read;
AdvanceParserState();
}
return;
}
case kExtensionSize: {
std::optional<uint64_t> value_read = ReadVarInt62NoFin();
if (value_read.has_value()) {
metadata_.extension_headers.clear();
payload_length_remaining_ = *value_read;
next_input_ = (value_read == 0) ? kObjectPayloadLength : kExtensionBody;
}
return;
}
case kObjectPayloadLength: {
std::optional<uint64_t> value_read = ReadVarInt62NoFin();
if (value_read.has_value()) {
metadata_.payload_length = *value_read;
payload_length_remaining_ = *value_read;
if (metadata_.payload_length > 0) {
metadata_.object_status = MoqtObjectStatus::kNormal;
next_input_ = kData;
} else {
next_input_ = kStatus;
}
}
return;
}
case kStatus: {
bool fin_read = false;
std::optional<uint64_t> value_read =
ReadVarInt62FromStream(stream_, fin_read);
if (value_read.has_value()) {
metadata_.object_status = IntegerToObjectStatus(*value_read);
if (metadata_.object_status == MoqtObjectStatus::kInvalidObjectStatus) {
ParseError("Invalid object status provided");
return;
}
++num_objects_read_;
visitor_.OnObjectMessage(metadata_, "", /*end_of_message=*/true);
AdvanceParserState();
}
if (fin_read) {
no_more_data_ = true;
return;
}
return;
}
case kExtensionBody:
case kData: {
while (payload_length_remaining_ > 0) {
quiche::ReadStream::PeekResult peek_result =
stream_.PeekNextReadableRegion();
if (!peek_result.has_data()) {
return;
}
if (peek_result.fin_next && payload_length_remaining_ > 0) {
ParseError("FIN received at an unexpected point in the stream");
return;
}
size_t chunk_size =
std::min(payload_length_remaining_, peek_result.peeked_data.size());
payload_length_remaining_ -= chunk_size;
bool done = payload_length_remaining_ == 0;
if (next_input_ == kData) {
visitor_.OnObjectMessage(
metadata_, peek_result.peeked_data.substr(0, chunk_size), done);
const bool fin = stream_.SkipBytes(chunk_size);
if (done) {
++num_objects_read_;
no_more_data_ |= fin;
AdvanceParserState();
}
} else {
absl::StrAppend(&metadata_.extension_headers,
peek_result.peeked_data.substr(0, chunk_size));
if (stream_.SkipBytes(chunk_size)) {
ParseError("FIN received at an unexpected point in the stream");
return;
}
if (done) {
AdvanceParserState();
}
}
}
return;
}
case kPadding:
no_more_data_ |= stream_.SkipBytes(stream_.ReadableBytes());
return;
case kFailed:
return;
}
}
void MoqtDataParser::ReadAllData() {
ReadDataUntil(+[]() { return false; });
}
void MoqtDataParser::ReadStreamType() {
return ReadDataUntil([this]() { return type_.has_value(); });
}
void MoqtDataParser::ReadTrackAlias() {
return ReadDataUntil(
[this]() { return type_.has_value() && next_input_ != kTrackAlias; });
}
void MoqtDataParser::ReadAtMostOneObject() {
const size_t num_objects_read_initial = num_objects_read_;
return ReadDataUntil(
[&]() { return num_objects_read_ != num_objects_read_initial; });
}
bool MoqtDataParser::CheckForFinWithoutData() {
if (!stream_.PeekNextReadableRegion().fin_next) {
return false;
}
const bool valid_state =
(type_ == MoqtDataStreamType::kStreamHeaderSubgroup &&
next_input_ == kObjectId) ||
(type_ == MoqtDataStreamType::kStreamHeaderFetch &&
next_input_ == kGroupId);
if (!valid_state || num_objects_read_ == 0) {
ParseError("FIN received at an unexpected point in the stream");
return true;
}
return stream_.SkipBytes(0);
}
} // namespace moqt