blob: 3de44cf1b7b224e6d37e12faf0b88a0137f3e0de [file] [log] [blame]
// Copyright (c) 2023 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "quiche/quic/moqt/moqt_parser.h"
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include "absl/cleanup/cleanup.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "quiche/quic/core/quic_data_reader.h"
#include "quiche/quic/core/quic_time.h"
#include "quiche/quic/core/quic_types.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/common/platform/api/quiche_logging.h"
#include "quiche/common/quiche_endian.h"
namespace moqt {
namespace {
// Minus the type, length, and payload, an OBJECT consists of 4 Varints.
constexpr size_t kMaxObjectHeaderSize = 32;
} // namespace
// The buffering philosophy is complicated, to minimize copying. Here is an
// overview:
// If the message type is present, this is stored in message_type_. If part of
// the message type varint is partially present, that is buffered (requiring a
// copy).
// Same for message length.
// If the entire message body is present (except for OBJECT payload), it is
// parsed and delivered. If not, the partial body is buffered. (requiring a
// copy).
// Any OBJECT payload is always delivered to the application without copying.
// If something has been buffered, when more data arrives copy just enough of it
// to finish parsing that thing, then resume normal processing.
void MoqtParser::ProcessData(absl::string_view data, bool end_of_stream) {
if (no_more_data_) {
if (!data.empty() || !end_of_stream) {
ParseError("Data after end of stream");
}
return;
}
if (processing_) {
return;
}
processing_ = true;
auto on_return = absl::MakeCleanup([&] { processing_ = false; });
no_more_data_ = end_of_stream;
quic::QuicDataReader reader(data);
if (!MaybeMergeDataWithBuffer(reader, end_of_stream)) {
return;
}
if (end_of_stream && reader.IsDoneReading() && object_metadata_.has_value()) {
// A FIN arrives while delivering OBJECT payload.
visitor_.OnObjectMessage(object_metadata_.value(), data, true);
EndOfMessage();
}
while (!reader.IsDoneReading()) {
absl::optional<size_t> processed;
if (!GetMessageTypeAndLength(reader)) {
absl::StrAppend(&buffered_message_, reader.PeekRemainingPayload());
break;
}
// Cursor is at start of the message.
if (end_of_stream && NoMessageLength()) {
*message_length_ = reader.BytesRemaining();
}
if (*message_type_ != MoqtMessageType::kObject &&
*message_type_ != MoqtMessageType::kGoAway) {
// Parse OBJECT in case the message is very large. GOAWAY is length zero,
// so always process.
if (NoMessageLength()) {
// Can't parse it yet.
absl::StrAppend(&buffered_message_, reader.PeekRemainingPayload());
break;
}
if (*message_length_ > kMaxMessageHeaderSize) {
ParseError("Message too long");
return;
}
if (*message_length_ > reader.BytesRemaining()) {
// There definitely isn't enough to process the message.
absl::StrAppend(&buffered_message_, reader.PeekRemainingPayload());
break;
}
}
processed = ProcessMessage(FetchMessage(reader));
if (!processed.has_value()) {
if (*message_type_ == MoqtMessageType::kObject &&
(NoMessageLength() || reader.BytesRemaining() < *message_length_)) {
// The parser can attempt to process OBJECT before receiving the whole
// message length. If it doesn't parse the varints, it will buffer the
// message.
absl::StrAppend(&buffered_message_, reader.PeekRemainingPayload());
break;
}
// Non-OBJECT or OBJECT with the complete specified length, but the data
// was not parseable.
ParseError("Not able to parse message given specified length");
return;
}
if (*processed == *message_length_) {
EndOfMessage();
} else {
if (*message_type_ != MoqtMessageType::kObject) {
// Partial processing of non-OBJECT is not allowed.
ParseError("Specified message length too long");
return;
}
// This is a partially processed OBJECT payload.
if (!NoMessageLength()) {
*message_length_ -= *processed;
}
}
if (!reader.Seek(*processed)) {
QUICHE_DCHECK(false);
ParseError("Internal Error");
}
}
if (end_of_stream &&
(!buffered_message_.empty() || object_metadata_.has_value() ||
message_type_.has_value() || message_length_.has_value())) {
// If the stream is ending, there should be no message in progress.
ParseError("Incomplete message at end of stream");
}
}
bool MoqtParser::MaybeMergeDataWithBuffer(quic::QuicDataReader& reader,
bool end_of_stream) {
// Copy as much information as necessary from |data| to complete the
// message or OBJECT header. Minimize unnecessary copying!
if (buffered_message_.empty()) {
return true;
}
quic::QuicDataReader buffer(buffered_message_);
if (!message_length_.has_value()) {
// The buffer contains part of the message type or length.
if (buffer.BytesRemaining() > buffer.PeekVarInt62Length()) {
ParseError("Internal Error");
QUICHE_DCHECK(false);
return false;
}
size_t bytes_needed = buffer.PeekVarInt62Length() - buffer.BytesRemaining();
if (bytes_needed > reader.BytesRemaining()) {
// Not enough to complete!
absl::StrAppend(&buffered_message_, reader.PeekRemainingPayload());
return false;
}
absl::StrAppend(&buffered_message_,
reader.PeekRemainingPayload().substr(0, bytes_needed));
if (!reader.Seek(bytes_needed)) {
QUICHE_DCHECK(false);
ParseError("Internal Error");
return false;
}
quic::QuicDataReader new_buffer(buffered_message_);
uint64_t value;
if (!new_buffer.ReadVarInt62(&value)) {
QUICHE_DCHECK(false);
ParseError("Internal Error");
return false;
}
if (message_type_.has_value()) {
message_length_ = value;
} else {
message_type_ = static_cast<MoqtMessageType>(value);
}
// GOAWAY is special. Report the message as soon as the type and length
// are complete.
if (message_type_.has_value() && message_length_.has_value() &&
*message_type_ == MoqtMessageType::kGoAway) {
ProcessGoAway(new_buffer.PeekRemainingPayload());
EndOfMessage();
return false;
}
// Proceed to normal parsing.
buffered_message_.clear();
return true;
}
// It's a partially buffered message
if (NoMessageLength()) {
if (end_of_stream) {
message_length_ = buffer.BytesRemaining() + reader.BytesRemaining();
} else if (*message_type_ != MoqtMessageType::kObject) {
absl::StrAppend(&buffered_message_, reader.PeekRemainingPayload());
return false;
}
}
if (*message_type_ == MoqtMessageType::kObject) {
// OBJECT is a special case. Append up to KMaxObjectHeaderSize bytes to the
// buffer and see if that allows parsing.
QUICHE_DCHECK(!object_metadata_.has_value());
size_t original_buffer_size = buffer.BytesRemaining();
size_t bytes_to_pull = reader.BytesRemaining();
// No check for *message_length_ == 0 below! Mutants will complain if there
// is a check. If message_length_ < original_buffer_size, the second
// argument will be a very large unsigned integer, which will be irrelevant
// due to std::min.
bytes_to_pull = std::min(reader.BytesRemaining(),
*message_length_ - original_buffer_size);
// Mutants complains that the line below doesn't fail any tests. This is a
// performance optimization to avoid copying large amounts of object payload
// into the buffer when only the OBJECT header will be processed. There is
// no observable behavior change if this line is removed.
bytes_to_pull = std::min(bytes_to_pull, kMaxObjectHeaderSize);
absl::StrAppend(&buffered_message_,
reader.PeekRemainingPayload().substr(0, bytes_to_pull));
absl::optional<size_t> processed =
ProcessObjectVarints(absl::string_view(buffered_message_));
if (!processed.has_value()) {
if ((!NoMessageLength() &&
buffered_message_.length() == *message_length_) ||
buffered_message_.length() > kMaxObjectHeaderSize) {
ParseError("Not able to parse buffered message given specified length");
}
return false;
}
if (*processed > 0 && !reader.Seek(*processed - original_buffer_size)) {
ParseError("Internal Error");
return false;
}
if (*processed == *message_length_) {
// This covers an edge case where the peer has sent an OBJECT message with
// no content.
visitor_.OnObjectMessage(object_metadata_.value(), absl::string_view(),
true);
EndOfMessage();
return true;
}
if (!NoMessageLength()) {
*message_length_ -= *processed;
}
// Object payload is never processed in the buffer.
buffered_message_.clear();
return true;
}
size_t bytes_to_pull =
(buffer.BytesRemaining() + reader.BytesRemaining() < *message_length_)
? reader.BytesRemaining()
: *message_length_ - buffer.BytesRemaining();
absl::StrAppend(&buffered_message_,
reader.PeekRemainingPayload().substr(0, bytes_to_pull));
if (!reader.Seek(bytes_to_pull)) {
QUICHE_DCHECK(false);
ParseError("Internal Error");
return false;
}
if (buffered_message_.length() < *message_length_) {
// Not enough bytes present.
return false;
}
absl::optional<size_t> processed =
ProcessMessage(absl::string_view(buffered_message_));
if (!processed.has_value()) {
ParseError("Not able to parse buffered message given specified length");
return false;
}
if (*processed != *message_length_) {
ParseError("Buffered message length too long for message contents");
return false;
}
EndOfMessage();
return true;
}
absl::optional<size_t> MoqtParser::ProcessMessage(absl::string_view data) {
switch (*message_type_) {
case MoqtMessageType::kObject:
return ProcessObject(data);
case MoqtMessageType::kSetup:
return ProcessSetup(data);
case MoqtMessageType::kSubscribeRequest:
return ProcessSubscribeRequest(data);
case MoqtMessageType::kSubscribeOk:
return ProcessSubscribeOk(data);
case MoqtMessageType::kSubscribeError:
return ProcessSubscribeError(data);
case MoqtMessageType::kUnsubscribe:
return ProcessUnsubscribe(data);
case MoqtMessageType::kAnnounce:
return ProcessAnnounce(data);
case MoqtMessageType::kAnnounceOk:
return ProcessAnnounceOk(data);
case MoqtMessageType::kAnnounceError:
return ProcessAnnounceError(data);
case MoqtMessageType::kUnannounce:
return ProcessUnannounce(data);
case MoqtMessageType::kGoAway:
return ProcessGoAway(data);
default:
ParseError("Unknown message type");
return absl::nullopt;
}
}
absl::optional<size_t> MoqtParser::ProcessObjectVarints(
absl::string_view data) {
if (object_metadata_.has_value()) {
return 0;
}
object_metadata_ = MoqtObject();
quic::QuicDataReader reader(data);
if (reader.ReadVarInt62(&object_metadata_->track_id) &&
reader.ReadVarInt62(&object_metadata_->group_sequence) &&
reader.ReadVarInt62(&object_metadata_->object_sequence) &&
reader.ReadVarInt62(&object_metadata_->object_send_order)) {
return reader.PreviouslyReadPayload().length();
}
object_metadata_ = absl::nullopt;
QUICHE_DCHECK(reader.PreviouslyReadPayload().length() < kMaxObjectHeaderSize);
return absl::nullopt;
}
absl::optional<size_t> MoqtParser::ProcessObject(absl::string_view data) {
quic::QuicDataReader reader(data);
size_t payload_length = *message_length_;
absl::optional<size_t> processed = ProcessObjectVarints(data);
if (!processed.has_value() && !object_metadata_.has_value()) {
// Could not obtain the whole object header.
return absl::nullopt;
}
if (!reader.Seek(*processed)) {
ParseError("Internal Error");
return absl::nullopt;
}
if (payload_length != 0) {
payload_length -= *processed;
}
QUICHE_DCHECK(NoMessageLength() || reader.BytesRemaining() <= payload_length);
visitor_.OnObjectMessage(
object_metadata_.value(), reader.PeekRemainingPayload(),
reader.BytesRemaining() == payload_length && !NoMessageLength());
return data.length();
}
absl::optional<size_t> MoqtParser::ProcessSetup(absl::string_view data) {
MoqtSetup setup;
quic::QuicDataReader reader(data);
if (perspective_ == quic::Perspective::IS_SERVER) {
if (!reader.ReadVarInt62(&setup.number_of_supported_versions)) {
return absl::nullopt;
}
} else {
setup.number_of_supported_versions = 1;
}
uint64_t value;
for (uint64_t i = 0; i < setup.number_of_supported_versions; ++i) {
if (!reader.ReadVarInt62(&value)) {
return absl::nullopt;
}
setup.supported_versions.push_back(value);
}
// Parse parameters
while (!reader.IsDoneReading()) {
if (!reader.ReadVarInt62(&value)) {
return absl::nullopt;
}
auto parameter_key = static_cast<MoqtSetupParameter>(value);
absl::string_view field;
switch (parameter_key) {
case MoqtSetupParameter::kRole:
if (setup.role.has_value()) {
ParseError("ROLE parameter appears twice in SETUP");
return absl::nullopt;
}
if (perspective_ == quic::Perspective::IS_CLIENT) {
ParseError("ROLE parameter sent by server in SETUP");
return absl::nullopt;
}
if (!ReadIntegerPieceVarInt62(reader, value)) {
return absl::nullopt;
}
setup.role = static_cast<MoqtRole>(value);
break;
case MoqtSetupParameter::kPath:
if (uses_web_transport_) {
ParseError(
"WebTransport connection is using PATH parameter in SETUP");
return absl::nullopt;
}
if (perspective_ == quic::Perspective::IS_CLIENT) {
ParseError("PATH parameter sent by server in SETUP");
return absl::nullopt;
}
if (setup.path.has_value()) {
ParseError("PATH parameter appears twice in SETUP");
return absl::nullopt;
}
if (!reader.ReadStringPieceVarInt62(&field)) {
return absl::nullopt;
}
setup.path = field;
break;
default:
// Skip over the parameter.
if (!reader.ReadStringPieceVarInt62(&field)) {
return absl::nullopt;
}
break;
}
}
if (perspective_ == quic::Perspective::IS_SERVER) {
if (!setup.role.has_value()) {
ParseError("ROLE SETUP parameter missing from Client message");
return absl::nullopt;
}
if (!uses_web_transport_ && !setup.path.has_value()) {
ParseError("PATH SETUP parameter missing from Client message over QUIC");
return absl::nullopt;
}
}
visitor_.OnSetupMessage(setup);
return reader.PreviouslyReadPayload().length();
}
absl::optional<size_t> MoqtParser::ProcessSubscribeRequest(
absl::string_view data) {
MoqtSubscribeRequest subscribe_request;
quic::QuicDataReader reader(data);
absl::string_view field;
if (!reader.ReadStringPieceVarInt62(&subscribe_request.full_track_name)) {
return absl::nullopt;
}
uint64_t value;
while (!reader.IsDoneReading()) {
if (!reader.ReadVarInt62(&value)) {
return absl::nullopt;
}
auto parameter_key = static_cast<MoqtTrackRequestParameter>(value);
switch (parameter_key) {
case MoqtTrackRequestParameter::kGroupSequence:
if (subscribe_request.group_sequence.has_value()) {
ParseError(
"GROUP_SEQUENCE parameter appears twice in SUBSCRIBE_REQUEST");
return absl::nullopt;
}
if (!ReadIntegerPieceVarInt62(reader, value)) {
return absl::nullopt;
}
subscribe_request.group_sequence = value;
break;
case MoqtTrackRequestParameter::kObjectSequence:
if (subscribe_request.object_sequence.has_value()) {
ParseError(
"OBJECT_SEQUENCE parameter appears twice in SUBSCRIBE_REQUEST");
return absl::nullopt;
}
if (!ReadIntegerPieceVarInt62(reader, value)) {
return absl::nullopt;
}
subscribe_request.object_sequence = value;
break;
case MoqtTrackRequestParameter::kAuthorizationInfo:
if (subscribe_request.authorization_info.has_value()) {
ParseError(
"AUTHORIZATION_INFO parameter appears twice in "
"SUBSCRIBE_REQUEST");
return absl::nullopt;
}
if (!reader.ReadStringPieceVarInt62(&field)) {
return absl::nullopt;
}
subscribe_request.authorization_info = field;
break;
default:
// Skip over the parameter.
if (!reader.ReadStringPieceVarInt62(&field)) {
return absl::nullopt;
}
break;
}
}
if (reader.IsDoneReading()) {
visitor_.OnSubscribeRequestMessage(subscribe_request);
}
return reader.PreviouslyReadPayload().length();
}
absl::optional<size_t> MoqtParser::ProcessSubscribeOk(absl::string_view data) {
MoqtSubscribeOk subscribe_ok;
quic::QuicDataReader reader(data);
if (!reader.ReadStringPieceVarInt62(&subscribe_ok.full_track_name)) {
return absl::nullopt;
}
if (!reader.ReadVarInt62(&subscribe_ok.track_id)) {
return absl::nullopt;
}
uint64_t milliseconds;
if (!reader.ReadVarInt62(&milliseconds)) {
return absl::nullopt;
}
subscribe_ok.expires = quic::QuicTimeDelta::FromMilliseconds(milliseconds);
if (reader.IsDoneReading()) {
visitor_.OnSubscribeOkMessage(subscribe_ok);
}
return reader.PreviouslyReadPayload().length();
}
absl::optional<size_t> MoqtParser::ProcessSubscribeError(
absl::string_view data) {
MoqtSubscribeError subscribe_error;
quic::QuicDataReader reader(data);
if (!reader.ReadStringPieceVarInt62(&subscribe_error.full_track_name)) {
return absl::nullopt;
}
if (!reader.ReadVarInt62(&subscribe_error.error_code)) {
return absl::nullopt;
}
if (!reader.ReadStringPieceVarInt62(&subscribe_error.reason_phrase)) {
return absl::nullopt;
}
if (reader.IsDoneReading()) {
visitor_.OnSubscribeErrorMessage(subscribe_error);
}
return reader.PreviouslyReadPayload().length();
}
absl::optional<size_t> MoqtParser::ProcessUnsubscribe(absl::string_view data) {
MoqtUnsubscribe unsubscribe;
quic::QuicDataReader reader(data);
if (!reader.ReadStringPieceVarInt62(&unsubscribe.full_track_name)) {
return absl::nullopt;
}
if (reader.IsDoneReading()) {
visitor_.OnUnsubscribeMessage(unsubscribe);
}
return reader.PreviouslyReadPayload().length();
}
absl::optional<size_t> MoqtParser::ProcessAnnounce(absl::string_view data) {
MoqtAnnounce announce;
quic::QuicDataReader reader(data);
absl::string_view field;
if (!reader.ReadStringPieceVarInt62(&field)) {
return absl::nullopt;
}
announce.track_namespace = field;
bool saw_group_sequence = false, saw_object_sequence = false;
while (!reader.IsDoneReading()) {
uint64_t value;
if (!reader.ReadVarInt62(&value)) {
return absl::nullopt;
}
auto parameter_key = static_cast<MoqtTrackRequestParameter>(value);
switch (parameter_key) {
case MoqtTrackRequestParameter::kGroupSequence:
// Not used, but check for duplicates.
if (saw_group_sequence) {
ParseError("GROUP_SEQUENCE parameter appears twice in ANNOUNCE");
return absl::nullopt;
}
if (!reader.ReadStringPieceVarInt62(&field)) {
return absl::nullopt;
}
saw_group_sequence = true;
break;
case MoqtTrackRequestParameter::kObjectSequence:
// Not used, but check for duplicates.
if (saw_object_sequence) {
ParseError("OBJECT_SEQUENCE parameter appears twice in ANNOUNCE");
return absl::nullopt;
}
if (!reader.ReadStringPieceVarInt62(&field)) {
return absl::nullopt;
}
saw_object_sequence = true;
break;
case MoqtTrackRequestParameter::kAuthorizationInfo:
if (announce.authorization_info.has_value()) {
ParseError("AUTHORIZATION_INFO parameter appears twice in ANNOUNCE");
return absl::nullopt;
}
if (!reader.ReadStringPieceVarInt62(&field)) {
return absl::nullopt;
}
announce.authorization_info = field;
break;
default:
// Skip over the parameter.
if (!reader.ReadStringPieceVarInt62(&field)) {
return absl::nullopt;
}
break;
}
}
if (reader.IsDoneReading()) {
visitor_.OnAnnounceMessage(announce);
}
return reader.PreviouslyReadPayload().length();
}
absl::optional<size_t> MoqtParser::ProcessAnnounceOk(absl::string_view data) {
MoqtAnnounceOk announce_ok;
quic::QuicDataReader reader(data);
if (!reader.ReadStringPieceVarInt62(&announce_ok.track_namespace)) {
return absl::nullopt;
}
if (reader.IsDoneReading()) {
visitor_.OnAnnounceOkMessage(announce_ok);
}
return reader.PreviouslyReadPayload().length();
}
absl::optional<size_t> MoqtParser::ProcessAnnounceError(
absl::string_view data) {
MoqtAnnounceError announce_error;
quic::QuicDataReader reader(data);
if (!reader.ReadStringPieceVarInt62(&announce_error.track_namespace)) {
return absl::nullopt;
}
if (!reader.ReadVarInt62(&announce_error.error_code)) {
return absl::nullopt;
}
if (!reader.ReadStringPieceVarInt62(&announce_error.reason_phrase)) {
return absl::nullopt;
}
if (reader.IsDoneReading()) {
visitor_.OnAnnounceErrorMessage(announce_error);
}
return reader.PreviouslyReadPayload().length();
}
absl::optional<size_t> MoqtParser::ProcessUnannounce(absl::string_view data) {
MoqtUnannounce unannounce;
quic::QuicDataReader reader(data);
if (!reader.ReadStringPieceVarInt62(&unannounce.track_namespace)) {
return absl::nullopt;
}
if (reader.IsDoneReading()) {
visitor_.OnUnannounceMessage(unannounce);
}
return reader.PreviouslyReadPayload().length();
}
absl::optional<size_t> MoqtParser::ProcessGoAway(absl::string_view data) {
if (!data.empty()) {
// GOAWAY can only be followed by end_of_stream. Anything else is an error.
ParseError("GOAWAY has data following");
return absl::nullopt;
}
visitor_.OnGoAwayMessage();
return 0;
}
bool MoqtParser::GetMessageTypeAndLength(quic::QuicDataReader& reader) {
if (!message_type_.has_value()) {
uint64_t value;
if (!reader.ReadVarInt62(&value)) {
return false;
}
message_type_ = static_cast<MoqtMessageType>(value);
}
if (!message_length_.has_value()) {
uint64_t value;
if (!reader.ReadVarInt62(&value)) {
return false;
}
message_length_ = value;
}
return true;
}
void MoqtParser::EndOfMessage() {
buffered_message_.clear();
message_type_ = absl::nullopt;
message_length_ = absl::nullopt;
object_metadata_ = absl::nullopt;
}
absl::string_view MoqtParser::FetchMessage(quic::QuicDataReader& reader) {
if (message_length_ == 0) {
return reader.PeekRemainingPayload();
}
if (message_length_ > reader.BytesRemaining()) {
QUICHE_DCHECK(message_type_ == MoqtMessageType::kObject);
return reader.PeekRemainingPayload();
}
return reader.PeekRemainingPayload().substr(0, *message_length_);
}
void MoqtParser::ParseError(absl::string_view reason) {
if (parsing_error_) {
return; // Don't send multiple parse errors.
}
no_more_data_ = true;
parsing_error_ = true;
visitor_.OnParsingError(reason);
}
bool MoqtParser::ReadIntegerPieceVarInt62(quic::QuicDataReader& reader,
uint64_t& result) {
absl::string_view field;
if (!reader.ReadStringPieceVarInt62(&field)) {
return false;
}
if (field.size() > sizeof(uint64_t)) {
ParseError("Cannot parse explicit length integers longer than 8 bytes");
return false;
}
result = 0;
memcpy((uint8_t*)&result + sizeof(result) - field.size(), field.data(),
field.size());
result = quiche::QuicheEndian::NetToHost64(result);
return true;
}
} // namespace moqt