blob: 3e6b0e5243eebdcbe1c6cca1d115ef07fd9f00a1 [file] [log] [blame] [edit]
// Copyright (c) 2023 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "quiche/quic/moqt/moqt_parser.h"
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <optional>
#include <string>
#include "absl/cleanup/cleanup.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "quiche/quic/core/quic_data_reader.h"
#include "quiche/quic/core/quic_time.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/common/platform/api/quiche_logging.h"
namespace moqt {
namespace {
bool ParseDeliveryOrder(uint8_t raw_value,
std::optional<MoqtDeliveryOrder>& output) {
switch (raw_value) {
case 0x00:
output = std::nullopt;
return true;
case 0x01:
output = MoqtDeliveryOrder::kAscending;
return true;
case 0x02:
output = MoqtDeliveryOrder::kDescending;
return true;
default:
return false;
}
}
uint64_t SignedVarintUnserializedForm(uint64_t value) {
if (value & 0x01) {
return -(value >> 1);
}
return value >> 1;
}
} // namespace
// The buffering philosophy is complicated, to minimize copying. Here is an
// overview:
// If the entire message body is present (except for OBJECT payload), it is
// parsed and delivered. If not, the partial body is buffered. (requiring a
// copy).
// Any OBJECT payload is always delivered to the application without copying.
// If something has been buffered, when more data arrives copy just enough of it
// to finish parsing that thing, then resume normal processing.
void MoqtParser::ProcessData(absl::string_view data, bool fin) {
if (no_more_data_) {
ParseError("Data after end of stream");
}
if (processing_) {
return;
}
processing_ = true;
auto on_return = absl::MakeCleanup([&] { processing_ = false; });
// Check for early fin
if (fin) {
no_more_data_ = true;
if (ObjectPayloadInProgress() &&
payload_length_remaining_ > data.length()) {
ParseError("End of stream before complete OBJECT PAYLOAD");
return;
}
if (!buffered_message_.empty() && data.empty()) {
ParseError("End of stream before complete message");
return;
}
}
std::optional<quic::QuicDataReader> reader = std::nullopt;
size_t original_buffer_size = buffered_message_.size();
// There are three cases: the parser has already delivered an OBJECT header
// and is now delivering payload; part of a message is in the buffer; or
// no message is in progress.
if (ObjectPayloadInProgress()) {
// This is additional payload for an OBJECT.
QUICHE_DCHECK(buffered_message_.empty());
if (!object_metadata_->payload_length.has_value()) {
// Deliver the data and exit.
visitor_.OnObjectMessage(*object_metadata_, data, fin);
if (fin) {
object_metadata_.reset();
}
return;
}
if (data.length() < payload_length_remaining_) {
// Does not finish the payload; deliver and exit.
visitor_.OnObjectMessage(*object_metadata_, data, false);
payload_length_remaining_ -= data.length();
return;
}
// Finishes the payload. Deliver and continue.
reader.emplace(data);
visitor_.OnObjectMessage(*object_metadata_,
data.substr(0, payload_length_remaining_), true);
reader->Seek(payload_length_remaining_);
payload_length_remaining_ = 0; // Expect a new object.
} else if (!buffered_message_.empty()) {
absl::StrAppend(&buffered_message_, data);
reader.emplace(buffered_message_);
} else {
// No message in progress.
reader.emplace(data);
}
size_t total_processed = 0;
while (!reader->IsDoneReading()) {
size_t message_len = ProcessMessage(reader->PeekRemainingPayload(), fin);
if (message_len == 0) {
if (reader->BytesRemaining() > kMaxMessageHeaderSize) {
ParseError(MoqtError::kInternalError,
"Cannot parse non-OBJECT messages > 2KB");
return;
}
if (fin) {
ParseError("FIN after incomplete message");
return;
}
if (buffered_message_.empty()) {
// If the buffer is not empty, |data| has already been copied there.
absl::StrAppend(&buffered_message_, reader->PeekRemainingPayload());
}
break;
}
// A message was successfully processed.
total_processed += message_len;
reader->Seek(message_len);
}
if (original_buffer_size > 0) {
buffered_message_.erase(0, total_processed);
}
}
// static
absl::string_view MoqtParser::ProcessDatagram(absl::string_view data,
MoqtObject& object_metadata) {
uint64_t value;
quic::QuicDataReader reader(data);
if (!reader.ReadVarInt62(&value)) {
return absl::string_view();
}
if (static_cast<MoqtMessageType>(value) != MoqtMessageType::kObjectDatagram) {
return absl::string_view();
}
size_t processed_data = ParseObjectHeader(reader, object_metadata,
MoqtMessageType::kObjectDatagram);
if (processed_data == 0) { // Incomplete header
return absl::string_view();
}
return reader.PeekRemainingPayload();
}
size_t MoqtParser::ProcessMessage(absl::string_view data, bool fin) {
uint64_t value;
quic::QuicDataReader reader(data);
if (ObjectStreamInitialized() && !ObjectPayloadInProgress()) {
// This is a follow-on object in a stream.
return ProcessObject(reader,
GetMessageTypeForForwardingPreference(
object_metadata_->forwarding_preference),
fin);
}
if (!reader.ReadVarInt62(&value)) {
return 0;
}
auto type = static_cast<MoqtMessageType>(value);
switch (type) {
case MoqtMessageType::kObjectDatagram:
ParseError("Received OBJECT_DATAGRAM on stream");
return 0;
case MoqtMessageType::kObjectStream:
case MoqtMessageType::kStreamHeaderTrack:
case MoqtMessageType::kStreamHeaderGroup:
return ProcessObject(reader, type, fin);
case MoqtMessageType::kClientSetup:
return ProcessClientSetup(reader);
case MoqtMessageType::kServerSetup:
return ProcessServerSetup(reader);
case MoqtMessageType::kSubscribe:
return ProcessSubscribe(reader);
case MoqtMessageType::kSubscribeOk:
return ProcessSubscribeOk(reader);
case MoqtMessageType::kSubscribeError:
return ProcessSubscribeError(reader);
case MoqtMessageType::kUnsubscribe:
return ProcessUnsubscribe(reader);
case MoqtMessageType::kSubscribeDone:
return ProcessSubscribeDone(reader);
case MoqtMessageType::kSubscribeUpdate:
return ProcessSubscribeUpdate(reader);
case MoqtMessageType::kAnnounce:
return ProcessAnnounce(reader);
case MoqtMessageType::kAnnounceOk:
return ProcessAnnounceOk(reader);
case MoqtMessageType::kAnnounceError:
return ProcessAnnounceError(reader);
case MoqtMessageType::kAnnounceCancel:
return ProcessAnnounceCancel(reader);
case MoqtMessageType::kTrackStatusRequest:
return ProcessTrackStatusRequest(reader);
case MoqtMessageType::kUnannounce:
return ProcessUnannounce(reader);
case MoqtMessageType::kTrackStatus:
return ProcessTrackStatus(reader);
case MoqtMessageType::kGoAway:
return ProcessGoAway(reader);
case moqt::MoqtMessageType::kObjectAck:
return ProcessObjectAck(reader);
default:
ParseError("Unknown message type");
return 0;
}
}
size_t MoqtParser::ProcessObject(quic::QuicDataReader& reader,
MoqtMessageType type, bool fin) {
size_t processed_data = 0;
QUICHE_DCHECK(!ObjectPayloadInProgress());
if (!ObjectStreamInitialized()) {
object_metadata_ = MoqtObject();
processed_data = ParseObjectHeader(reader, object_metadata_.value(), type);
if (processed_data == 0) {
object_metadata_.reset();
return 0;
}
}
// At this point, enough data has been processed to store in object_metadata_,
// even if there's nothing else in the buffer.
QUICHE_DCHECK(payload_length_remaining_ == 0);
switch (type) {
case MoqtMessageType::kStreamHeaderTrack:
if (!reader.ReadVarInt62(&object_metadata_->group_id)) {
return processed_data;
}
[[fallthrough]];
case MoqtMessageType::kStreamHeaderGroup: {
uint64_t length;
if (!reader.ReadVarInt62(&object_metadata_->object_id) ||
!reader.ReadVarInt62(&length)) {
return processed_data;
}
object_metadata_->payload_length = length;
uint64_t status = 0; // Defaults to kNormal.
if (length == 0 && !reader.ReadVarInt62(&status)) {
return processed_data;
}
object_metadata_->object_status = IntegerToObjectStatus(status);
break;
}
default:
break;
}
if (object_metadata_->object_status ==
MoqtObjectStatus::kInvalidObjectStatus) {
ParseError("Invalid object status");
return processed_data;
}
if (object_metadata_->object_status != MoqtObjectStatus::kNormal) {
// It is impossible to express an explicit length with this status.
if ((type == MoqtMessageType::kObjectStream ||
type == MoqtMessageType::kObjectDatagram) &&
reader.BytesRemaining() > 0) {
// There is additional data in the stream/datagram, which is an error.
ParseError("Object with non-normal status has payload");
return processed_data;
}
visitor_.OnObjectMessage(*object_metadata_, "", true);
return reader.PreviouslyReadPayload().length();
}
bool has_length = object_metadata_->payload_length.has_value();
bool received_complete_message = false;
size_t payload_to_draw = reader.BytesRemaining();
if (fin && has_length &&
*object_metadata_->payload_length > reader.BytesRemaining()) {
ParseError("Received FIN mid-payload");
return processed_data;
}
received_complete_message =
fin || (has_length &&
*object_metadata_->payload_length <= reader.BytesRemaining());
if (received_complete_message && has_length &&
*object_metadata_->payload_length < reader.BytesRemaining()) {
payload_to_draw = *object_metadata_->payload_length;
}
// The error case where there's a fin before the explicit length is complete
// is handled in ProcessData() in two separate places. Even though the
// message is "done" if fin regardless of has_length, it's bad to report to
// the application that the object is done if it hasn't reached the promised
// length.
visitor_.OnObjectMessage(
*object_metadata_,
reader.PeekRemainingPayload().substr(0, payload_to_draw),
received_complete_message);
reader.Seek(payload_to_draw);
payload_length_remaining_ =
has_length ? *object_metadata_->payload_length - payload_to_draw : 0;
return reader.PreviouslyReadPayload().length();
}
size_t MoqtParser::ProcessClientSetup(quic::QuicDataReader& reader) {
MoqtClientSetup setup;
uint64_t number_of_supported_versions;
if (!reader.ReadVarInt62(&number_of_supported_versions)) {
return 0;
}
uint64_t version;
for (uint64_t i = 0; i < number_of_supported_versions; ++i) {
if (!reader.ReadVarInt62(&version)) {
return 0;
}
setup.supported_versions.push_back(static_cast<MoqtVersion>(version));
}
uint64_t num_params;
if (!reader.ReadVarInt62(&num_params)) {
return 0;
}
// Parse parameters
for (uint64_t i = 0; i < num_params; ++i) {
uint64_t type;
absl::string_view value;
if (!ReadParameter(reader, type, value)) {
return 0;
}
auto key = static_cast<MoqtSetupParameter>(type);
switch (key) {
case MoqtSetupParameter::kRole:
if (setup.role.has_value()) {
ParseError("ROLE parameter appears twice in SETUP");
return 0;
}
uint64_t index;
if (!StringViewToVarInt(value, index)) {
return 0;
}
if (index > static_cast<uint64_t>(MoqtRole::kRoleMax)) {
ParseError("Invalid ROLE parameter");
return 0;
}
setup.role = static_cast<MoqtRole>(index);
break;
case MoqtSetupParameter::kPath:
if (uses_web_transport_) {
ParseError(
"WebTransport connection is using PATH parameter in SETUP");
return 0;
}
if (setup.path.has_value()) {
ParseError("PATH parameter appears twice in CLIENT_SETUP");
return 0;
}
setup.path = value;
break;
case MoqtSetupParameter::kSupportObjectAcks:
uint64_t flag;
if (!StringViewToVarInt(value, flag) || flag > 1) {
ParseError("Invalid kSupportObjectAcks value");
return 0;
}
setup.supports_object_ack = static_cast<bool>(flag);
break;
default:
// Skip over the parameter.
break;
}
}
if (!setup.role.has_value()) {
ParseError("ROLE parameter missing from CLIENT_SETUP message");
return 0;
}
if (!uses_web_transport_ && !setup.path.has_value()) {
ParseError("PATH SETUP parameter missing from Client message over QUIC");
return 0;
}
visitor_.OnClientSetupMessage(setup);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtParser::ProcessServerSetup(quic::QuicDataReader& reader) {
MoqtServerSetup setup;
uint64_t version;
if (!reader.ReadVarInt62(&version)) {
return 0;
}
setup.selected_version = static_cast<MoqtVersion>(version);
uint64_t num_params;
if (!reader.ReadVarInt62(&num_params)) {
return 0;
}
// Parse parameters
for (uint64_t i = 0; i < num_params; ++i) {
uint64_t type;
absl::string_view value;
if (!ReadParameter(reader, type, value)) {
return 0;
}
auto key = static_cast<MoqtSetupParameter>(type);
switch (key) {
case MoqtSetupParameter::kRole:
if (setup.role.has_value()) {
ParseError("ROLE parameter appears twice in SETUP");
return 0;
}
uint64_t index;
if (!StringViewToVarInt(value, index)) {
return 0;
}
if (index > static_cast<uint64_t>(MoqtRole::kRoleMax)) {
ParseError("Invalid ROLE parameter");
return 0;
}
setup.role = static_cast<MoqtRole>(index);
break;
case MoqtSetupParameter::kPath:
ParseError("PATH parameter in SERVER_SETUP");
return 0;
case MoqtSetupParameter::kSupportObjectAcks:
uint64_t flag;
if (!StringViewToVarInt(value, flag) || flag > 1) {
ParseError("Invalid kSupportObjectAcks value");
return 0;
}
setup.supports_object_ack = static_cast<bool>(flag);
break;
default:
// Skip over the parameter.
break;
}
}
if (!setup.role.has_value()) {
ParseError("ROLE parameter missing from SERVER_SETUP message");
return 0;
}
visitor_.OnServerSetupMessage(setup);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtParser::ProcessSubscribe(quic::QuicDataReader& reader) {
MoqtSubscribe subscribe_request;
uint64_t filter, group, object;
uint8_t group_order;
if (!reader.ReadVarInt62(&subscribe_request.subscribe_id) ||
!reader.ReadVarInt62(&subscribe_request.track_alias) ||
!reader.ReadStringVarInt62(subscribe_request.track_namespace) ||
!reader.ReadStringVarInt62(subscribe_request.track_name) ||
!reader.ReadUInt8(&subscribe_request.subscriber_priority) ||
!reader.ReadUInt8(&group_order) || !reader.ReadVarInt62(&filter)) {
return 0;
}
if (!ParseDeliveryOrder(group_order, subscribe_request.group_order)) {
ParseError("Invalid group order value in SUBSCRIBE message");
return 0;
}
MoqtFilterType filter_type = static_cast<MoqtFilterType>(filter);
switch (filter_type) {
case MoqtFilterType::kLatestGroup:
subscribe_request.start_object = 0;
break;
case MoqtFilterType::kLatestObject:
break;
case MoqtFilterType::kAbsoluteStart:
case MoqtFilterType::kAbsoluteRange:
if (!reader.ReadVarInt62(&group) || !reader.ReadVarInt62(&object)) {
return 0;
}
subscribe_request.start_group = group;
subscribe_request.start_object = object;
if (filter_type == MoqtFilterType::kAbsoluteStart) {
break;
}
if (!reader.ReadVarInt62(&group) || !reader.ReadVarInt62(&object)) {
return 0;
}
subscribe_request.end_group = group;
if (subscribe_request.end_group < subscribe_request.start_group) {
ParseError("End group is less than start group");
return 0;
}
if (object == 0) {
subscribe_request.end_object = std::nullopt;
} else {
subscribe_request.end_object = object - 1;
if (subscribe_request.start_group == subscribe_request.end_group &&
subscribe_request.end_object < subscribe_request.start_object) {
ParseError("End object comes before start object");
return 0;
}
}
break;
default:
ParseError("Invalid filter type");
return 0;
}
uint64_t num_params;
if (!reader.ReadVarInt62(&num_params)) {
return 0;
}
for (uint64_t i = 0; i < num_params; ++i) {
uint64_t type;
absl::string_view value;
if (!ReadParameter(reader, type, value)) {
return 0;
}
auto key = static_cast<MoqtTrackRequestParameter>(type);
switch (key) {
case MoqtTrackRequestParameter::kAuthorizationInfo:
if (subscribe_request.parameters.authorization_info.has_value()) {
ParseError(
"AUTHORIZATION_INFO parameter appears twice in "
"SUBSCRIBE");
return 0;
}
subscribe_request.parameters.authorization_info = value;
break;
case MoqtTrackRequestParameter::kOackWindowSize: {
if (subscribe_request.parameters.object_ack_window.has_value()) {
ParseError("OACK_WINDOW_SIZE parameter appears twice in SUBSCRIBE");
return 0;
}
uint64_t raw_value;
if (!StringViewToVarInt(value, raw_value)) {
ParseError("OACK_WINDOW_SIZE parameter is not a valid varint");
return 0;
}
subscribe_request.parameters.object_ack_window =
quic::QuicTimeDelta::FromMicroseconds(raw_value);
break;
}
default:
// Skip over the parameter.
break;
}
}
visitor_.OnSubscribeMessage(subscribe_request);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtParser::ProcessSubscribeOk(quic::QuicDataReader& reader) {
MoqtSubscribeOk subscribe_ok;
uint64_t milliseconds;
uint8_t group_order;
uint8_t content_exists;
if (!reader.ReadVarInt62(&subscribe_ok.subscribe_id) ||
!reader.ReadVarInt62(&milliseconds) || !reader.ReadUInt8(&group_order) ||
!reader.ReadUInt8(&content_exists)) {
return 0;
}
if (content_exists > 1) {
ParseError("SUBSCRIBE_OK ContentExists has invalid value");
return 0;
}
if (group_order != 0x01 && group_order != 0x02) {
ParseError("Invalid group order value in SUBSCRIBE_OK");
return 0;
}
subscribe_ok.expires = quic::QuicTimeDelta::FromMilliseconds(milliseconds);
subscribe_ok.group_order = static_cast<MoqtDeliveryOrder>(group_order);
if (content_exists) {
subscribe_ok.largest_id = FullSequence();
if (!reader.ReadVarInt62(&subscribe_ok.largest_id->group) ||
!reader.ReadVarInt62(&subscribe_ok.largest_id->object)) {
return 0;
}
}
visitor_.OnSubscribeOkMessage(subscribe_ok);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtParser::ProcessSubscribeError(quic::QuicDataReader& reader) {
MoqtSubscribeError subscribe_error;
uint64_t error_code;
if (!reader.ReadVarInt62(&subscribe_error.subscribe_id) ||
!reader.ReadVarInt62(&error_code) ||
!reader.ReadStringVarInt62(subscribe_error.reason_phrase) ||
!reader.ReadVarInt62(&subscribe_error.track_alias)) {
return 0;
}
subscribe_error.error_code = static_cast<SubscribeErrorCode>(error_code);
visitor_.OnSubscribeErrorMessage(subscribe_error);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtParser::ProcessUnsubscribe(quic::QuicDataReader& reader) {
MoqtUnsubscribe unsubscribe;
if (!reader.ReadVarInt62(&unsubscribe.subscribe_id)) {
return 0;
}
visitor_.OnUnsubscribeMessage(unsubscribe);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtParser::ProcessSubscribeDone(quic::QuicDataReader& reader) {
MoqtSubscribeDone subscribe_done;
uint8_t content_exists;
uint64_t value;
if (!reader.ReadVarInt62(&subscribe_done.subscribe_id) ||
!reader.ReadVarInt62(&value) ||
!reader.ReadStringVarInt62(subscribe_done.reason_phrase) ||
!reader.ReadUInt8(&content_exists)) {
return 0;
}
subscribe_done.status_code = static_cast<SubscribeDoneCode>(value);
if (content_exists > 1) {
ParseError("SUBSCRIBE_DONE ContentExists has invalid value");
return 0;
}
if (content_exists == 1) {
subscribe_done.final_id = FullSequence();
if (!reader.ReadVarInt62(&subscribe_done.final_id->group) ||
!reader.ReadVarInt62(&subscribe_done.final_id->object)) {
return 0;
}
}
visitor_.OnSubscribeDoneMessage(subscribe_done);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtParser::ProcessSubscribeUpdate(quic::QuicDataReader& reader) {
MoqtSubscribeUpdate subscribe_update;
uint64_t end_group, end_object, num_params;
if (!reader.ReadVarInt62(&subscribe_update.subscribe_id) ||
!reader.ReadVarInt62(&subscribe_update.start_group) ||
!reader.ReadVarInt62(&subscribe_update.start_object) ||
!reader.ReadVarInt62(&end_group) || !reader.ReadVarInt62(&end_object) ||
!reader.ReadUInt8(&subscribe_update.subscriber_priority) ||
!reader.ReadVarInt62(&num_params)) {
return 0;
}
if (end_group == 0) {
// end_group remains nullopt.
if (end_object > 0) {
ParseError("SUBSCRIBE_UPDATE has end_object but no end_group");
return 0;
}
} else {
subscribe_update.end_group = end_group - 1;
if (subscribe_update.end_group < subscribe_update.start_group) {
ParseError("End group is less than start group");
return 0;
}
}
if (end_object > 0) {
subscribe_update.end_object = end_object - 1;
if (subscribe_update.end_object.has_value() &&
subscribe_update.start_group == *subscribe_update.end_group &&
*subscribe_update.end_object < subscribe_update.start_object) {
ParseError("End object comes before start object");
return 0;
}
} else {
subscribe_update.end_object = std::nullopt;
}
for (uint64_t i = 0; i < num_params; ++i) {
uint64_t type;
absl::string_view value;
if (!ReadParameter(reader, type, value)) {
return 0;
}
auto key = static_cast<MoqtTrackRequestParameter>(type);
switch (key) {
case MoqtTrackRequestParameter::kAuthorizationInfo:
if (subscribe_update.authorization_info.has_value()) {
ParseError(
"AUTHORIZATION_INFO parameter appears twice in "
"SUBSCRIBE_UPDATE");
return 0;
}
subscribe_update.authorization_info = value;
break;
default:
// Skip over the parameter.
break;
}
}
visitor_.OnSubscribeUpdateMessage(subscribe_update);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtParser::ProcessAnnounce(quic::QuicDataReader& reader) {
MoqtAnnounce announce;
if (!reader.ReadStringVarInt62(announce.track_namespace)) {
return 0;
}
uint64_t num_params;
if (!reader.ReadVarInt62(&num_params)) {
return 0;
}
for (uint64_t i = 0; i < num_params; ++i) {
uint64_t type;
absl::string_view value;
if (!ReadParameter(reader, type, value)) {
return 0;
}
auto key = static_cast<MoqtTrackRequestParameter>(type);
switch (key) {
case MoqtTrackRequestParameter::kAuthorizationInfo:
if (announce.authorization_info.has_value()) {
ParseError("AUTHORIZATION_INFO parameter appears twice in ANNOUNCE");
return 0;
}
announce.authorization_info = value;
break;
default:
// Skip over the parameter.
break;
}
}
visitor_.OnAnnounceMessage(announce);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtParser::ProcessAnnounceOk(quic::QuicDataReader& reader) {
MoqtAnnounceOk announce_ok;
if (!reader.ReadStringVarInt62(announce_ok.track_namespace)) {
return 0;
}
visitor_.OnAnnounceOkMessage(announce_ok);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtParser::ProcessAnnounceError(quic::QuicDataReader& reader) {
MoqtAnnounceError announce_error;
if (!reader.ReadStringVarInt62(announce_error.track_namespace)) {
return 0;
}
uint64_t error_code;
if (!reader.ReadVarInt62(&error_code)) {
return 0;
}
announce_error.error_code = static_cast<MoqtAnnounceErrorCode>(error_code);
if (!reader.ReadStringVarInt62(announce_error.reason_phrase)) {
return 0;
}
visitor_.OnAnnounceErrorMessage(announce_error);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtParser::ProcessAnnounceCancel(quic::QuicDataReader& reader) {
MoqtAnnounceCancel announce_cancel;
if (!reader.ReadStringVarInt62(announce_cancel.track_namespace)) {
return 0;
}
visitor_.OnAnnounceCancelMessage(announce_cancel);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtParser::ProcessTrackStatusRequest(quic::QuicDataReader& reader) {
MoqtTrackStatusRequest track_status_request;
if (!reader.ReadStringVarInt62(track_status_request.track_namespace)) {
return 0;
}
if (!reader.ReadStringVarInt62(track_status_request.track_name)) {
return 0;
}
visitor_.OnTrackStatusRequestMessage(track_status_request);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtParser::ProcessUnannounce(quic::QuicDataReader& reader) {
MoqtUnannounce unannounce;
if (!reader.ReadStringVarInt62(unannounce.track_namespace)) {
return 0;
}
visitor_.OnUnannounceMessage(unannounce);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtParser::ProcessTrackStatus(quic::QuicDataReader& reader) {
MoqtTrackStatus track_status;
uint64_t value;
if (!reader.ReadStringVarInt62(track_status.track_namespace) ||
!reader.ReadStringVarInt62(track_status.track_name) ||
!reader.ReadVarInt62(&value) ||
!reader.ReadVarInt62(&track_status.last_group) ||
!reader.ReadVarInt62(&track_status.last_object)) {
return 0;
}
track_status.status_code = static_cast<MoqtTrackStatusCode>(value);
visitor_.OnTrackStatusMessage(track_status);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtParser::ProcessGoAway(quic::QuicDataReader& reader) {
MoqtGoAway goaway;
if (!reader.ReadStringVarInt62(goaway.new_session_uri)) {
return 0;
}
visitor_.OnGoAwayMessage(goaway);
return reader.PreviouslyReadPayload().length();
}
size_t MoqtParser::ProcessObjectAck(quic::QuicDataReader& reader) {
MoqtObjectAck object_ack;
uint64_t raw_delta;
if (!reader.ReadVarInt62(&object_ack.subscribe_id) ||
!reader.ReadVarInt62(&object_ack.group_id) ||
!reader.ReadVarInt62(&object_ack.object_id) ||
!reader.ReadVarInt62(&raw_delta)) {
return 0;
}
object_ack.delta_from_deadline = quic::QuicTimeDelta::FromMicroseconds(
SignedVarintUnserializedForm(raw_delta));
visitor_.OnObjectAckMessage(object_ack);
return reader.PreviouslyReadPayload().length();
}
// static
size_t MoqtParser::ParseObjectHeader(quic::QuicDataReader& reader,
MoqtObject& object, MoqtMessageType type) {
if (!reader.ReadVarInt62(&object.subscribe_id) ||
!reader.ReadVarInt62(&object.track_alias)) {
return 0;
}
if (type != MoqtMessageType::kStreamHeaderTrack &&
!reader.ReadVarInt62(&object.group_id)) {
return 0;
}
if (type != MoqtMessageType::kStreamHeaderTrack &&
type != MoqtMessageType::kStreamHeaderGroup &&
!reader.ReadVarInt62(&object.object_id)) {
return 0;
}
if (!reader.ReadUInt8(&object.publisher_priority)) {
return 0;
}
uint64_t status = 0; // Defaults to kNormal.
if ((type == MoqtMessageType::kObjectStream ||
type == MoqtMessageType::kObjectDatagram) &&
!reader.ReadVarInt62(&status)) {
return 0;
}
object.object_status = IntegerToObjectStatus(status);
object.forwarding_preference = GetForwardingPreference(type);
return reader.PreviouslyReadPayload().length();
}
void MoqtParser::ParseError(absl::string_view reason) {
ParseError(MoqtError::kProtocolViolation, reason);
}
void MoqtParser::ParseError(MoqtError error_code, absl::string_view reason) {
if (parsing_error_) {
return; // Don't send multiple parse errors.
}
no_more_data_ = true;
parsing_error_ = true;
visitor_.OnParsingError(error_code, reason);
}
bool MoqtParser::ReadVarIntPieceVarInt62(quic::QuicDataReader& reader,
uint64_t& result) {
uint64_t length;
if (!reader.ReadVarInt62(&length)) {
return false;
}
uint64_t actual_length = static_cast<uint64_t>(reader.PeekVarInt62Length());
if (length != actual_length) {
ParseError("Parameter VarInt has length field mismatch");
return false;
}
if (!reader.ReadVarInt62(&result)) {
return false;
}
return true;
}
bool MoqtParser::ReadParameter(quic::QuicDataReader& reader, uint64_t& type,
absl::string_view& value) {
if (!reader.ReadVarInt62(&type)) {
return false;
}
return reader.ReadStringPieceVarInt62(&value);
}
bool MoqtParser::StringViewToVarInt(absl::string_view& sv, uint64_t& vi) {
quic::QuicDataReader reader(sv);
if (static_cast<size_t>(reader.PeekVarInt62Length()) != sv.length()) {
ParseError(MoqtError::kParameterLengthMismatch,
"Parameter length does not match varint encoding");
return false;
}
reader.ReadVarInt62(&vi);
return true;
}
} // namespace moqt