blob: 8b52e6e76a9bc524a6a4a756d889c7f588083f05 [file] [log] [blame]
// Copyright (c) 2023 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "quiche/quic/moqt/moqt_parser.h"
#include <algorithm>
#include <array>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <optional>
#include <string>
#include <utility>
#include <variant>
#include <vector>
#include "absl/base/casts.h"
#include "absl/cleanup/cleanup.h"
#include "absl/container/fixed_array.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/types/span.h"
#include "quiche/http2/adapter/header_validator.h"
#include "quiche/quic/core/quic_data_reader.h"
#include "quiche/quic/core/quic_time.h"
#include "quiche/quic/moqt/moqt_error.h"
#include "quiche/quic/moqt/moqt_key_value_pair.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_names.h"
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/moqt_types.h"
#include "quiche/common/platform/api/quiche_bug_tracker.h"
#include "quiche/common/platform/api/quiche_logging.h"
#include "quiche/common/quiche_data_reader.h"
#include "quiche/common/quiche_status_utils.h"
#include "quiche/web_transport/web_transport.h"
namespace moqt {
namespace {
uint64_t SignedVarintUnserializedForm(uint64_t value) {
if (value & 0x01) {
return -(value >> 1);
}
return value >> 1;
}
absl::Status KeyValueFormatError(absl::string_view message) {
return MoqtErrorStatusWithCode(message, MoqtError::kKeyValueFormattingError);
}
absl::Status CheckForTrailingData(const quic::QuicDataReader& reader) {
if (!reader.IsDoneReading()) {
return absl::InvalidArgumentError(
absl::StrCat("Control message has excess data of ",
reader.BytesRemaining(), " bytes at the end"));
}
return absl::OkStatus();
}
// |fin_read| is set to true if there is a FIN anywhere before the end of the
// varint.
std::optional<uint64_t> ReadVarInt62FromStream(webtransport::Stream& stream,
bool& fin_read) {
fin_read = false;
webtransport::Stream::PeekResult peek_result =
stream.PeekNextReadableRegion();
if (peek_result.peeked_data.empty()) {
if (peek_result.fin_next) {
fin_read = stream.SkipBytes(0);
QUICHE_DCHECK(fin_read);
}
return std::nullopt;
}
char first_byte = peek_result.peeked_data[0];
size_t varint_size =
1 << ((absl::bit_cast<uint8_t>(first_byte) & 0b11000000) >> 6);
if (stream.ReadableBytes() < varint_size) {
if (peek_result.all_data_received) {
fin_read = true;
}
return std::nullopt;
}
char buffer[8];
absl::Span<char> bytes_to_read =
absl::MakeSpan(buffer).subspan(0, varint_size);
webtransport::Stream::ReadResult read_result = stream.Read(bytes_to_read);
QUICHE_DCHECK_EQ(read_result.bytes_read, varint_size);
fin_read = read_result.fin;
quiche::QuicheDataReader reader(buffer, read_result.bytes_read);
uint64_t result;
bool success = reader.ReadVarInt62(&result);
QUICHE_DCHECK(success);
QUICHE_DCHECK(reader.IsDoneReading());
return result;
}
// Reads from |reader| to list. Returns false if there is a read error.
absl::Status ParseKeyValuePairList(quic::QuicDataReader& reader,
KeyValuePairList& list) {
list.clear();
uint64_t num_params;
if (!reader.ReadVarInt62(&num_params)) {
return absl::InvalidArgumentError(
"Unable to parse key-value pair list element count");
}
uint64_t type = 0;
for (uint64_t i = 0; i < num_params; ++i) {
uint64_t type_diff;
if (!reader.ReadVarInt62(&type_diff)) {
return absl::InvalidArgumentError(
"Unable to parse the key in a key-value pair");
}
type += type_diff;
if (type % 2 == 1) {
absl::string_view bytes;
if (!reader.ReadStringPieceVarInt62(&bytes)) {
return absl::InvalidArgumentError(
"Unable to read the string value in a key-value pair");
}
list.insert(type, bytes);
continue;
}
uint64_t value;
if (!reader.ReadVarInt62(&value)) {
return absl::InvalidArgumentError(
"Unable to read the integer value in a key-value pair");
}
list.insert(type, value);
}
return absl::OkStatus();
}
absl::Status ParseKeyValuePairListWithNoPrefix(quic::QuicDataReader& reader,
KeyValuePairList& list) {
list.clear();
uint64_t type = 0;
while (reader.BytesRemaining() > 0) {
uint64_t type_diff;
if (!reader.ReadVarInt62(&type_diff)) {
return absl::InvalidArgumentError(
"Unable to parse the key in a key-value pair");
}
type += type_diff;
if (type % 2 == 1) {
absl::string_view bytes;
if (!reader.ReadStringPieceVarInt62(&bytes)) {
return absl::InvalidArgumentError(
"Unable to read the string value in a key-value pair");
}
list.insert(type, bytes);
continue;
}
uint64_t value;
if (!reader.ReadVarInt62(&value)) {
return absl::InvalidArgumentError(
"Unable to read the integer value in a key-value pair");
}
list.insert(type, value);
}
return absl::OkStatus();
}
bool ParseAuthTokenParameter(absl::string_view field,
std::vector<AuthToken>& out) {
quic::QuicDataReader reader(field);
AuthTokenAliasType alias_type;
uint64_t alias;
AuthTokenType type;
absl::string_view token;
uint64_t value;
if (!reader.ReadVarInt62(&value)) {
return false;
}
alias_type = static_cast<AuthTokenAliasType>(value);
switch (alias_type) {
case AuthTokenAliasType::kUseValue:
if (!reader.ReadVarInt62(&value) ||
value > AuthTokenType::kMaxAuthTokenType) {
return false;
}
type = static_cast<AuthTokenType>(value);
token = reader.PeekRemainingPayload();
out.push_back(AuthToken(type, token));
break;
case AuthTokenAliasType::kUseAlias:
if (!reader.ReadVarInt62(&value)) {
return false;
}
out.push_back(AuthToken(value, alias_type));
break;
case AuthTokenAliasType::kRegister:
if (!reader.ReadVarInt62(&alias) || !reader.ReadVarInt62(&value)) {
return false;
}
type = static_cast<AuthTokenType>(value);
token = reader.PeekRemainingPayload();
out.push_back(AuthToken(alias, type, token));
break;
case AuthTokenAliasType::kDelete:
if (!reader.ReadVarInt62(&alias)) {
return false;
}
out.push_back(AuthToken(alias, alias_type));
break;
default: // invalid alias type
return false;
}
return true;
}
bool ParseLocation(absl::string_view field, Location& out) {
quic::QuicDataReader reader(field);
return reader.ReadVarInt62(&out.group) && reader.ReadVarInt62(&out.object) &&
reader.IsDoneReading();
}
absl::Status ParseSubscriptionFilter(absl::string_view field,
std::optional<SubscriptionFilter>& out) {
quic::QuicDataReader reader(field);
uint64_t value;
if (!reader.ReadVarInt62(&value)) {
return KeyValueFormatError("Unable to read subscription filter type");
}
uint64_t group, object;
switch (static_cast<MoqtFilterType>(value)) {
case MoqtFilterType::kLargestObject:
case MoqtFilterType::kNextGroupStart:
out.emplace(static_cast<MoqtFilterType>(value));
break;
case MoqtFilterType::kAbsoluteStart:
if (!reader.ReadVarInt62(&group) || !reader.ReadVarInt62(&object)) {
return KeyValueFormatError("Invalid AbsoluteStart filter");
}
out.emplace(Location(group, object));
break;
case MoqtFilterType::kAbsoluteRange:
if (!reader.ReadVarInt62(&group) || !reader.ReadVarInt62(&object) ||
!reader.ReadVarInt62(&value)) {
return KeyValueFormatError("Invalid AbsoluteRange filter");
}
if (value < group) { // end before start
return absl::InvalidArgumentError(
"AbsoluteRange filter specified with a start after the end");
}
out.emplace(Location(group, object), value);
break;
default: // invalid filter type
return absl::InvalidArgumentError("Invalid filter type");
}
return absl::OkStatus();
}
} // namespace
absl::Status SetupParameters::FromKeyValuePairList(
const KeyValuePairList& list) {
absl::Status status = absl::OkStatus();
uint64_t last_key;
bool result = list.ForEach(
[&](uint64_t key, std::variant<uint64_t, absl::string_view> value) {
last_key = key;
switch (static_cast<SetupParameter>(key)) {
case SetupParameter::kMaxRequestId:
if (max_request_id.has_value()) {
status = absl::InvalidArgumentError("Duplicate Setup Parameter");
return false;
}
max_request_id = std::get<uint64_t>(value);
break;
case SetupParameter::kMaxAuthTokenCacheSize:
if (max_auth_token_cache_size.has_value()) {
status = absl::InvalidArgumentError("Duplicate Setup Parameter");
return false;
}
max_auth_token_cache_size = std::get<uint64_t>(value);
break;
case SetupParameter::kPath:
if (path.has_value()) {
status = absl::InvalidArgumentError("Duplicate Setup Parameter");
return false;
}
if (!http2::adapter::HeaderValidator::IsValidPath(
std::get<absl::string_view>(value),
/*allow_fragment=*/false)) {
status = MoqtErrorStatusWithCode("Malformed path",
MoqtError::kMalformedPath);
return false;
}
path = std::get<absl::string_view>(value);
break;
case SetupParameter::kAuthorizationToken:
if (!ParseAuthTokenParameter(std::get<absl::string_view>(value),
authorization_tokens)) {
status = KeyValueFormatError("Malformed auth token parameter");
return false;
}
break;
case SetupParameter::kAuthority:
if (!http2::adapter::HeaderValidator::IsValidAuthority(
std::get<absl::string_view>(value))) {
status = MoqtErrorStatusWithCode("Invalid authority field",
MoqtError::kMalformedAuthority);
return false;
}
authority = std::get<absl::string_view>(value);
break;
case SetupParameter::kMoqtImplementation:
if (moqt_implementation.has_value()) {
status = absl::InvalidArgumentError("Duplicate Setup Parameter");
return false;
}
QUICHE_LOG(INFO) << "Peer MOQT implementation: "
<< std::get<absl::string_view>(value);
moqt_implementation = std::get<absl::string_view>(value);
break;
case SetupParameter::kSupportObjectAcks:
if (support_object_acks.has_value()) {
status = absl::InvalidArgumentError("Duplicate Setup Parameter");
return false;
}
if (std::get<uint64_t>(value) > 1) {
status =
KeyValueFormatError("SUPPORT_OBJECT_ACKS has to be 0 or 1");
return false;
}
support_object_acks = (std::get<uint64_t>(value) == 1);
break;
default:
break;
}
return true;
});
if (!result && status.ok()) {
return absl::InvalidArgumentError(
absl::StrCat("Failed to parse the value for the setup parameter key 0x",
absl::Hex(static_cast<uint64_t>(last_key))));
}
return status;
}
absl::Status MessageParameters::FromKeyValuePairList(
const KeyValuePairList& list) {
absl::Status status = absl::OkStatus();
uint64_t last_key;
bool result = list.ForEach([&](uint64_t key,
std::variant<uint64_t, absl::string_view>
value) {
last_key = key;
switch (static_cast<MessageParameter>(key)) {
case MessageParameter::kDeliveryTimeout:
if (delivery_timeout.has_value()) {
status = absl::InvalidArgumentError("Duplicate Message Parameter");
return false;
}
if (std::get<uint64_t>(value) == 0) {
status = absl::InvalidArgumentError("DELIVERY_TIMEOUT cannot be 0");
return false;
}
delivery_timeout =
quic::QuicTimeDelta::TryFromMilliseconds(std::get<uint64_t>(value))
.value_or(quic::QuicTimeDelta::Infinite());
break;
case MessageParameter::kAuthorizationToken:
if (!ParseAuthTokenParameter(std::get<absl::string_view>(value),
authorization_tokens)) {
status = KeyValueFormatError("Malformed auth token parameter");
return false;
}
break;
case MessageParameter::kExpires:
if (expires.has_value()) {
status = absl::InvalidArgumentError("Duplicate Message Parameter");
return false;
}
expires =
quic::QuicTimeDelta::TryFromMilliseconds(std::get<uint64_t>(value))
.value_or(quic::QuicTimeDelta::Infinite());
if (expires->IsZero()) {
expires = quic::QuicTimeDelta::Infinite();
}
break;
case MessageParameter::kLargestObject:
if (largest_object.has_value()) {
status = absl::InvalidArgumentError("Duplicate Message Parameter");
return false;
}
largest_object = Location();
if (!ParseLocation(std::get<absl::string_view>(value),
*largest_object)) {
status = KeyValueFormatError(
"Failed to parse location of the largest object");
return false;
}
break;
case MessageParameter::kForward:
if (forward_has_value()) {
status = absl::InvalidArgumentError("Duplicate Message Parameter");
return false;
}
if (std::get<uint64_t>(value) > 1) {
status = absl::InvalidArgumentError("FORWARD must be 0 or 1");
return false;
}
set_forward(std::get<uint64_t>(value) != 0);
break;
case MessageParameter::kSubscriberPriority:
if (subscriber_priority.has_value()) {
status = absl::InvalidArgumentError("Duplicate Message Parameter");
return false;
}
if (std::get<uint64_t>(value) > kMaxPriority) {
status =
absl::InvalidArgumentError("Subscriber priority exceeds maximum");
return false;
}
subscriber_priority =
static_cast<MoqtPriority>(std::get<uint64_t>(value));
break;
case MessageParameter::kSubscriptionFilter:
if (subscription_filter.has_value()) {
status = absl::InvalidArgumentError("Duplicate Message Parameter");
// TODO(martinduke): Support multiple subscription filters.
return false;
}
status = ParseSubscriptionFilter(std::get<absl::string_view>(value),
subscription_filter);
if (!status.ok()) {
return false;
}
break;
case MessageParameter::kGroupOrder:
if (group_order.has_value()) {
status = absl::InvalidArgumentError("Duplicate Message Parameter");
return false;
}
if (std::get<uint64_t>(value) > kMaxMoqtDeliveryOrder ||
std::get<uint64_t>(value) < kMinMoqtDeliveryOrder) {
status = absl::InvalidArgumentError(
"GROUP_ORDER is outside the valid range");
return false;
}
group_order = static_cast<MoqtDeliveryOrder>(std::get<uint64_t>(value));
break;
case MessageParameter::kNewGroupRequest:
if (new_group_request.has_value()) {
status = absl::InvalidArgumentError("Duplicate Message Parameter");
return false;
}
new_group_request = std::get<uint64_t>(value);
break;
case MessageParameter::kOackWindowSize:
if (oack_window_size.has_value()) {
status = absl::InvalidArgumentError("Duplicate Message Parameter");
return false;
}
oack_window_size =
quic::QuicTimeDelta::FromMicroseconds(std::get<uint64_t>(value));
break;
default:
// Unknown MessageParameters not allowed!
status = absl::InvalidArgumentError(
absl::StrCat("Unknown message parameter 0x",
absl::Hex(static_cast<uint64_t>(key))));
return false;
}
return true;
});
if (!result && status.ok()) {
return absl::InvalidArgumentError(absl::StrCat(
"Failed to parse the value for the message parameter key 0x",
absl::Hex(static_cast<uint64_t>(last_key))));
}
return status;
}
bool MoqtMessageTypeParser::ReadUntilMessageTypeKnown() {
if (message_type_.has_value()) {
return true;
}
bool fin_read = false;
message_type_ = ReadVarInt62FromStream(stream_, fin_read);
if (fin_read) {
return false;
}
return true;
}
void MoqtControlParser::ReadAndDispatchMessages() {
if (no_more_data_) {
ParseError("Data after end of stream");
return;
}
if (processing_) {
return;
}
processing_ = true;
auto on_return = absl::MakeCleanup([&] { processing_ = false; });
while (!no_more_data_) {
bool fin_read = false;
// Read the message type.
if (!message_type_.has_value()) {
message_type_ = ReadVarInt62FromStream(stream_, fin_read);
if (fin_read) {
ParseError("FIN on control stream");
return;
}
if (!message_type_.has_value()) {
return;
}
}
QUICHE_DCHECK(message_type_.has_value());
// Read the message length.
if (!message_size_.has_value()) {
if (stream_.ReadableBytes() < 2) {
return;
}
std::array<char, 2> size_bytes;
webtransport::Stream::ReadResult result =
stream_.Read(absl::MakeSpan(size_bytes));
if (result.bytes_read != 2) {
ParseError(MoqtError::kInternalError,
"Stream returned incorrect ReadableBytes");
return;
}
if (result.fin) {
ParseError("FIN on control stream");
return;
}
message_size_ = static_cast<uint16_t>(size_bytes[0]) << 8 |
static_cast<uint16_t>(size_bytes[1]);
if (*message_size_ > kMaxMessageHeaderSize) {
ParseError(MoqtError::kInternalError,
absl::StrCat("Cannot parse control messages more than ",
kMaxMessageHeaderSize, " bytes"));
return;
}
}
QUICHE_DCHECK(message_size_.has_value());
// Read the message if it's fully received.
//
// CAUTION: if the flow control windows are too low, and
// kMaxMessageHeaderSize is too high, this will cause a deadlock.
if (stream_.ReadableBytes() < *message_size_) {
return;
}
absl::FixedArray<char> message(*message_size_);
webtransport::Stream::ReadResult result =
stream_.Read(absl::MakeSpan(message));
if (result.bytes_read != *message_size_) {
ParseError("Stream returned incorrect ReadableBytes");
return;
}
if (result.fin) {
ParseError("FIN on control stream");
return;
}
ProcessMessage(absl::string_view(message.data(), message.size()),
static_cast<MoqtMessageType>(*message_type_));
message_type_.reset();
message_size_.reset();
}
}
void MoqtControlParser::ProcessMessage(absl::string_view data,
MoqtMessageType message_type) {
absl::Status status;
switch (message_type) {
case MoqtMessageType::kClientSetup:
status = ProcessClientSetup(data);
break;
case MoqtMessageType::kServerSetup:
status = ProcessServerSetup(data);
break;
case MoqtMessageType::kRequestOk:
status = ProcessRequestOk(data);
break;
case MoqtMessageType::kRequestError:
status = ProcessRequestError(data);
break;
case MoqtMessageType::kSubscribe:
status = ProcessSubscribe(data);
break;
case MoqtMessageType::kSubscribeOk:
status = ProcessSubscribeOk(data);
break;
case MoqtMessageType::kUnsubscribe:
status = ProcessUnsubscribe(data);
break;
case MoqtMessageType::kPublishDone:
status = ProcessPublishDone(data);
break;
case MoqtMessageType::kRequestUpdate:
status = ProcessRequestUpdate(data);
break;
case MoqtMessageType::kPublishNamespace:
status = ProcessPublishNamespace(data);
break;
case MoqtMessageType::kPublishNamespaceDone:
status = ProcessPublishNamespaceDone(data);
break;
case MoqtMessageType::kNamespace:
status = ProcessNamespace(data);
break;
case MoqtMessageType::kNamespaceDone:
status = ProcessNamespaceDone(data);
break;
case MoqtMessageType::kPublishNamespaceCancel:
status = ProcessPublishNamespaceCancel(data);
break;
case MoqtMessageType::kTrackStatus:
status = ProcessTrackStatus(data);
break;
case MoqtMessageType::kGoAway:
status = ProcessGoAway(data);
break;
case MoqtMessageType::kSubscribeNamespace:
status = ProcessSubscribeNamespace(data);
break;
case MoqtMessageType::kMaxRequestId:
status = ProcessMaxRequestId(data);
break;
case MoqtMessageType::kFetch:
status = ProcessFetch(data);
break;
case MoqtMessageType::kFetchCancel:
status = ProcessFetchCancel(data);
break;
case MoqtMessageType::kFetchOk:
status = ProcessFetchOk(data);
break;
case MoqtMessageType::kRequestsBlocked:
status = ProcessRequestsBlocked(data);
break;
case MoqtMessageType::kPublish:
status = ProcessPublish(data);
break;
case MoqtMessageType::kPublishOk:
status = ProcessPublishOk(data);
break;
case moqt::MoqtMessageType::kObjectAck:
status = ProcessObjectAck(data);
break;
default:
ParseError(absl::InvalidArgumentError(
absl::StrCat("Unknown control message type 0x",
absl::Hex(static_cast<uint64_t>(message_type)))));
return;
}
if (!status.ok()) {
ParseError(
quiche::AppendToStatus(status, " while parsing a message of type 0x",
absl::Hex(static_cast<uint64_t>(message_type))));
}
}
absl::Status MoqtControlParser::ProcessClientSetup(absl::string_view data) {
quic::QuicDataReader reader(data);
MoqtClientSetup setup;
KeyValuePairList parameters;
QUICHE_RETURN_IF_ERROR(ParseKeyValuePairList(reader, parameters));
QUICHE_RETURN_IF_ERROR(FillAndValidateSetupParameters(
parameters, setup.parameters, MoqtMessageType::kClientSetup));
// TODO(martinduke): Validate construction of the PATH (Sec 8.3.2.1)
visitor_.OnClientSetupMessage(setup);
return CheckForTrailingData(reader);
}
absl::Status MoqtControlParser::ProcessServerSetup(absl::string_view data) {
quic::QuicDataReader reader(data);
MoqtServerSetup setup;
KeyValuePairList parameters;
QUICHE_RETURN_IF_ERROR(ParseKeyValuePairList(reader, parameters));
QUICHE_RETURN_IF_ERROR(FillAndValidateSetupParameters(
parameters, setup.parameters, MoqtMessageType::kServerSetup));
visitor_.OnServerSetupMessage(setup);
return CheckForTrailingData(reader);
}
absl::Status MoqtControlParser::ProcessSubscribe(absl::string_view data,
MoqtMessageType message_type) {
quic::QuicDataReader reader(data);
MoqtSubscribe subscribe;
if (!reader.ReadVarInt62(&subscribe.request_id)) {
return absl::InvalidArgumentError("Failed to read request ID");
}
QUICHE_RETURN_IF_ERROR(ReadFullTrackName(reader, subscribe.full_track_name));
QUICHE_RETURN_IF_ERROR(
FillAndValidateMessageParameters(reader, subscribe.parameters));
if (message_type == MoqtMessageType::kTrackStatus) {
visitor_.OnTrackStatusMessage(subscribe);
} else {
visitor_.OnSubscribeMessage(subscribe);
}
return CheckForTrailingData(reader);
}
absl::Status MoqtControlParser::ProcessSubscribeOk(absl::string_view data) {
quic::QuicDataReader reader(data);
MoqtSubscribeOk subscribe_ok;
if (!reader.ReadVarInt62(&subscribe_ok.request_id)) {
return absl::InvalidArgumentError("Failed to read the request ID");
}
if (!reader.ReadVarInt62(&subscribe_ok.track_alias)) {
return absl::InvalidArgumentError("Failed to read the track alias");
}
KeyValuePairList pairs;
QUICHE_RETURN_IF_ERROR(ParseKeyValuePairList(reader, pairs));
QUICHE_RETURN_IF_ERROR(subscribe_ok.parameters.FromKeyValuePairList(pairs));
QUICHE_RETURN_IF_ERROR(
ParseKeyValuePairListWithNoPrefix(reader, subscribe_ok.extensions));
if (!subscribe_ok.extensions.Validate()) {
return absl::InvalidArgumentError("Invalid SUBSCRIBE_OK track extensions");
}
visitor_.OnSubscribeOkMessage(subscribe_ok);
return CheckForTrailingData(reader);
}
absl::Status MoqtControlParser::ProcessRequestError(absl::string_view data) {
quic::QuicDataReader reader(data);
MoqtRequestError request_error;
uint64_t error_code;
uint64_t raw_interval;
if (!reader.ReadVarInt62(&request_error.request_id) ||
!reader.ReadVarInt62(&error_code) ||
!reader.ReadVarInt62(&raw_interval) ||
!reader.ReadStringVarInt62(request_error.reason_phrase)) {
return absl::InvalidArgumentError("Message missing fields");
}
request_error.error_code = static_cast<RequestErrorCode>(error_code);
request_error.retry_interval =
(raw_interval == 0)
? std::nullopt
: std::make_optional(
quic::QuicTimeDelta::FromMilliseconds(raw_interval - 1));
visitor_.OnRequestErrorMessage(request_error);
return CheckForTrailingData(reader);
}
absl::Status MoqtControlParser::ProcessUnsubscribe(absl::string_view data) {
quic::QuicDataReader reader(data);
MoqtUnsubscribe unsubscribe;
if (!reader.ReadVarInt62(&unsubscribe.request_id)) {
return absl::InvalidArgumentError("Message missing fields");
}
visitor_.OnUnsubscribeMessage(unsubscribe);
return CheckForTrailingData(reader);
}
absl::Status MoqtControlParser::ProcessPublishDone(absl::string_view data) {
quic::QuicDataReader reader(data);
MoqtPublishDone publish_done;
uint64_t value;
if (!reader.ReadVarInt62(&publish_done.request_id) ||
!reader.ReadVarInt62(&value) ||
!reader.ReadVarInt62(&publish_done.stream_count) ||
!reader.ReadStringVarInt62(publish_done.error_reason)) {
return absl::InvalidArgumentError("Message missing fields");
}
publish_done.status_code = static_cast<PublishDoneCode>(value);
visitor_.OnPublishDoneMessage(publish_done);
return CheckForTrailingData(reader);
}
absl::Status MoqtControlParser::ProcessRequestUpdate(absl::string_view data) {
quic::QuicDataReader reader(data);
MoqtRequestUpdate request_update;
if (!reader.ReadVarInt62(&request_update.request_id) ||
!reader.ReadVarInt62(&request_update.existing_request_id)) {
return absl::InvalidArgumentError("Message missing request IDs");
}
QUICHE_RETURN_IF_ERROR(
FillAndValidateMessageParameters(reader, request_update.parameters));
visitor_.OnRequestUpdateMessage(request_update);
return CheckForTrailingData(reader);
}
absl::Status MoqtControlParser::ProcessPublishNamespace(
absl::string_view data) {
quic::QuicDataReader reader(data);
MoqtPublishNamespace publish_namespace;
if (!reader.ReadVarInt62(&publish_namespace.request_id)) {
return absl::InvalidArgumentError("Request ID missing");
}
QUICHE_RETURN_IF_ERROR(
ReadTrackNamespace(reader, publish_namespace.track_namespace));
QUICHE_RETURN_IF_ERROR(
FillAndValidateMessageParameters(reader, publish_namespace.parameters));
visitor_.OnPublishNamespaceMessage(publish_namespace);
return CheckForTrailingData(reader);
}
absl::Status MoqtControlParser::ProcessNamespace(absl::string_view data) {
quic::QuicDataReader reader(data);
MoqtNamespace _namespace;
QUICHE_RETURN_IF_ERROR(
ReadTrackNamespace(reader, _namespace.track_namespace_suffix));
visitor_.OnNamespaceMessage(_namespace);
return CheckForTrailingData(reader);
}
absl::Status MoqtControlParser::ProcessNamespaceDone(absl::string_view data) {
quic::QuicDataReader reader(data);
MoqtNamespaceDone namespace_done;
QUICHE_RETURN_IF_ERROR(
ReadTrackNamespace(reader, namespace_done.track_namespace_suffix));
visitor_.OnNamespaceDoneMessage(namespace_done);
return CheckForTrailingData(reader);
}
absl::Status MoqtControlParser::ProcessRequestOk(absl::string_view data) {
quic::QuicDataReader reader(data);
MoqtRequestOk request_ok;
if (!reader.ReadVarInt62(&request_ok.request_id)) {
return absl::InvalidArgumentError("Request ID missing");
}
QUICHE_RETURN_IF_ERROR(
FillAndValidateMessageParameters(reader, request_ok.parameters));
visitor_.OnRequestOkMessage(request_ok);
return CheckForTrailingData(reader);
}
absl::Status MoqtControlParser::ProcessPublishNamespaceDone(
absl::string_view data) {
quic::QuicDataReader reader(data);
MoqtPublishNamespaceDone pn_done;
if (!reader.ReadVarInt62(&pn_done.request_id)) {
return absl::InvalidArgumentError("Request ID missing");
}
visitor_.OnPublishNamespaceDoneMessage(pn_done);
return CheckForTrailingData(reader);
}
absl::Status MoqtControlParser::ProcessPublishNamespaceCancel(
absl::string_view data) {
quic::QuicDataReader reader(data);
MoqtPublishNamespaceCancel publish_namespace_cancel;
uint64_t error_code;
if (!reader.ReadVarInt62(&publish_namespace_cancel.request_id) ||
!reader.ReadVarInt62(&error_code) ||
!reader.ReadStringVarInt62(publish_namespace_cancel.error_reason)) {
return absl::InvalidArgumentError("Message missing fields");
}
publish_namespace_cancel.error_code =
static_cast<RequestErrorCode>(error_code);
visitor_.OnPublishNamespaceCancelMessage(publish_namespace_cancel);
return CheckForTrailingData(reader);
}
absl::Status MoqtControlParser::ProcessTrackStatus(absl::string_view data) {
return ProcessSubscribe(data, MoqtMessageType::kTrackStatus);
}
absl::Status MoqtControlParser::ProcessGoAway(absl::string_view data) {
quic::QuicDataReader reader(data);
MoqtGoAway goaway;
if (!reader.ReadStringVarInt62(goaway.new_session_uri)) {
return absl::InvalidArgumentError("Missing new session URI");
}
visitor_.OnGoAwayMessage(goaway);
return CheckForTrailingData(reader);
}
absl::Status MoqtControlParser::ProcessSubscribeNamespace(
absl::string_view data) {
quic::QuicDataReader reader(data);
MoqtSubscribeNamespace subscribe_namespace;
uint64_t raw_option;
if (!reader.ReadVarInt62(&subscribe_namespace.request_id)) {
return absl::InvalidArgumentError("Request ID missing");
}
QUICHE_RETURN_IF_ERROR(
ReadTrackNamespace(reader, subscribe_namespace.track_namespace_prefix));
if (!reader.ReadVarInt62(&raw_option)) {
return absl::InvalidArgumentError("SUBSCRIBE_NAMESPACE option missing");
}
if (raw_option > kMaxSubscribeOption) {
return absl::InvalidArgumentError("Invalid SUBSCRIBE_NAMESPACE option");
}
subscribe_namespace.subscribe_options =
static_cast<SubscribeNamespaceOption>(raw_option);
QUICHE_RETURN_IF_ERROR(
FillAndValidateMessageParameters(reader, subscribe_namespace.parameters));
visitor_.OnSubscribeNamespaceMessage(subscribe_namespace);
return CheckForTrailingData(reader);
}
absl::Status MoqtControlParser::ProcessMaxRequestId(absl::string_view data) {
quic::QuicDataReader reader(data);
MoqtMaxRequestId max_request_id;
if (!reader.ReadVarInt62(&max_request_id.max_request_id)) {
return absl::InvalidArgumentError("Max request ID missing");
}
visitor_.OnMaxRequestIdMessage(max_request_id);
return CheckForTrailingData(reader);
}
absl::Status MoqtControlParser::ProcessFetch(absl::string_view data) {
quic::QuicDataReader reader(data);
MoqtFetch fetch;
uint64_t type;
if (!reader.ReadVarInt62(&fetch.request_id) || !reader.ReadVarInt62(&type)) {
return absl::InvalidArgumentError("Message missing fields");
}
switch (static_cast<FetchType>(type)) {
case FetchType::kAbsoluteJoining: {
uint64_t joining_request_id;
uint64_t joining_start;
if (!reader.ReadVarInt62(&joining_request_id) ||
!reader.ReadVarInt62(&joining_start)) {
return absl::InvalidArgumentError(
"Absolute joining parameters invalid");
}
fetch.fetch = JoiningFetchAbsolute{joining_request_id, joining_start};
break;
}
case FetchType::kRelativeJoining: {
uint64_t joining_request_id;
uint64_t joining_start;
if (!reader.ReadVarInt62(&joining_request_id) ||
!reader.ReadVarInt62(&joining_start)) {
return absl::InvalidArgumentError(
"Relative joining parameters invalid");
}
fetch.fetch = JoiningFetchRelative{joining_request_id, joining_start};
break;
}
case FetchType::kStandalone: {
fetch.fetch = StandaloneFetch();
StandaloneFetch& standalone_fetch =
std::get<StandaloneFetch>(fetch.fetch);
QUICHE_RETURN_IF_ERROR(
ReadFullTrackName(reader, standalone_fetch.full_track_name));
if (!reader.ReadVarInt62(&standalone_fetch.start_location.group) ||
!reader.ReadVarInt62(&standalone_fetch.start_location.object) ||
!reader.ReadVarInt62(&standalone_fetch.end_location.group) ||
!reader.ReadVarInt62(&standalone_fetch.end_location.object)) {
return absl::InvalidArgumentError(
"Standalone fetch parameters invalid");
}
if (standalone_fetch.end_location.object == 0) {
standalone_fetch.end_location.object = kMaxObjectId;
} else {
--standalone_fetch.end_location.object;
}
if (standalone_fetch.end_location < standalone_fetch.start_location) {
return absl::InvalidArgumentError(
"End object comes before start object in FETCH");
}
break;
}
default:
return absl::InvalidArgumentError("Invalid FETCH type");
}
QUICHE_RETURN_IF_ERROR(
FillAndValidateMessageParameters(reader, fetch.parameters));
visitor_.OnFetchMessage(fetch);
return CheckForTrailingData(reader);
}
absl::Status MoqtControlParser::ProcessFetchOk(absl::string_view data) {
quic::QuicDataReader reader(data);
MoqtFetchOk fetch_ok;
uint8_t end_of_track;
if (!reader.ReadVarInt62(&fetch_ok.request_id) ||
!reader.ReadUInt8(&end_of_track) ||
!reader.ReadVarInt62(&fetch_ok.end_location.group) ||
!reader.ReadVarInt62(&fetch_ok.end_location.object)) {
return absl::InvalidArgumentError("Message missing fields");
}
if (end_of_track > 0x01) {
return absl::InvalidArgumentError("Invalid end of track value in FETCH_OK");
}
if (fetch_ok.end_location.object == 0) {
fetch_ok.end_location.object = kMaxObjectId;
} else {
--fetch_ok.end_location.object;
}
fetch_ok.end_of_track = end_of_track == 1;
QUICHE_RETURN_IF_ERROR(
FillAndValidateMessageParameters(reader, fetch_ok.parameters));
QUICHE_RETURN_IF_ERROR(
ParseKeyValuePairListWithNoPrefix(reader, fetch_ok.extensions));
if (!fetch_ok.extensions.Validate()) {
return absl::InvalidArgumentError("Invalid FETCH_OK track extensions");
}
visitor_.OnFetchOkMessage(fetch_ok);
return CheckForTrailingData(reader);
}
absl::Status MoqtControlParser::ProcessFetchCancel(absl::string_view data) {
quic::QuicDataReader reader(data);
MoqtFetchCancel fetch_cancel;
if (!reader.ReadVarInt62(&fetch_cancel.request_id)) {
return absl::InvalidArgumentError("Request ID missing");
}
visitor_.OnFetchCancelMessage(fetch_cancel);
return CheckForTrailingData(reader);
}
absl::Status MoqtControlParser::ProcessRequestsBlocked(absl::string_view data) {
quic::QuicDataReader reader(data);
MoqtRequestsBlocked requests_blocked;
if (!reader.ReadVarInt62(&requests_blocked.max_request_id)) {
return absl::InvalidArgumentError("Max request ID missing");
}
visitor_.OnRequestsBlockedMessage(requests_blocked);
return CheckForTrailingData(reader);
}
absl::Status MoqtControlParser::ProcessPublish(absl::string_view data) {
quic::QuicDataReader reader(data);
MoqtPublish publish;
QUICHE_DCHECK(reader.PreviouslyReadPayload().empty());
if (!reader.ReadVarInt62(&publish.request_id)) {
return absl::InvalidArgumentError("Request ID missing");
}
QUICHE_RETURN_IF_ERROR(ReadFullTrackName(reader, publish.full_track_name));
if (!reader.ReadVarInt62(&publish.track_alias)) {
return absl::InvalidArgumentError("Track alias missing");
}
QUICHE_RETURN_IF_ERROR(
FillAndValidateMessageParameters(reader, publish.parameters));
QUICHE_RETURN_IF_ERROR(
ParseKeyValuePairListWithNoPrefix(reader, publish.extensions));
if (!publish.extensions.Validate()) {
return absl::InvalidArgumentError("Invalid PUBLISH track extensions");
}
visitor_.OnPublishMessage(publish);
return CheckForTrailingData(reader);
}
absl::Status MoqtControlParser::ProcessPublishOk(absl::string_view data) {
quic::QuicDataReader reader(data);
MoqtPublishOk publish_ok;
if (!reader.ReadVarInt62(&publish_ok.request_id)) {
return absl::InvalidArgumentError("Message missing fields");
}
QUICHE_RETURN_IF_ERROR(
FillAndValidateMessageParameters(reader, publish_ok.parameters));
visitor_.OnPublishOkMessage(publish_ok);
return CheckForTrailingData(reader);
}
absl::Status MoqtControlParser::ProcessObjectAck(absl::string_view data) {
quic::QuicDataReader reader(data);
MoqtObjectAck object_ack;
uint64_t raw_delta;
if (!reader.ReadVarInt62(&object_ack.subscribe_id) ||
!reader.ReadVarInt62(&object_ack.group_id) ||
!reader.ReadVarInt62(&object_ack.object_id) ||
!reader.ReadVarInt62(&raw_delta)) {
return absl::InvalidArgumentError("Message missing fields");
}
object_ack.delta_from_deadline = quic::QuicTimeDelta::FromMicroseconds(
SignedVarintUnserializedForm(raw_delta));
visitor_.OnObjectAckMessage(object_ack);
return CheckForTrailingData(reader);
}
void MoqtControlParser::ParseError(absl::string_view reason) {
ParseError(MoqtError::kProtocolViolation, reason);
}
void MoqtControlParser::ParseError(const absl::Status& status) {
ParseError(
GetMoqtErrorForStatus(status).value_or(MoqtError::kProtocolViolation),
status.message());
}
void MoqtControlParser::ParseError(MoqtError error_code,
absl::string_view reason) {
if (parsing_error_) {
return; // Don't send multiple parse errors.
}
no_more_data_ = true;
parsing_error_ = true;
visitor_.OnParsingError(error_code, reason);
}
absl::Status MoqtControlParser::ReadTrackNamespace(
quic::QuicDataReader& reader, TrackNamespace& track_namespace) {
QUICHE_DCHECK(track_namespace.empty());
uint64_t num_elements;
if (!reader.ReadVarInt62(&num_elements)) {
return absl::InvalidArgumentError(
"Unable to parse the number of namespace elements");
}
if (num_elements == 0 || num_elements > kMaxNamespaceElements) {
return absl::InvalidArgumentError("Invalid number of namespace elements");
}
absl::FixedArray<absl::string_view> elements(num_elements);
for (uint64_t i = 0; i < num_elements; ++i) {
if (!reader.ReadStringPieceVarInt62(&elements[i])) {
return absl::InvalidArgumentError(
"Namespace element shorter than specified");
}
}
if (!track_namespace.Append(elements)) {
return absl::InvalidArgumentError("Track namespace is too large");
}
return absl::OkStatus();
}
absl::Status MoqtControlParser::ReadFullTrackName(
quic::QuicDataReader& reader, FullTrackName& full_track_name) {
QUICHE_DCHECK(!full_track_name.IsValid());
TrackNamespace track_namespace;
QUICHE_RETURN_IF_ERROR(ReadTrackNamespace(reader, track_namespace));
absl::string_view name;
if (!reader.ReadStringPieceVarInt62(&name)) {
return absl::InvalidArgumentError("Unable to parse track name");
}
absl::StatusOr<FullTrackName> full_track_name_or =
FullTrackName::Create(std::move(track_namespace), std::string(name));
QUICHE_RETURN_IF_ERROR(full_track_name_or.status());
full_track_name = *std::move(full_track_name_or);
return absl::OkStatus();
}
absl::Status MoqtControlParser::FillAndValidateSetupParameters(
const KeyValuePairList& in, SetupParameters& out,
MoqtMessageType message_type) {
QUICHE_RETURN_IF_ERROR(out.FromKeyValuePairList(in));
MoqtError error =
SetupParametersAllowedByMessage(out, message_type, uses_web_transport_);
if (error != MoqtError::kNoError) {
return MoqtErrorStatusWithCode("Setup parameter parsing error", error);
}
return absl::OkStatus();
}
absl::Status MoqtControlParser::FillAndValidateMessageParameters(
quic::QuicDataReader& reader, MessageParameters& out) {
KeyValuePairList pairs;
QUICHE_RETURN_IF_ERROR(ParseKeyValuePairList(reader, pairs));
// All parameter types are allowed in all messages.
QUICHE_RETURN_IF_ERROR(out.FromKeyValuePairList(pairs));
return absl::OkStatus();
}
void MoqtDataParser::ParseError(absl::string_view reason) {
if (parsing_error_) {
return; // Don't send multiple parse errors.
}
next_input_ = kFailed;
no_more_data_ = true;
parsing_error_ = true;
visitor_.OnParsingError(MoqtError::kProtocolViolation, reason);
}
std::optional<absl::string_view> ParseDatagram(absl::string_view data,
MoqtObject& object_metadata,
bool& use_default_priority) {
uint64_t type_raw, object_status_raw;
absl::string_view extensions;
quic::QuicDataReader reader(data);
object_metadata = MoqtObject();
if (!reader.ReadVarInt62(&type_raw) ||
!reader.ReadVarInt62(&object_metadata.track_alias) ||
!reader.ReadVarInt62(&object_metadata.group_id)) {
return std::nullopt;
}
std::optional<MoqtDatagramType> datagram_type =
MoqtDatagramType::FromValue(type_raw);
if (!datagram_type.has_value()) {
return std::nullopt;
}
if (datagram_type->end_of_group()) {
object_metadata.object_status = MoqtObjectStatus::kEndOfGroup;
if (datagram_type->has_status()) {
QUICHE_BUG(Moqt_invalid_datagram_type)
<< "Invalid datagram type: " << type_raw;
return std::nullopt;
}
} else {
object_metadata.object_status = MoqtObjectStatus::kNormal;
}
if (datagram_type->has_object_id()) {
if (!reader.ReadVarInt62(&object_metadata.object_id)) {
return std::nullopt;
}
} else {
object_metadata.object_id = 0;
}
object_metadata.subgroup_id = std::nullopt;
use_default_priority = datagram_type->has_default_priority();
if (!use_default_priority &&
!reader.ReadUInt8(&object_metadata.publisher_priority)) {
return std::nullopt;
}
if (datagram_type->has_extension()) {
if (!reader.ReadStringPieceVarInt62(&extensions)) {
return std::nullopt;
}
if (extensions.empty()) {
// This is a session error.
return std::nullopt;
}
object_metadata.extension_headers = std::string(extensions);
}
if (datagram_type->has_status()) {
object_metadata.payload_length = 0;
if (!reader.ReadVarInt62(&object_status_raw)) {
return std::nullopt;
}
object_metadata.object_status = IntegerToObjectStatus(object_status_raw);
return "";
}
absl::string_view payload = reader.ReadRemainingPayload();
object_metadata.payload_length = payload.length();
return payload;
}
void MoqtDataParser::ReadDataUntil(StopCondition stop_condition) {
if (processing_) {
QUICHE_BUG(MoqtDataParser_reentry)
<< "Calling ProcessData() when ProcessData() is already in progress.";
return;
}
processing_ = true;
auto on_return = absl::MakeCleanup([&] { processing_ = false; });
State last_state = state();
for (;;) {
ParseNextItemFromStream();
if (state() == last_state || no_more_data_ || stop_condition()) {
break;
}
last_state = state();
}
}
std::optional<uint64_t> MoqtDataParser::ReadVarInt62NoFin() {
bool fin_read = false;
std::optional<uint64_t> result = ReadVarInt62FromStream(stream_, fin_read);
if (fin_read) { // FIN received before a complete varint.
ParseError("FIN after incomplete message");
return std::nullopt;
}
return result;
}
std::optional<uint8_t> MoqtDataParser::ReadUint8NoFin() {
char buffer[1];
webtransport::Stream::ReadResult read_result =
stream_.Read(absl::MakeSpan(buffer));
if (read_result.bytes_read == 0) {
return std::nullopt;
}
return absl::bit_cast<uint8_t>(buffer[0]);
}
MoqtDataParser::NextInput MoqtDataParser::AdvanceParserState() {
if (type_.IsFetch()) {
switch (next_input_) {
case kStreamType:
return kRequestId;
case kRequestId:
return kSerializationFlags;
case kSerializationFlags:
if (fetch_serialization_.has_group_id()) {
return kGroupId;
}
[[fallthrough]];
case kGroupId:
if (fetch_serialization_.is_datagram()) {
metadata_.subgroup_id = std::nullopt;
} else {
if (fetch_serialization_.has_subgroup_id()) {
return kSubgroupId;
}
if (fetch_serialization_.prior_subgroup_id_plus_one()) {
if (!metadata_.subgroup_id.has_value()) {
ParseError("reference to subgroup ID of prior datagram");
return kFailed;
}
++(*metadata_.subgroup_id);
} else if (fetch_serialization_.zero_subgroup_id()) {
metadata_.subgroup_id = 0;
} else if (!metadata_.subgroup_id.has_value()) {
QUICHE_DCHECK(fetch_serialization_.prior_subgroup_id());
ParseError("reference to subgroup ID of prior datagram");
return kFailed;
}
}
[[fallthrough]];
case kSubgroupId:
if (fetch_serialization_.has_object_id()) {
return kObjectId;
}
++metadata_.object_id;
[[fallthrough]];
case kObjectId:
if (fetch_serialization_.end_of_non_existent_range() ||
fetch_serialization_.end_of_unknown_range()) {
return kSerializationFlags;
}
if (fetch_serialization_.has_priority()) {
return kPublisherPriority;
}
[[fallthrough]];
case kPublisherPriority:
if (fetch_serialization_.has_extensions()) {
return kExtensionSize;
}
metadata_.extension_headers = "";
return kObjectPayloadLength;
case kExtensionBody:
return kObjectPayloadLength;
case kData:
return kSerializationFlags;
case kTrackAlias:
case kObjectPayloadLength:
case kAwaitingNextByte:
case kStatus:
case kFailed:
case kExtensionSize:
case kPadding:
QUICHE_NOTREACHED();
return next_input_;
}
}
switch (next_input_) {
// The state table is factored into a separate function (rather than
// inlined) in order to separate the order of elements from the way they are
// parsed.
case kStreamType:
return kTrackAlias;
case kTrackAlias:
return kGroupId;
case kGroupId:
if (type_.IsSubgroupPresent()) {
return kSubgroupId;
}
if (type_.SubgroupIsZero()) {
metadata_.subgroup_id = 0;
}
[[fallthrough]];
case kSubgroupId:
if (!type_.HasDefaultPriority()) {
return kPublisherPriority;
}
metadata_.publisher_priority = default_publisher_priority_;
[[fallthrough]];
case kPublisherPriority:
return kObjectId;
case kObjectId:
if (num_objects_read_ == 0 && type_.SubgroupIsFirstObjectId()) {
metadata_.subgroup_id = metadata_.object_id;
}
if (type_.AreExtensionHeadersPresent()) {
return kExtensionSize;
}
[[fallthrough]];
case kExtensionBody:
return kObjectPayloadLength;
case kStatus:
case kData:
case kAwaitingNextByte:
return kObjectId;
case kRequestId:
case kSerializationFlags:
case kExtensionSize:
case kObjectPayloadLength:
case kPadding:
case kFailed:
// Other transitions are either Fetch-only or handled in
// ParseNextItemFromStream.
QUICHE_NOTREACHED();
return next_input_;
}
}
void MoqtDataParser::ParseNextItemFromStream() {
if (CheckForFinWithoutData()) {
return;
}
switch (next_input_) {
case kStreamType: {
std::optional<uint64_t> value_read = ReadVarInt62NoFin();
if (!value_read.has_value()) {
return;
}
std::optional<MoqtDataStreamType> type =
MoqtDataStreamType::FromValue(*value_read);
if (!type.has_value()) {
ParseError("Invalid stream type supplied");
return;
}
type_ = *type;
if (type_.IsPadding()) {
next_input_ = kPadding;
return;
}
if (type_.EndOfGroupInStream()) {
contains_end_of_group_ = true;
}
next_input_ = AdvanceParserState();
return;
}
case kRequestId:
case kTrackAlias: {
std::optional<uint64_t> value_read = ReadVarInt62NoFin();
if (value_read.has_value()) {
metadata_.track_alias = *value_read;
next_input_ = AdvanceParserState();
}
return;
}
case kSerializationFlags: {
std::optional<uint64_t> value_read = ReadVarInt62NoFin();
if (value_read.has_value()) {
std::optional<MoqtFetchSerialization> serialization =
MoqtFetchSerialization::FromValue(*value_read);
if (!serialization.has_value()) {
ParseError("Invalid serialization flags");
return;
}
if (num_objects_read_ == 0 &&
(serialization->prior_subgroup_id() ||
serialization->prior_subgroup_id_plus_one() ||
!serialization->has_object_id() ||
!serialization->has_group_id() ||
!serialization->has_priority())) {
ParseError("Invalid serialization flags for first object");
return;
}
fetch_serialization_ = *serialization;
next_input_ = AdvanceParserState();
}
return;
}
case kGroupId: {
std::optional<uint64_t> value_read = ReadVarInt62NoFin();
if (value_read.has_value()) {
if (type_.IsFetch() ||
!fetch_serialization_.end_of_non_existent_range() ||
!fetch_serialization_.end_of_unknown_range()) {
// Do not record range indicator group IDs because it will corrupt
// references to the previous object.
metadata_.group_id = *value_read;
}
next_input_ = AdvanceParserState();
}
return;
}
case kSubgroupId: {
std::optional<uint64_t> value_read = ReadVarInt62NoFin();
if (value_read.has_value()) {
metadata_.subgroup_id = *value_read;
next_input_ = AdvanceParserState();
}
return;
}
case kPublisherPriority: {
std::optional<uint8_t> value_read = ReadUint8NoFin();
if (value_read.has_value()) {
metadata_.publisher_priority = *value_read;
next_input_ = AdvanceParserState();
}
return;
}
case kObjectId: {
std::optional<uint64_t> value_read = ReadVarInt62NoFin();
if (value_read.has_value()) {
if (type_.IsFetch() ||
!fetch_serialization_.end_of_non_existent_range() ||
!fetch_serialization_.end_of_unknown_range()) {
// Do not record range indicator object IDs because it will corrupt
// references to the previous object.
if (type_.IsSubgroup() && last_object_id_.has_value()) {
metadata_.object_id = *value_read + *last_object_id_ + 1;
} else {
metadata_.object_id = *value_read;
}
}
last_object_id_ = metadata_.object_id;
next_input_ = AdvanceParserState();
}
// TODO(martinduke): Report something if the fetch serialization is an end
// of range indicator.
return;
}
case kExtensionSize: {
std::optional<uint64_t> value_read = ReadVarInt62NoFin();
if (value_read.has_value()) {
metadata_.extension_headers.clear();
payload_length_remaining_ = *value_read;
next_input_ = (value_read == 0) ? kObjectPayloadLength : kExtensionBody;
}
return;
}
case kObjectPayloadLength: {
std::optional<uint64_t> value_read = ReadVarInt62NoFin();
if (value_read.has_value()) {
metadata_.payload_length = *value_read;
payload_length_remaining_ = *value_read;
if (metadata_.payload_length > 0) {
metadata_.object_status = MoqtObjectStatus::kNormal;
next_input_ = kData;
} else {
next_input_ = kStatus;
}
}
return;
}
case kStatus: {
bool fin_read = false;
std::optional<uint64_t> value_read =
ReadVarInt62FromStream(stream_, fin_read);
if (value_read.has_value()) {
metadata_.object_status = IntegerToObjectStatus(*value_read);
if (metadata_.object_status == MoqtObjectStatus::kInvalidObjectStatus) {
ParseError("Invalid object status provided");
return;
}
++num_objects_read_;
// TODO(martinduke): If contains_end_of_group_ && fin_read, the track is
// malformed. There is no API to signal this to the session yet, but the
// contains_end_of_group_ logic is likely to substantially change in the
// spec. Don't bother to signal this for now; just ignore that the
// stream was supposed to conclude with kEndOfGroup and end it with the
// encoded status instead.
visitor_.OnObjectMessage(metadata_, "", /*end_of_message=*/true);
next_input_ = AdvanceParserState();
}
if (fin_read) {
visitor_.OnFin();
no_more_data_ = true;
return;
}
return;
}
case kExtensionBody:
case kData: {
while (payload_length_remaining_ > 0) {
webtransport::Stream::PeekResult peek_result =
stream_.PeekNextReadableRegion();
if (!peek_result.has_data()) {
return;
}
size_t chunk_size =
std::min(payload_length_remaining_, peek_result.peeked_data.size());
payload_length_remaining_ -= chunk_size;
bool done = payload_length_remaining_ == 0;
if (next_input_ == kData) {
no_more_data_ = peek_result.all_data_received &&
chunk_size == stream_.ReadableBytes();
if (!done && no_more_data_) {
ParseError("FIN received at an unexpected point in the stream");
return;
}
if (contains_end_of_group_) {
if (no_more_data_) {
metadata_.object_status = MoqtObjectStatus::kEndOfGroup;
} else if (done) {
// Don't signal done until the next byte arrives.
next_input_ = kAwaitingNextByte;
done = false;
}
}
visitor_.OnObjectMessage(
metadata_, peek_result.peeked_data.substr(0, chunk_size), done);
if (done) {
if (no_more_data_) {
visitor_.OnFin();
}
++num_objects_read_;
next_input_ = AdvanceParserState();
}
if (stream_.SkipBytes(chunk_size) && !no_more_data_) {
// Although there was no FIN, SkipBytes() can return true if the
// stream is reset, probably because OnObjectMessage() caused
// something to happen to the stream or the session.
no_more_data_ = true;
if (!done) {
ParseError("FIN received at an unexpected point in the stream");
}
}
} else {
absl::StrAppend(&metadata_.extension_headers,
peek_result.peeked_data.substr(0, chunk_size));
if (stream_.SkipBytes(chunk_size)) {
ParseError("FIN received at an unexpected point in the stream");
no_more_data_ = true;
return;
}
if (done) {
next_input_ = AdvanceParserState();
}
}
}
return;
}
case kAwaitingNextByte: {
QUICHE_NOTREACHED(); // CheckForFinWithoutData() should have handled it.
return;
}
case kPadding:
no_more_data_ |= stream_.SkipBytes(stream_.ReadableBytes());
return;
case kFailed:
return;
}
}
void MoqtDataParser::ReadAllData() {
ReadDataUntil(+[]() { return false; });
}
void MoqtDataParser::ReadStreamType() {
return ReadDataUntil([this]() { return next_input_ != kStreamType; });
}
void MoqtDataParser::ReadTrackAlias() {
return ReadDataUntil([this]() { return next_input_ > kTrackAlias; });
}
void MoqtDataParser::ReadAtMostOneObject() {
const size_t num_objects_read_initial = num_objects_read_;
return ReadDataUntil(
[&]() { return num_objects_read_ != num_objects_read_initial; });
}
bool MoqtDataParser::CheckForFinWithoutData() {
if (!stream_.PeekNextReadableRegion().fin_next) {
if (next_input_ == kAwaitingNextByte) {
// Data arrived; the last object was not EndOfGroup.
visitor_.OnObjectMessage(metadata_, "", /*end_of_message=*/true);
next_input_ = AdvanceParserState();
++num_objects_read_;
}
return false;
}
no_more_data_ = true;
const bool valid_state =
payload_length_remaining_ == 0 &&
((type_.IsSubgroup() && next_input_ == kObjectId) ||
(type_.IsFetch() && next_input_ == kSerializationFlags));
if (!valid_state) {
ParseError("FIN received at an unexpected point in the stream");
return true;
}
if (next_input_ == kAwaitingNextByte) {
metadata_.object_status = MoqtObjectStatus::kEndOfGroup;
visitor_.OnObjectMessage(metadata_, "", /*end_of_message=*/true);
}
visitor_.OnFin();
return stream_.SkipBytes(0);
}
} // namespace moqt