| // Copyright (c) 2023 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "quiche/quic/moqt/moqt_parser.h" |
| |
| #include <algorithm> |
| #include <array> |
| #include <cstddef> |
| #include <cstdint> |
| #include <cstring> |
| #include <optional> |
| #include <string> |
| |
| #include "absl/base/casts.h" |
| #include "absl/cleanup/cleanup.h" |
| #include "absl/container/fixed_array.h" |
| #include "absl/strings/str_cat.h" |
| #include "absl/strings/string_view.h" |
| #include "absl/types/span.h" |
| #include "quiche/quic/core/quic_data_reader.h" |
| #include "quiche/quic/core/quic_time.h" |
| #include "quiche/quic/core/quic_types.h" |
| #include "quiche/quic/moqt/moqt_messages.h" |
| #include "quiche/quic/moqt/moqt_priority.h" |
| #include "quiche/common/platform/api/quiche_bug_tracker.h" |
| #include "quiche/common/platform/api/quiche_logging.h" |
| #include "quiche/common/quiche_data_reader.h" |
| #include "quiche/common/quiche_stream.h" |
| |
| namespace moqt { |
| |
| namespace { |
| |
| bool ParseDeliveryOrder(uint8_t raw_value, |
| std::optional<MoqtDeliveryOrder>& output) { |
| switch (raw_value) { |
| case 0x00: |
| output = std::nullopt; |
| return true; |
| case 0x01: |
| output = MoqtDeliveryOrder::kAscending; |
| return true; |
| case 0x02: |
| output = MoqtDeliveryOrder::kDescending; |
| return true; |
| default: |
| return false; |
| } |
| } |
| |
// Decodes a sign-in-the-low-bit integer: the lowest bit of |value| is the sign
// flag and the remaining bits are the magnitude. The result is returned as an
// unsigned 64-bit pattern, so a negative number comes back as the modulo-2^64
// negation of its magnitude.
uint64_t SignedVarintUnserializedForm(uint64_t value) {
  const uint64_t magnitude = value >> 1;
  const bool is_negative = (value & 0x01) != 0;
  return is_negative ? (uint64_t{0} - magnitude) : magnitude;
}
| |
| bool IsAllowedStreamType(uint64_t value) { |
| constexpr std::array kAllowedStreamTypes = { |
| MoqtDataStreamType::kStreamHeaderSubgroup, |
| MoqtDataStreamType::kStreamHeaderFetch, MoqtDataStreamType::kPadding}; |
| for (MoqtDataStreamType type : kAllowedStreamTypes) { |
| if (static_cast<uint64_t>(type) == value) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| std::optional<uint64_t> ReadVarInt62FromStream(quiche::ReadStream& stream, |
| bool& fin_read) { |
| fin_read = false; |
| |
| quiche::ReadStream::PeekResult peek_result = stream.PeekNextReadableRegion(); |
| if (peek_result.peeked_data.empty()) { |
| if (peek_result.fin_next) { |
| fin_read = stream.SkipBytes(0); |
| QUICHE_DCHECK(fin_read); |
| } |
| return std::nullopt; |
| } |
| char first_byte = peek_result.peeked_data[0]; |
| size_t varint_size = |
| 1 << ((absl::bit_cast<uint8_t>(first_byte) & 0b11000000) >> 6); |
| if (stream.ReadableBytes() < varint_size) { |
| return std::nullopt; |
| } |
| |
| char buffer[8]; |
| absl::Span<char> bytes_to_read = |
| absl::MakeSpan(buffer).subspan(0, varint_size); |
| quiche::ReadStream::ReadResult read_result = stream.Read(bytes_to_read); |
| QUICHE_DCHECK_EQ(read_result.bytes_read, varint_size); |
| fin_read = read_result.fin; |
| |
| quiche::QuicheDataReader reader(buffer, read_result.bytes_read); |
| uint64_t result; |
| bool success = reader.ReadVarInt62(&result); |
| QUICHE_DCHECK(success); |
| QUICHE_DCHECK(reader.IsDoneReading()); |
| return result; |
| } |
| |
| // Reads from |reader| to list. Returns false if there is a read error. |
| bool ParseKeyValuePairList(quic::QuicDataReader& reader, |
| KeyValuePairList& list) { |
| list.clear(); |
| uint64_t num_params; |
| if (!reader.ReadVarInt62(&num_params)) { |
| return false; |
| } |
| for (uint64_t i = 0; i < num_params; ++i) { |
| uint64_t type; |
| if (!reader.ReadVarInt62(&type)) { |
| return false; |
| } |
| if (type % 2 == 1) { |
| absl::string_view bytes; |
| if (!reader.ReadStringPieceVarInt62(&bytes)) { |
| return false; |
| } |
| list.insert(type, bytes); |
| continue; |
| } |
| uint64_t value; |
| if (!reader.ReadVarInt62(&value)) { |
| return false; |
| } |
| list.insert(type, value); |
| } |
| return true; |
| } |
| |
| void KeyValuePairListToMoqtSessionParameters(const KeyValuePairList& parameters, |
| MoqtSessionParameters& out) { |
| parameters.ForEach( |
| [&](uint64_t key, uint64_t value) { |
| SetupParameter parameter = static_cast<SetupParameter>(key); |
| switch (parameter) { |
| case SetupParameter::kMaxRequestId: |
| out.max_request_id = value; |
| break; |
| case SetupParameter::kMaxAuthTokenCacheSize: |
| out.max_auth_token_cache_size = value; |
| break; |
| case SetupParameter::kSupportObjectAcks: |
| out.support_object_acks = (value == 1); |
| break; |
| default: |
| break; |
| } |
| return true; |
| }, |
| [&](uint64_t key, absl::string_view value) { |
| SetupParameter parameter = static_cast<SetupParameter>(key); |
| switch (parameter) { |
| case SetupParameter::kPath: |
| out.path = value; |
| break; |
| default: |
| break; |
| } |
| return true; |
| }); |
| } |
| |
| } // namespace |
| |
| void MoqtControlParser::ReadAndDispatchMessages() { |
| if (no_more_data_) { |
| ParseError("Data after end of stream"); |
| return; |
| } |
| if (processing_) { |
| return; |
| } |
| processing_ = true; |
| auto on_return = absl::MakeCleanup([&] { processing_ = false; }); |
| while (!no_more_data_) { |
| bool fin_read = false; |
| // Read the message type. |
| if (!message_type_.has_value()) { |
| message_type_ = ReadVarInt62FromStream(stream_, fin_read); |
| if (fin_read) { |
| ParseError("FIN on control stream"); |
| return; |
| } |
| if (!message_type_.has_value()) { |
| return; |
| } |
| } |
| QUICHE_DCHECK(message_type_.has_value()); |
| |
| // Read the message length. |
| if (!message_size_.has_value()) { |
| if (stream_.ReadableBytes() < 2) { |
| return; |
| } |
| std::array<char, 2> size_bytes; |
| quiche::ReadStream::ReadResult result = |
| stream_.Read(absl::MakeSpan(size_bytes)); |
| if (result.bytes_read != 2) { |
| ParseError(MoqtError::kInternalError, |
| "Stream returned incorrect ReadableBytes"); |
| return; |
| } |
| if (result.fin) { |
| ParseError("FIN on control stream"); |
| return; |
| } |
| message_size_ = static_cast<uint16_t>(size_bytes[0]) << 8 | |
| static_cast<uint16_t>(size_bytes[1]); |
| if (*message_size_ > kMaxMessageHeaderSize) { |
| ParseError(MoqtError::kInternalError, |
| absl::StrCat("Cannot parse control messages more than ", |
| kMaxMessageHeaderSize, " bytes")); |
| return; |
| } |
| } |
| QUICHE_DCHECK(message_size_.has_value()); |
| |
| // Read the message if it's fully received. |
| // |
| // CAUTION: if the flow control windows are too low, and |
| // kMaxMessageHeaderSize is too high, this will cause a deadlock. |
| if (stream_.ReadableBytes() < *message_size_) { |
| return; |
| } |
| absl::FixedArray<char> message(*message_size_); |
| quiche::ReadStream::ReadResult result = |
| stream_.Read(absl::MakeSpan(message)); |
| if (result.bytes_read != *message_size_) { |
| ParseError("Stream returned incorrect ReadableBytes"); |
| return; |
| } |
| if (result.fin) { |
| ParseError("FIN on control stream"); |
| return; |
| } |
| |
| ProcessMessage(absl::string_view(message.data(), message.size()), |
| static_cast<MoqtMessageType>(*message_type_)); |
| message_type_.reset(); |
| message_size_.reset(); |
| } |
| } |
| |
| size_t MoqtControlParser::ProcessMessage(absl::string_view data, |
| MoqtMessageType message_type) { |
| quic::QuicDataReader reader(data); |
| size_t bytes_read; |
| switch (message_type) { |
| case MoqtMessageType::kClientSetup: |
| bytes_read = ProcessClientSetup(reader); |
| break; |
| case MoqtMessageType::kServerSetup: |
| bytes_read = ProcessServerSetup(reader); |
| break; |
| case MoqtMessageType::kSubscribe: |
| bytes_read = ProcessSubscribe(reader); |
| break; |
| case MoqtMessageType::kSubscribeOk: |
| bytes_read = ProcessSubscribeOk(reader); |
| break; |
| case MoqtMessageType::kSubscribeError: |
| bytes_read = ProcessSubscribeError(reader); |
| break; |
| case MoqtMessageType::kUnsubscribe: |
| bytes_read = ProcessUnsubscribe(reader); |
| break; |
| case MoqtMessageType::kSubscribeDone: |
| bytes_read = ProcessSubscribeDone(reader); |
| break; |
| case MoqtMessageType::kSubscribeUpdate: |
| bytes_read = ProcessSubscribeUpdate(reader); |
| break; |
| case MoqtMessageType::kAnnounce: |
| bytes_read = ProcessAnnounce(reader); |
| break; |
| case MoqtMessageType::kAnnounceOk: |
| bytes_read = ProcessAnnounceOk(reader); |
| break; |
| case MoqtMessageType::kAnnounceError: |
| bytes_read = ProcessAnnounceError(reader); |
| break; |
| case MoqtMessageType::kAnnounceCancel: |
| bytes_read = ProcessAnnounceCancel(reader); |
| break; |
| case MoqtMessageType::kTrackStatusRequest: |
| bytes_read = ProcessTrackStatusRequest(reader); |
| break; |
| case MoqtMessageType::kUnannounce: |
| bytes_read = ProcessUnannounce(reader); |
| break; |
| case MoqtMessageType::kTrackStatus: |
| bytes_read = ProcessTrackStatus(reader); |
| break; |
| case MoqtMessageType::kGoAway: |
| bytes_read = ProcessGoAway(reader); |
| break; |
| case MoqtMessageType::kSubscribeAnnounces: |
| bytes_read = ProcessSubscribeAnnounces(reader); |
| break; |
| case MoqtMessageType::kSubscribeAnnouncesOk: |
| bytes_read = ProcessSubscribeAnnouncesOk(reader); |
| break; |
| case MoqtMessageType::kSubscribeAnnouncesError: |
| bytes_read = ProcessSubscribeAnnouncesError(reader); |
| break; |
| case MoqtMessageType::kUnsubscribeAnnounces: |
| bytes_read = ProcessUnsubscribeAnnounces(reader); |
| break; |
| case MoqtMessageType::kMaxRequestId: |
| bytes_read = ProcessMaxRequestId(reader); |
| break; |
| case MoqtMessageType::kFetch: |
| bytes_read = ProcessFetch(reader); |
| break; |
| case MoqtMessageType::kFetchCancel: |
| bytes_read = ProcessFetchCancel(reader); |
| break; |
| case MoqtMessageType::kFetchOk: |
| bytes_read = ProcessFetchOk(reader); |
| break; |
| case MoqtMessageType::kFetchError: |
| bytes_read = ProcessFetchError(reader); |
| break; |
| case MoqtMessageType::kRequestsBlocked: |
| bytes_read = ProcessRequestsBlocked(reader); |
| break; |
| case moqt::MoqtMessageType::kObjectAck: |
| bytes_read = ProcessObjectAck(reader); |
| break; |
| default: |
| ParseError("Unknown message type"); |
| bytes_read = 0; |
| break; |
| } |
| if (bytes_read != data.size() || bytes_read == 0) { |
| ParseError("Message length does not match payload length"); |
| return 0; |
| } |
| return bytes_read; |
| } |
| |
| size_t MoqtControlParser::ProcessClientSetup(quic::QuicDataReader& reader) { |
| MoqtClientSetup setup; |
| setup.parameters.using_webtrans = uses_web_transport_; |
| setup.parameters.perspective = quic::Perspective::IS_CLIENT; |
| uint64_t number_of_supported_versions; |
| if (!reader.ReadVarInt62(&number_of_supported_versions)) { |
| return 0; |
| } |
| uint64_t version; |
| for (uint64_t i = 0; i < number_of_supported_versions; ++i) { |
| if (!reader.ReadVarInt62(&version)) { |
| return 0; |
| } |
| setup.supported_versions.push_back(static_cast<MoqtVersion>(version)); |
| } |
| KeyValuePairList parameters; |
| if (!ParseKeyValuePairList(reader, parameters)) { |
| return 0; |
| } |
| MoqtError error = ValidateSetupParameters(parameters, uses_web_transport_, |
| quic::Perspective::IS_SERVER); |
| if (error != MoqtError::kNoError) { |
| ParseError(error, "Client SETUP contains invalid parameters"); |
| return 0; |
| } |
| KeyValuePairListToMoqtSessionParameters(parameters, setup.parameters); |
| // TODO(martinduke): Validate construction of the PATH (Sec 8.3.2.1) |
| visitor_.OnClientSetupMessage(setup); |
| return reader.PreviouslyReadPayload().length(); |
| } |
| |
| size_t MoqtControlParser::ProcessServerSetup(quic::QuicDataReader& reader) { |
| MoqtServerSetup setup; |
| setup.parameters.using_webtrans = uses_web_transport_; |
| setup.parameters.perspective = quic::Perspective::IS_SERVER; |
| uint64_t version; |
| if (!reader.ReadVarInt62(&version)) { |
| return 0; |
| } |
| setup.selected_version = static_cast<MoqtVersion>(version); |
| KeyValuePairList parameters; |
| if (!ParseKeyValuePairList(reader, parameters)) { |
| return 0; |
| } |
| MoqtError error = ValidateSetupParameters(parameters, uses_web_transport_, |
| quic::Perspective::IS_CLIENT); |
| if (error != MoqtError::kNoError) { |
| ParseError(error, "Server SETUP contains invalid parameters"); |
| return 0; |
| } |
| KeyValuePairListToMoqtSessionParameters(parameters, setup.parameters); |
| visitor_.OnServerSetupMessage(setup); |
| return reader.PreviouslyReadPayload().length(); |
| } |
| |
| size_t MoqtControlParser::ProcessSubscribe(quic::QuicDataReader& reader) { |
| MoqtSubscribe subscribe; |
| uint64_t filter, group, object; |
| uint8_t group_order, forward; |
| absl::string_view track_name; |
| if (!reader.ReadVarInt62(&subscribe.request_id) || |
| !reader.ReadVarInt62(&subscribe.track_alias) || |
| !ReadTrackNamespace(reader, subscribe.full_track_name) || |
| !reader.ReadStringPieceVarInt62(&track_name) || |
| !reader.ReadUInt8(&subscribe.subscriber_priority) || |
| !reader.ReadUInt8(&group_order) || !reader.ReadUInt8(&forward) || |
| !reader.ReadVarInt62(&filter)) { |
| return 0; |
| } |
| subscribe.full_track_name.AddElement(track_name); |
| if (!ParseDeliveryOrder(group_order, subscribe.group_order)) { |
| ParseError("Invalid group order value in SUBSCRIBE"); |
| return 0; |
| } |
| if (forward > 1) { |
| ParseError("Invalid forward value in SUBSCRIBE"); |
| return 0; |
| } |
| subscribe.forward = (forward == 1); |
| subscribe.filter_type = static_cast<MoqtFilterType>(filter); |
| switch (subscribe.filter_type) { |
| case MoqtFilterType::kNextGroupStart: |
| case MoqtFilterType::kLatestObject: |
| break; |
| case MoqtFilterType::kAbsoluteStart: |
| case MoqtFilterType::kAbsoluteRange: |
| if (!reader.ReadVarInt62(&group) || !reader.ReadVarInt62(&object)) { |
| return 0; |
| } |
| subscribe.start = Location(group, object); |
| if (subscribe.filter_type == MoqtFilterType::kAbsoluteStart) { |
| break; |
| } |
| if (!reader.ReadVarInt62(&group)) { |
| return 0; |
| } |
| subscribe.end_group = group; |
| if (*subscribe.end_group < subscribe.start->group) { |
| ParseError("End group is less than start group"); |
| return 0; |
| } |
| break; |
| default: |
| ParseError("Invalid filter type"); |
| return 0; |
| } |
| KeyValuePairList parameters; |
| if (!ParseKeyValuePairList(reader, parameters)) { |
| return 0; |
| } |
| if (!ValidateVersionSpecificParameters(parameters, |
| MoqtMessageType::kSubscribe)) { |
| ParseError("SUBSCRIBE contains invalid parameters"); |
| return 0; |
| } |
| if (!KeyValuePairListToVersionSpecificParameters(parameters, |
| subscribe.parameters)) { |
| return 0; |
| } |
| visitor_.OnSubscribeMessage(subscribe); |
| return reader.PreviouslyReadPayload().length(); |
| } |
| |
| size_t MoqtControlParser::ProcessSubscribeOk(quic::QuicDataReader& reader) { |
| MoqtSubscribeOk subscribe_ok; |
| uint64_t milliseconds; |
| uint8_t group_order; |
| uint8_t content_exists; |
| if (!reader.ReadVarInt62(&subscribe_ok.subscribe_id) || |
| !reader.ReadVarInt62(&milliseconds) || !reader.ReadUInt8(&group_order) || |
| !reader.ReadUInt8(&content_exists)) { |
| return 0; |
| } |
| if (content_exists > 1) { |
| ParseError("SUBSCRIBE_OK ContentExists has invalid value"); |
| return 0; |
| } |
| if (group_order != 0x01 && group_order != 0x02) { |
| ParseError("Invalid group order value in SUBSCRIBE_OK"); |
| return 0; |
| } |
| subscribe_ok.expires = quic::QuicTimeDelta::FromMilliseconds(milliseconds); |
| subscribe_ok.group_order = static_cast<MoqtDeliveryOrder>(group_order); |
| if (content_exists) { |
| subscribe_ok.largest_id = Location(); |
| if (!reader.ReadVarInt62(&subscribe_ok.largest_id->group) || |
| !reader.ReadVarInt62(&subscribe_ok.largest_id->object)) { |
| return 0; |
| } |
| } |
| KeyValuePairList parameters; |
| if (!ParseKeyValuePairList(reader, parameters)) { |
| return 0; |
| } |
| if (!ValidateVersionSpecificParameters(parameters, |
| MoqtMessageType::kSubscribeOk)) { |
| ParseError("SUBSCRIBE_OK contains invalid parameters"); |
| return 0; |
| } |
| if (!KeyValuePairListToVersionSpecificParameters(parameters, |
| subscribe_ok.parameters)) { |
| return 0; |
| } |
| visitor_.OnSubscribeOkMessage(subscribe_ok); |
| return reader.PreviouslyReadPayload().length(); |
| } |
| |
| size_t MoqtControlParser::ProcessSubscribeError(quic::QuicDataReader& reader) { |
| MoqtSubscribeError subscribe_error; |
| uint64_t error_code; |
| if (!reader.ReadVarInt62(&subscribe_error.subscribe_id) || |
| !reader.ReadVarInt62(&error_code) || |
| !reader.ReadStringVarInt62(subscribe_error.reason_phrase) || |
| !reader.ReadVarInt62(&subscribe_error.track_alias)) { |
| return 0; |
| } |
| subscribe_error.error_code = static_cast<SubscribeErrorCode>(error_code); |
| visitor_.OnSubscribeErrorMessage(subscribe_error); |
| return reader.PreviouslyReadPayload().length(); |
| } |
| |
| size_t MoqtControlParser::ProcessUnsubscribe(quic::QuicDataReader& reader) { |
| MoqtUnsubscribe unsubscribe; |
| if (!reader.ReadVarInt62(&unsubscribe.subscribe_id)) { |
| return 0; |
| } |
| visitor_.OnUnsubscribeMessage(unsubscribe); |
| return reader.PreviouslyReadPayload().length(); |
| } |
| |
| size_t MoqtControlParser::ProcessSubscribeDone(quic::QuicDataReader& reader) { |
| MoqtSubscribeDone subscribe_done; |
| uint64_t value; |
| if (!reader.ReadVarInt62(&subscribe_done.subscribe_id) || |
| !reader.ReadVarInt62(&value) || |
| !reader.ReadVarInt62(&subscribe_done.stream_count) || |
| !reader.ReadStringVarInt62(subscribe_done.reason_phrase)) { |
| return 0; |
| } |
| subscribe_done.status_code = static_cast<SubscribeDoneCode>(value); |
| visitor_.OnSubscribeDoneMessage(subscribe_done); |
| return reader.PreviouslyReadPayload().length(); |
| } |
| |
| size_t MoqtControlParser::ProcessSubscribeUpdate(quic::QuicDataReader& reader) { |
| MoqtSubscribeUpdate subscribe_update; |
| uint64_t start_group, start_object, end_group; |
| if (!reader.ReadVarInt62(&subscribe_update.subscribe_id) || |
| !reader.ReadVarInt62(&start_group) || |
| !reader.ReadVarInt62(&start_object) || !reader.ReadVarInt62(&end_group) || |
| !reader.ReadUInt8(&subscribe_update.subscriber_priority)) { |
| return 0; |
| } |
| KeyValuePairList parameters; |
| if (!ParseKeyValuePairList(reader, parameters)) { |
| return 0; |
| } |
| if (!ValidateVersionSpecificParameters(parameters, |
| MoqtMessageType::kSubscribeUpdate)) { |
| ParseError("SUBSCRIBE_UPDATE contains invalid parameters"); |
| return 0; |
| } |
| if (!KeyValuePairListToVersionSpecificParameters( |
| parameters, subscribe_update.parameters)) { |
| return 0; |
| } |
| subscribe_update.start = Location(start_group, start_object); |
| if (end_group > 0) { |
| subscribe_update.end_group = end_group - 1; |
| if (subscribe_update.end_group < start_group) { |
| ParseError("End group is less than start group"); |
| return 0; |
| } |
| } |
| visitor_.OnSubscribeUpdateMessage(subscribe_update); |
| return reader.PreviouslyReadPayload().length(); |
| } |
| |
| size_t MoqtControlParser::ProcessAnnounce(quic::QuicDataReader& reader) { |
| MoqtAnnounce announce; |
| if (!ReadTrackNamespace(reader, announce.track_namespace)) { |
| return 0; |
| } |
| KeyValuePairList parameters; |
| if (!ParseKeyValuePairList(reader, parameters)) { |
| return 0; |
| } |
| if (!ValidateVersionSpecificParameters(parameters, |
| MoqtMessageType::kAnnounce)) { |
| ParseError("ANNOUNCE contains invalid parameters"); |
| return 0; |
| } |
| if (!KeyValuePairListToVersionSpecificParameters(parameters, |
| announce.parameters)) { |
| return 0; |
| } |
| visitor_.OnAnnounceMessage(announce); |
| return reader.PreviouslyReadPayload().length(); |
| } |
| |
| size_t MoqtControlParser::ProcessAnnounceOk(quic::QuicDataReader& reader) { |
| MoqtAnnounceOk announce_ok; |
| if (!ReadTrackNamespace(reader, announce_ok.track_namespace)) { |
| return 0; |
| } |
| visitor_.OnAnnounceOkMessage(announce_ok); |
| return reader.PreviouslyReadPayload().length(); |
| } |
| |
| size_t MoqtControlParser::ProcessAnnounceError(quic::QuicDataReader& reader) { |
| MoqtAnnounceError announce_error; |
| if (!ReadTrackNamespace(reader, announce_error.track_namespace)) { |
| return 0; |
| } |
| uint64_t error_code; |
| if (!reader.ReadVarInt62(&error_code) || |
| !reader.ReadStringVarInt62(announce_error.reason_phrase)) { |
| return 0; |
| } |
| announce_error.error_code = static_cast<SubscribeErrorCode>(error_code); |
| visitor_.OnAnnounceErrorMessage(announce_error); |
| return reader.PreviouslyReadPayload().length(); |
| } |
| |
| size_t MoqtControlParser::ProcessAnnounceCancel(quic::QuicDataReader& reader) { |
| MoqtAnnounceCancel announce_cancel; |
| if (!ReadTrackNamespace(reader, announce_cancel.track_namespace)) { |
| return 0; |
| } |
| uint64_t error_code; |
| if (!reader.ReadVarInt62(&error_code) || |
| !reader.ReadStringVarInt62(announce_cancel.reason_phrase)) { |
| return 0; |
| } |
| announce_cancel.error_code = static_cast<SubscribeErrorCode>(error_code); |
| visitor_.OnAnnounceCancelMessage(announce_cancel); |
| return reader.PreviouslyReadPayload().length(); |
| } |
| |
| size_t MoqtControlParser::ProcessTrackStatusRequest( |
| quic::QuicDataReader& reader) { |
| MoqtTrackStatusRequest track_status_request; |
| if (!ReadTrackNamespace(reader, track_status_request.full_track_name)) { |
| return 0; |
| } |
| absl::string_view name; |
| if (!reader.ReadStringPieceVarInt62(&name)) { |
| return 0; |
| } |
| track_status_request.full_track_name.AddElement(name); |
| KeyValuePairList parameters; |
| if (!ParseKeyValuePairList(reader, parameters)) { |
| return 0; |
| } |
| if (!ValidateVersionSpecificParameters( |
| parameters, MoqtMessageType::kTrackStatusRequest)) { |
| ParseError("TRACK_STATUS_REQUEST message contains invalid parameters"); |
| return 0; |
| } |
| if (!KeyValuePairListToVersionSpecificParameters( |
| parameters, track_status_request.parameters)) { |
| return 0; |
| } |
| visitor_.OnTrackStatusRequestMessage(track_status_request); |
| return reader.PreviouslyReadPayload().length(); |
| } |
| |
| size_t MoqtControlParser::ProcessUnannounce(quic::QuicDataReader& reader) { |
| MoqtUnannounce unannounce; |
| if (!ReadTrackNamespace(reader, unannounce.track_namespace)) { |
| return 0; |
| } |
| visitor_.OnUnannounceMessage(unannounce); |
| return reader.PreviouslyReadPayload().length(); |
| } |
| |
| size_t MoqtControlParser::ProcessTrackStatus(quic::QuicDataReader& reader) { |
| MoqtTrackStatus track_status; |
| if (!ReadTrackNamespace(reader, track_status.full_track_name)) { |
| return 0; |
| } |
| absl::string_view name; |
| if (!reader.ReadStringPieceVarInt62(&name)) { |
| return 0; |
| } |
| track_status.full_track_name.AddElement(name); |
| uint64_t value; |
| if (!reader.ReadVarInt62(&value) || |
| !reader.ReadVarInt62(&track_status.last_group) || |
| !reader.ReadVarInt62(&track_status.last_object)) { |
| return 0; |
| } |
| track_status.status_code = static_cast<MoqtTrackStatusCode>(value); |
| KeyValuePairList parameters; |
| if (!ParseKeyValuePairList(reader, parameters)) { |
| return 0; |
| } |
| if (!ValidateVersionSpecificParameters(parameters, |
| MoqtMessageType::kTrackStatus)) { |
| ParseError("TRACK_STATUS message contains invalid parameters"); |
| return 0; |
| } |
| if (!KeyValuePairListToVersionSpecificParameters(parameters, |
| track_status.parameters)) { |
| return 0; |
| } |
| visitor_.OnTrackStatusMessage(track_status); |
| return reader.PreviouslyReadPayload().length(); |
| } |
| |
| size_t MoqtControlParser::ProcessGoAway(quic::QuicDataReader& reader) { |
| MoqtGoAway goaway; |
| if (!reader.ReadStringVarInt62(goaway.new_session_uri)) { |
| return 0; |
| } |
| visitor_.OnGoAwayMessage(goaway); |
| return reader.PreviouslyReadPayload().length(); |
| } |
| |
| size_t MoqtControlParser::ProcessSubscribeAnnounces( |
| quic::QuicDataReader& reader) { |
| MoqtSubscribeAnnounces subscribe_announces; |
| if (!ReadTrackNamespace(reader, subscribe_announces.track_namespace)) { |
| return 0; |
| } |
| KeyValuePairList parameters; |
| if (!ParseKeyValuePairList(reader, parameters)) { |
| return 0; |
| } |
| if (!ValidateVersionSpecificParameters( |
| parameters, MoqtMessageType::kSubscribeAnnounces)) { |
| ParseError("SUBSCRIBE_ANNOUNCES message contains invalid parameters"); |
| return 0; |
| } |
| if (!KeyValuePairListToVersionSpecificParameters( |
| parameters, subscribe_announces.parameters)) { |
| return 0; |
| } |
| visitor_.OnSubscribeAnnouncesMessage(subscribe_announces); |
| return reader.PreviouslyReadPayload().length(); |
| } |
| |
| size_t MoqtControlParser::ProcessSubscribeAnnouncesOk( |
| quic::QuicDataReader& reader) { |
| MoqtSubscribeAnnouncesOk subscribe_namespace_ok; |
| if (!ReadTrackNamespace(reader, subscribe_namespace_ok.track_namespace)) { |
| return 0; |
| } |
| visitor_.OnSubscribeAnnouncesOkMessage(subscribe_namespace_ok); |
| return reader.PreviouslyReadPayload().length(); |
| } |
| |
| size_t MoqtControlParser::ProcessSubscribeAnnouncesError( |
| quic::QuicDataReader& reader) { |
| MoqtSubscribeAnnouncesError subscribe_namespace_error; |
| uint64_t error_code; |
| if (!ReadTrackNamespace(reader, subscribe_namespace_error.track_namespace) || |
| !reader.ReadVarInt62(&error_code) || |
| !reader.ReadStringVarInt62(subscribe_namespace_error.reason_phrase)) { |
| return 0; |
| } |
| subscribe_namespace_error.error_code = |
| static_cast<SubscribeErrorCode>(error_code); |
| visitor_.OnSubscribeAnnouncesErrorMessage(subscribe_namespace_error); |
| return reader.PreviouslyReadPayload().length(); |
| } |
| |
| size_t MoqtControlParser::ProcessUnsubscribeAnnounces( |
| quic::QuicDataReader& reader) { |
| MoqtUnsubscribeAnnounces unsubscribe_namespace; |
| if (!ReadTrackNamespace(reader, unsubscribe_namespace.track_namespace)) { |
| return 0; |
| } |
| visitor_.OnUnsubscribeAnnouncesMessage(unsubscribe_namespace); |
| return reader.PreviouslyReadPayload().length(); |
| } |
| |
| size_t MoqtControlParser::ProcessMaxRequestId(quic::QuicDataReader& reader) { |
| MoqtMaxRequestId max_request_id; |
| if (!reader.ReadVarInt62(&max_request_id.max_request_id)) { |
| return 0; |
| } |
| visitor_.OnMaxRequestIdMessage(max_request_id); |
| return reader.PreviouslyReadPayload().length(); |
| } |
| |
| size_t MoqtControlParser::ProcessFetch(quic::QuicDataReader& reader) { |
| MoqtFetch fetch; |
| absl::string_view track_name; |
| uint8_t group_order; |
| uint64_t end_object; |
| uint64_t type; |
| if (!reader.ReadVarInt62(&fetch.fetch_id) || |
| !reader.ReadUInt8(&fetch.subscriber_priority) || |
| !reader.ReadUInt8(&group_order) || !reader.ReadVarInt62(&type)) { |
| return 0; |
| } |
| if (!ParseDeliveryOrder(group_order, fetch.group_order)) { |
| ParseError("Invalid group order value in FETCH message"); |
| return 0; |
| } |
| switch (static_cast<FetchType>(type)) { |
| case FetchType::kJoining: { |
| uint64_t joining_subscribe_id; |
| uint64_t preceding_group_offset; |
| if (!reader.ReadVarInt62(&joining_subscribe_id) || |
| !reader.ReadVarInt62(&preceding_group_offset)) { |
| return 0; |
| } |
| fetch.joining_fetch = |
| JoiningFetch{joining_subscribe_id, preceding_group_offset}; |
| break; |
| } |
| case FetchType::kStandalone: { |
| fetch.joining_fetch = std::nullopt; |
| if (!ReadTrackNamespace(reader, fetch.full_track_name) || |
| !reader.ReadStringPieceVarInt62(&track_name) || |
| !reader.ReadVarInt62(&fetch.start_object.group) || |
| !reader.ReadVarInt62(&fetch.start_object.object) || |
| !reader.ReadVarInt62(&fetch.end_group) || |
| !reader.ReadVarInt62(&end_object)) { |
| return 0; |
| } |
| // Elements that have to be translated from the literal value. |
| fetch.full_track_name.AddElement(track_name); |
| fetch.end_object = |
| end_object == 0 ? std::optional<uint64_t>() : (end_object - 1); |
| if (fetch.end_group < fetch.start_object.group || |
| (fetch.end_group == fetch.start_object.group && |
| fetch.end_object.has_value() && |
| *fetch.end_object < fetch.start_object.object)) { |
| ParseError("End object comes before start object in FETCH"); |
| return 0; |
| } |
| break; |
| } |
| default: |
| ParseError("Invalid FETCH type"); |
| return 0; |
| } |
| KeyValuePairList parameters; |
| if (!ParseKeyValuePairList(reader, parameters)) { |
| return 0; |
| } |
| if (!ValidateVersionSpecificParameters(parameters, MoqtMessageType::kFetch)) { |
| ParseError("FETCH message contains invalid parameters"); |
| return 0; |
| } |
| if (!KeyValuePairListToVersionSpecificParameters(parameters, |
| fetch.parameters)) { |
| return 0; |
| }; |
| visitor_.OnFetchMessage(fetch); |
| return reader.PreviouslyReadPayload().length(); |
| } |
| |
| size_t MoqtControlParser::ProcessFetchCancel(quic::QuicDataReader& reader) { |
| MoqtFetchCancel fetch_cancel; |
| if (!reader.ReadVarInt62(&fetch_cancel.subscribe_id)) { |
| return 0; |
| } |
| visitor_.OnFetchCancelMessage(fetch_cancel); |
| return reader.PreviouslyReadPayload().length(); |
| } |
| |
| size_t MoqtControlParser::ProcessFetchOk(quic::QuicDataReader& reader) { |
| MoqtFetchOk fetch_ok; |
| uint8_t group_order; |
| KeyValuePairList parameters; |
| if (!reader.ReadVarInt62(&fetch_ok.subscribe_id) || |
| !reader.ReadUInt8(&group_order) || |
| !reader.ReadVarInt62(&fetch_ok.largest_id.group) || |
| !reader.ReadVarInt62(&fetch_ok.largest_id.object) || |
| !ParseKeyValuePairList(reader, parameters)) { |
| return 0; |
| } |
| if (group_order != 0x01 && group_order != 0x02) { |
| ParseError("Invalid group order value in FETCH_OK"); |
| return 0; |
| } |
| if (!ValidateVersionSpecificParameters(parameters, |
| MoqtMessageType::kFetchOk)) { |
| ParseError("FETCH_OK message contains invalid parameters"); |
| return 0; |
| } |
| fetch_ok.group_order = static_cast<MoqtDeliveryOrder>(group_order); |
| if (!KeyValuePairListToVersionSpecificParameters(parameters, |
| fetch_ok.parameters)) { |
| return 0; |
| } |
| visitor_.OnFetchOkMessage(fetch_ok); |
| return reader.PreviouslyReadPayload().length(); |
| } |
| |
| size_t MoqtControlParser::ProcessFetchError(quic::QuicDataReader& reader) { |
| MoqtFetchError fetch_error; |
| uint64_t error_code; |
| if (!reader.ReadVarInt62(&fetch_error.subscribe_id) || |
| !reader.ReadVarInt62(&error_code) || |
| !reader.ReadStringVarInt62(fetch_error.reason_phrase)) { |
| return 0; |
| } |
| fetch_error.error_code = static_cast<SubscribeErrorCode>(error_code); |
| visitor_.OnFetchErrorMessage(fetch_error); |
| return reader.PreviouslyReadPayload().length(); |
| } |
| |
| size_t MoqtControlParser::ProcessRequestsBlocked(quic::QuicDataReader& reader) { |
| MoqtRequestsBlocked requests_blocked; |
| if (!reader.ReadVarInt62(&requests_blocked.max_request_id)) { |
| return 0; |
| } |
| visitor_.OnRequestsBlockedMessage(requests_blocked); |
| return reader.PreviouslyReadPayload().length(); |
| } |
| |
| size_t MoqtControlParser::ProcessObjectAck(quic::QuicDataReader& reader) { |
| MoqtObjectAck object_ack; |
| uint64_t raw_delta; |
| if (!reader.ReadVarInt62(&object_ack.subscribe_id) || |
| !reader.ReadVarInt62(&object_ack.group_id) || |
| !reader.ReadVarInt62(&object_ack.object_id) || |
| !reader.ReadVarInt62(&raw_delta)) { |
| return 0; |
| } |
| object_ack.delta_from_deadline = quic::QuicTimeDelta::FromMicroseconds( |
| SignedVarintUnserializedForm(raw_delta)); |
| visitor_.OnObjectAckMessage(object_ack); |
| return reader.PreviouslyReadPayload().length(); |
| } |
| |
| void MoqtControlParser::ParseError(absl::string_view reason) { |
| ParseError(MoqtError::kProtocolViolation, reason); |
| } |
| |
| void MoqtControlParser::ParseError(MoqtError error_code, |
| absl::string_view reason) { |
| if (parsing_error_) { |
| return; // Don't send multiple parse errors. |
| } |
| no_more_data_ = true; |
| parsing_error_ = true; |
| visitor_.OnParsingError(error_code, reason); |
| } |
| |
| bool MoqtControlParser::ReadTrackNamespace(quic::QuicDataReader& reader, |
| FullTrackName& full_track_name) { |
| QUICHE_DCHECK(full_track_name.empty()); |
| uint64_t num_elements; |
| if (!reader.ReadVarInt62(&num_elements)) { |
| return false; |
| } |
| if (num_elements == 0 || num_elements > kMaxNamespaceElements) { |
| ParseError(MoqtError::kProtocolViolation, |
| "Invalid number of namespace elements"); |
| return false; |
| } |
| for (uint64_t i = 0; i < num_elements; ++i) { |
| absl::string_view element; |
| if (!reader.ReadStringPieceVarInt62(&element)) { |
| return false; |
| } |
| full_track_name.AddElement(element); |
| } |
| return true; |
| } |
| |
| // Returns false if there is a protocol violation. |
| bool MoqtControlParser::KeyValuePairListToVersionSpecificParameters( |
| const KeyValuePairList& parameters, VersionSpecificParameters& out) { |
| return parameters.ForEach( |
| [&](uint64_t key, uint64_t value) { |
| VersionSpecificParameter parameter = |
| static_cast<VersionSpecificParameter>(key); |
| switch (parameter) { |
| case VersionSpecificParameter::kDeliveryTimeout: |
| out.delivery_timeout = quic::QuicTimeDelta::FromMilliseconds(value); |
| break; |
| case VersionSpecificParameter::kMaxCacheDuration: |
| out.max_cache_duration = |
| quic::QuicTimeDelta::FromMilliseconds(value); |
| break; |
| case VersionSpecificParameter::kOackWindowSize: |
| out.oack_window_size = quic::QuicTimeDelta::FromMicroseconds(value); |
| break; |
| default: |
| break; |
| } |
| return true; |
| }, |
| [&](uint64_t key, absl::string_view value) { |
| VersionSpecificParameter parameter = |
| static_cast<VersionSpecificParameter>(key); |
| switch (parameter) { |
| case VersionSpecificParameter::kAuthorizationToken: |
| if (!ParseAuthTokenParameter(value, out)) { |
| return false; |
| } |
| break; |
| default: |
| break; |
| } |
| return true; |
| }); |
| } |
| |
| bool MoqtControlParser::ParseAuthTokenParameter( |
| absl::string_view field, VersionSpecificParameters& out) { |
| quic::QuicDataReader reader(field); |
| AuthTokenType token_type; |
| absl::string_view token; |
| uint64_t value; |
| if (!reader.ReadVarInt62(&value) || value > AuthTokenAliasType::kMaxValue) { |
| ParseError(MoqtError::kKeyValueFormattingError, |
| "Invalid Authorization Token Alias type"); |
| return false; |
| } |
| AuthTokenAliasType alias_type = static_cast<AuthTokenAliasType>(value); |
| switch (alias_type) { |
| case AuthTokenAliasType::kUseValue: |
| if (!reader.ReadVarInt62(&value)) { |
| ParseError(MoqtError::kKeyValueFormattingError, |
| "Malformed Authorization Token Parameter"); |
| return false; |
| } |
| if (value > AuthTokenType::kMaxAuthTokenType) { |
| ParseError(MoqtError::kKeyValueFormattingError, |
| "Invalid Authorization Token Type"); |
| return false; |
| } |
| token_type = static_cast<AuthTokenType>(value); |
| token = reader.PeekRemainingPayload(); |
| break; |
| case AuthTokenAliasType::kUseAlias: |
| if (!reader.ReadVarInt62(&value)) { |
| ParseError(MoqtError::kKeyValueFormattingError, |
| "Malformed Authorization Token Parameter"); |
| return false; |
| } |
| // TODO: Implement support for cache_size > 0 |
| ParseError(MoqtError::kKeyValueFormattingError, |
| "Unknown Auth Token Alias"); |
| return false; |
| case AuthTokenAliasType::kRegister: |
| if (!reader.ReadVarInt62(&value)) { |
| ParseError(MoqtError::kKeyValueFormattingError, |
| "Malformed Authorization Token Parameter"); |
| return false; |
| } |
| if (!reader.ReadVarInt62(&value)) { |
| ParseError(MoqtError::kKeyValueFormattingError, |
| "Malformed Authorization Token Parameter"); |
| return false; |
| } |
| token_type = static_cast<AuthTokenType>(value); |
| token = reader.PeekRemainingPayload(); |
| if (auth_token_cache_size_ + sizeof(uint64_t) + token.length() > |
| max_auth_token_cache_size_) { |
| ParseError(MoqtError::kAuthTokenCacheOverflow, |
| "Too many authorization token tags"); |
| return false; |
| } |
| break; |
| // TODO: Add to the cache. |
| // TODO: Check if the alias is already in use. |
| QUICHE_NOTREACHED(); |
| break; |
| case AuthTokenAliasType::kDelete: |
| if (!reader.ReadVarInt62(&value)) { |
| ParseError(MoqtError::kKeyValueFormattingError, |
| "Malformed Authorization Token Parameter"); |
| return false; |
| } |
| // TODO: Implement support for cache_size > 0 |
| ParseError(MoqtError::kKeyValueFormattingError, |
| "Unknown Auth Token Alias"); |
| return false; |
| } |
| // Validate cache operations. |
| out.authorization_token.push_back(AuthToken(token_type, token)); |
| return true; |
| } |
| |
| void MoqtDataParser::ParseError(absl::string_view reason) { |
| if (parsing_error_) { |
| return; // Don't send multiple parse errors. |
| } |
| next_input_ = kFailed; |
| no_more_data_ = true; |
| parsing_error_ = true; |
| visitor_.OnParsingError(MoqtError::kProtocolViolation, reason); |
| } |
| |
| std::optional<absl::string_view> ParseDatagram(absl::string_view data, |
| MoqtObject& object_metadata) { |
| uint64_t type_raw, object_status_raw; |
| absl::string_view extensions; |
| quic::QuicDataReader reader(data); |
| if (!reader.ReadVarInt62(&type_raw) || |
| !reader.ReadVarInt62(&object_metadata.track_alias) || |
| !reader.ReadVarInt62(&object_metadata.group_id) || |
| !reader.ReadVarInt62(&object_metadata.object_id) || |
| !reader.ReadUInt8(&object_metadata.publisher_priority) || |
| !reader.ReadStringPieceVarInt62(&extensions)) { |
| return std::nullopt; |
| } |
| object_metadata.extension_headers = std::string(extensions); |
| if (static_cast<MoqtDatagramType>(type_raw) == |
| MoqtDatagramType::kObjectStatus) { |
| object_metadata.payload_length = 0; |
| if (!reader.ReadVarInt62(&object_status_raw)) { |
| return std::nullopt; |
| } |
| object_metadata.object_status = IntegerToObjectStatus(object_status_raw); |
| return ""; |
| } |
| |
| absl::string_view payload; |
| if (!reader.ReadStringPieceVarInt62(&payload)) { |
| return std::nullopt; |
| } |
| object_metadata.object_status = MoqtObjectStatus::kNormal; |
| object_metadata.payload_length = payload.length(); |
| return payload; |
| } |
| |
| void MoqtDataParser::ReadDataUntil(StopCondition stop_condition) { |
| if (processing_) { |
| QUICHE_BUG(MoqtDataParser_reentry) |
| << "Calling ProcessData() when ProcessData() is already in progress."; |
| return; |
| } |
| processing_ = true; |
| auto on_return = absl::MakeCleanup([&] { processing_ = false; }); |
| |
| State last_state = state(); |
| for (;;) { |
| ParseNextItemFromStream(); |
| if (state() == last_state || no_more_data_ || stop_condition()) { |
| break; |
| } |
| last_state = state(); |
| } |
| } |
| |
| std::optional<uint64_t> MoqtDataParser::ReadVarInt62NoFin() { |
| bool fin_read = false; |
| std::optional<uint64_t> result = ReadVarInt62FromStream(stream_, fin_read); |
| if (fin_read) { |
| ParseError("Unexpected FIN received in the middle of a header"); |
| return std::nullopt; |
| } |
| return result; |
| } |
| |
| std::optional<uint8_t> MoqtDataParser::ReadUint8NoFin() { |
| char buffer[1]; |
| quiche::ReadStream::ReadResult read_result = |
| stream_.Read(absl::MakeSpan(buffer)); |
| if (read_result.fin) { |
| ParseError("Unexpected FIN received in the middle of a header"); |
| return std::nullopt; |
| } |
| if (read_result.bytes_read == 0) { |
| return std::nullopt; |
| } |
| return absl::bit_cast<uint8_t>(buffer[0]); |
| } |
| |
| void MoqtDataParser::AdvanceParserState() { |
| QUICHE_DCHECK(type_ == MoqtDataStreamType::kStreamHeaderSubgroup || |
| type_ == MoqtDataStreamType::kStreamHeaderFetch); |
| const bool is_fetch = type_ == MoqtDataStreamType::kStreamHeaderFetch; |
| switch (next_input_) { |
| // The state table is factored into a separate function (rather than |
| // inlined) in order to separate the order of elements from the way they are |
| // parsed. |
| case kStreamType: |
| next_input_ = kTrackAlias; |
| break; |
| case kTrackAlias: |
| next_input_ = kGroupId; |
| break; |
| case kGroupId: |
| next_input_ = kSubgroupId; |
| break; |
| case kSubgroupId: |
| next_input_ = is_fetch ? kObjectId : kPublisherPriority; |
| break; |
| case kPublisherPriority: |
| next_input_ = is_fetch ? kExtensionSize : kObjectId; |
| break; |
| case kObjectId: |
| next_input_ = is_fetch ? kPublisherPriority : kExtensionSize; |
| break; |
| case kExtensionBody: |
| next_input_ = kObjectPayloadLength; |
| break; |
| case kStatus: |
| case kData: |
| next_input_ = is_fetch ? kGroupId : kObjectId; |
| break; |
| |
| case kExtensionSize: // Either kExtensionBody or |
| // kObjectPayloadLength. |
| case kObjectPayloadLength: // Either kStatus or kData depending on length. |
| case kPadding: // Handled separately. |
| case kFailed: // Should cause parsing to cease. |
| QUICHE_NOTREACHED(); |
| break; |
| } |
| } |
| |
| void MoqtDataParser::ParseNextItemFromStream() { |
| if (CheckForFinWithoutData()) { |
| return; |
| } |
| switch (next_input_) { |
| case kStreamType: { |
| std::optional<uint64_t> value_read = ReadVarInt62NoFin(); |
| if (value_read.has_value()) { |
| if (!IsAllowedStreamType(*value_read)) { |
| ParseError("Invalid stream type supplied"); |
| return; |
| } |
| type_ = static_cast<MoqtDataStreamType>(*value_read); |
| switch (*type_) { |
| case MoqtDataStreamType::kStreamHeaderSubgroup: |
| case MoqtDataStreamType::kStreamHeaderFetch: |
| AdvanceParserState(); |
| break; |
| case MoqtDataStreamType::kPadding: |
| next_input_ = kPadding; |
| break; |
| } |
| } |
| return; |
| } |
| |
| case kTrackAlias: { |
| std::optional<uint64_t> value_read = ReadVarInt62NoFin(); |
| if (value_read.has_value()) { |
| metadata_.track_alias = *value_read; |
| AdvanceParserState(); |
| } |
| return; |
| } |
| |
| case kGroupId: { |
| std::optional<uint64_t> value_read = ReadVarInt62NoFin(); |
| if (value_read.has_value()) { |
| metadata_.group_id = *value_read; |
| AdvanceParserState(); |
| } |
| return; |
| } |
| |
| case kSubgroupId: { |
| std::optional<uint64_t> value_read = ReadVarInt62NoFin(); |
| if (value_read.has_value()) { |
| metadata_.subgroup_id = *value_read; |
| AdvanceParserState(); |
| } |
| return; |
| } |
| |
| case kPublisherPriority: { |
| std::optional<uint8_t> value_read = ReadUint8NoFin(); |
| if (value_read.has_value()) { |
| metadata_.publisher_priority = *value_read; |
| AdvanceParserState(); |
| } |
| return; |
| } |
| |
| case kObjectId: { |
| std::optional<uint64_t> value_read = ReadVarInt62NoFin(); |
| if (value_read.has_value()) { |
| metadata_.object_id = *value_read; |
| AdvanceParserState(); |
| } |
| return; |
| } |
| |
| case kExtensionSize: { |
| std::optional<uint64_t> value_read = ReadVarInt62NoFin(); |
| if (value_read.has_value()) { |
| metadata_.extension_headers.clear(); |
| payload_length_remaining_ = *value_read; |
| next_input_ = (value_read == 0) ? kObjectPayloadLength : kExtensionBody; |
| } |
| return; |
| } |
| |
| case kObjectPayloadLength: { |
| std::optional<uint64_t> value_read = ReadVarInt62NoFin(); |
| if (value_read.has_value()) { |
| metadata_.payload_length = *value_read; |
| payload_length_remaining_ = *value_read; |
| if (metadata_.payload_length > 0) { |
| metadata_.object_status = MoqtObjectStatus::kNormal; |
| next_input_ = kData; |
| } else { |
| next_input_ = kStatus; |
| } |
| } |
| return; |
| } |
| |
| case kStatus: { |
| bool fin_read = false; |
| std::optional<uint64_t> value_read = |
| ReadVarInt62FromStream(stream_, fin_read); |
| if (value_read.has_value()) { |
| metadata_.object_status = IntegerToObjectStatus(*value_read); |
| if (metadata_.object_status == MoqtObjectStatus::kInvalidObjectStatus) { |
| ParseError("Invalid object status provided"); |
| return; |
| } |
| |
| ++num_objects_read_; |
| visitor_.OnObjectMessage(metadata_, "", /*end_of_message=*/true); |
| AdvanceParserState(); |
| } |
| if (fin_read) { |
| no_more_data_ = true; |
| return; |
| } |
| return; |
| } |
| |
| case kExtensionBody: |
| case kData: { |
| while (payload_length_remaining_ > 0) { |
| quiche::ReadStream::PeekResult peek_result = |
| stream_.PeekNextReadableRegion(); |
| if (!peek_result.has_data()) { |
| return; |
| } |
| if (peek_result.fin_next && payload_length_remaining_ > 0) { |
| ParseError("FIN received at an unexpected point in the stream"); |
| return; |
| } |
| |
| size_t chunk_size = |
| std::min(payload_length_remaining_, peek_result.peeked_data.size()); |
| payload_length_remaining_ -= chunk_size; |
| bool done = payload_length_remaining_ == 0; |
| if (next_input_ == kData) { |
| visitor_.OnObjectMessage( |
| metadata_, peek_result.peeked_data.substr(0, chunk_size), done); |
| const bool fin = stream_.SkipBytes(chunk_size); |
| if (done) { |
| ++num_objects_read_; |
| no_more_data_ |= fin; |
| AdvanceParserState(); |
| } |
| } else { |
| absl::StrAppend(&metadata_.extension_headers, |
| peek_result.peeked_data.substr(0, chunk_size)); |
| if (stream_.SkipBytes(chunk_size)) { |
| ParseError("FIN received at an unexpected point in the stream"); |
| return; |
| } |
| if (done) { |
| AdvanceParserState(); |
| } |
| } |
| } |
| return; |
| } |
| |
| case kPadding: |
| no_more_data_ |= stream_.SkipBytes(stream_.ReadableBytes()); |
| return; |
| |
| case kFailed: |
| return; |
| } |
| } |
| |
| void MoqtDataParser::ReadAllData() { |
| ReadDataUntil(+[]() { return false; }); |
| } |
| |
| void MoqtDataParser::ReadStreamType() { |
| return ReadDataUntil([this]() { return type_.has_value(); }); |
| } |
| |
| void MoqtDataParser::ReadTrackAlias() { |
| return ReadDataUntil( |
| [this]() { return type_.has_value() && next_input_ != kTrackAlias; }); |
| } |
| |
| void MoqtDataParser::ReadAtMostOneObject() { |
| const size_t num_objects_read_initial = num_objects_read_; |
| return ReadDataUntil( |
| [&]() { return num_objects_read_ != num_objects_read_initial; }); |
| } |
| |
| bool MoqtDataParser::CheckForFinWithoutData() { |
| if (!stream_.PeekNextReadableRegion().fin_next) { |
| return false; |
| } |
| const bool valid_state = |
| (type_ == MoqtDataStreamType::kStreamHeaderSubgroup && |
| next_input_ == kObjectId) || |
| (type_ == MoqtDataStreamType::kStreamHeaderFetch && |
| next_input_ == kGroupId); |
| if (!valid_state || num_objects_read_ == 0) { |
| ParseError("FIN received at an unexpected point in the stream"); |
| return true; |
| } |
| return stream_.SkipBytes(0); |
| } |
| |
| } // namespace moqt |