| // Copyright (c) 2023 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| // A parser for draft-ietf-moq-transport. |
| // TODO(vasilvv): possibly split this header into two. |
| |
| #ifndef QUICHE_QUIC_MOQT_MOQT_PARSER_H_ |
| #define QUICHE_QUIC_MOQT_MOQT_PARSER_H_ |
| |
| #include <cstddef> |
| #include <cstdint> |
| #include <optional> |
| #include <string> |
| |
| #include "absl/base/nullability.h" |
| #include "absl/cleanup/cleanup.h" |
| #include "absl/status/status.h" |
| #include "absl/status/statusor.h" |
| #include "absl/strings/str_cat.h" |
| #include "absl/strings/string_view.h" |
| #include "absl/types/span.h" |
| #include "quiche/quic/core/quic_data_reader.h" |
| #include "quiche/quic/moqt/moqt_error.h" |
| #include "quiche/quic/moqt/moqt_key_value_pair.h" |
| #include "quiche/quic/moqt/moqt_messages.h" |
| #include "quiche/quic/moqt/moqt_names.h" |
| #include "quiche/quic/moqt/moqt_priority.h" |
| #include "quiche/common/platform/api/quiche_export.h" |
| #include "quiche/common/quiche_callbacks.h" |
| #include "quiche/common/quiche_status_utils.h" |
| #include "quiche/web_transport/web_transport.h" |
| |
| namespace moqt { |
| |
| namespace test { |
| class MoqtDataParserPeer; |
| } |
| |
| // MoqtRawControlMessage represents an MOQT control message that has been |
| // unframed from the control stream, but not parsed yet. |
| struct MoqtRawControlMessage { |
| MoqtMessageType type; |
| std::string payload; |
| }; |
| |
| class MoqtDataParserVisitor { |
| public: |
| virtual ~MoqtDataParserVisitor() = default; |
| |
| // If |end_of_message| is true, |payload| contains the last bytes of the |
| // OBJECT payload. If not, there will be subsequent calls with further payload |
| // data. The parser retains ownership of |message| and |payload|, so the |
| // visitor needs to copy anything it wants to retain. |
| // If `message.object_status` == `kNormal`, the status must not be used until |
| // `end_of_message` is true, since a FIN can change the status. |
| virtual void OnObjectMessage(const MoqtObject& message, |
| absl::string_view payload, |
| bool end_of_message) = 0; |
| virtual void OnFin() = 0; |
| |
| virtual void OnParsingError(MoqtError code, absl::string_view reason) = 0; |
| }; |
| |
| // MoqtControlStreamParser unframes MoQT control messages from the control |
| // stream without parsing the payload. |
| class QUICHE_EXPORT MoqtControlStreamParser { |
| public: |
| explicit MoqtControlStreamParser(webtransport::Stream* absl_nonnull stream) |
| : stream_(*stream) {} |
| |
| // MoqtControlStreamParser is not movable, since reading from the same stream |
| // through two different parsers would corrupt the state. |
| MoqtControlStreamParser(const MoqtControlStreamParser&) = delete; |
| MoqtControlStreamParser(MoqtControlStreamParser&& other) = delete; |
| MoqtControlStreamParser& operator=(const MoqtControlStreamParser&) = delete; |
| MoqtControlStreamParser& operator=(MoqtControlStreamParser&&) = delete; |
| |
| // Reads the next available message on the stream. Returns kUnavailable |
| // status if no complete message can be read; if FIN is read, `fin_read` will |
| // be set to true. |
| absl::StatusOr<MoqtRawControlMessage> ReadNextMessage(); |
| // Reads the type of the first message on the stream. |
| absl::StatusOr<MoqtMessageType> ReadFirstMessageType(); |
| |
| bool fin_read() const { return fin_read_; } |
| webtransport::Stream* stream() const { return &stream_; } |
| |
| // Initially, MoqtControlStreamParser does not allow a control stream to have |
| // a FIN. Once the type of the stream is established, that restriction can be |
| // lifted. |
| bool allow_fin() const { return allow_fin_; } |
| void set_allow_fin(bool allow_fin) { allow_fin_ = allow_fin; } |
| |
| private: |
| absl::StatusOr<MoqtRawControlMessage> ReadNextMessageInner(); |
| // Reads the message type from the stream. |
| absl::Status ReadMessageType(); |
| |
| webtransport::Stream& stream_; |
| std::optional<uint64_t> first_message_type_; |
| std::optional<uint64_t> current_message_type_; |
| std::optional<absl::Span<char>> current_message_remaining_; |
| std::string current_message_; |
| bool allow_fin_ = false; |
| bool error_encountered_ = false; |
| bool fin_read_ = false; |
| }; |
| |
| // MoqtControlMessageParser parses MOQT control messages. The parsing is |
| // stateless; the object itself only carries the context (protocol version and |
| // parameters) required to parse messages. |
| class MoqtControlMessageParser { |
| public: |
| // `moqt_version` is not currently used, as we only support one version. |
| MoqtControlMessageParser(absl::string_view /*moqt_version*/, |
| bool uses_web_transport) |
| : uses_web_transport_(uses_web_transport) {} |
| |
| // Parsers for individual messages. |
| absl::StatusOr<MoqtClientSetup> ProcessClientSetup( |
| absl::string_view data) const; |
| absl::StatusOr<MoqtServerSetup> ProcessServerSetup( |
| absl::string_view data) const; |
| absl::StatusOr<MoqtRequestOk> ProcessRequestOk(absl::string_view data) const; |
| absl::StatusOr<MoqtRequestError> ProcessRequestError( |
| absl::string_view data) const; |
| absl::StatusOr<MoqtSubscribe> ProcessSubscribe(absl::string_view data) const; |
| absl::StatusOr<MoqtSubscribeOk> ProcessSubscribeOk( |
| absl::string_view data) const; |
| absl::StatusOr<MoqtUnsubscribe> ProcessUnsubscribe( |
| absl::string_view data) const; |
| absl::StatusOr<MoqtPublishDone> ProcessPublishDone( |
| absl::string_view data) const; |
| absl::StatusOr<MoqtRequestUpdate> ProcessRequestUpdate( |
| absl::string_view data) const; |
| absl::StatusOr<MoqtPublishNamespace> ProcessPublishNamespace( |
| absl::string_view data) const; |
| absl::StatusOr<MoqtPublishNamespaceDone> ProcessPublishNamespaceDone( |
| absl::string_view data) const; |
| absl::StatusOr<MoqtNamespace> ProcessNamespace(absl::string_view data) const; |
| absl::StatusOr<MoqtNamespaceDone> ProcessNamespaceDone( |
| absl::string_view data) const; |
| absl::StatusOr<MoqtPublishNamespaceCancel> ProcessPublishNamespaceCancel( |
| absl::string_view data) const; |
| absl::StatusOr<MoqtTrackStatus> ProcessTrackStatus( |
| absl::string_view data) const; |
| absl::StatusOr<MoqtGoAway> ProcessGoAway(absl::string_view data) const; |
| absl::StatusOr<MoqtSubscribeNamespace> ProcessSubscribeNamespace( |
| absl::string_view data) const; |
| absl::StatusOr<MoqtMaxRequestId> ProcessMaxRequestId( |
| absl::string_view data) const; |
| absl::StatusOr<MoqtFetch> ProcessFetch(absl::string_view data) const; |
| absl::StatusOr<MoqtFetchCancel> ProcessFetchCancel( |
| absl::string_view data) const; |
| absl::StatusOr<MoqtFetchOk> ProcessFetchOk(absl::string_view data) const; |
| absl::StatusOr<MoqtRequestsBlocked> ProcessRequestsBlocked( |
| absl::string_view data) const; |
| absl::StatusOr<MoqtPublish> ProcessPublish(absl::string_view data) const; |
| absl::StatusOr<MoqtObjectAck> ProcessObjectAck(absl::string_view data) const; |
| |
| // Parse a raw message and call a callback on it if successful. |
| // Example usage: |
| // |
| // parser_.ParseMessage(message, [] (const auto& message) { |
| // QUICHE_LOG(INFO) << "Received message: " << message; |
| // return absl::OkStatus(); |
| // }); |
| template <typename F> |
| absl::Status ParseMessage(const MoqtRawControlMessage& message, |
| const F& callback) const { |
| const auto parse = [&](auto parse_method) -> absl::Status { |
| auto parsed_message = (this->*parse_method)(message.payload); |
| QUICHE_RETURN_IF_ERROR(parsed_message.status()); |
| return callback(*std::move(parsed_message)); |
| }; |
| switch (message.type) { |
| case MoqtMessageType::kClientSetup: |
| return parse(&MoqtControlMessageParser::ProcessClientSetup); |
| case MoqtMessageType::kServerSetup: |
| return parse(&MoqtControlMessageParser::ProcessServerSetup); |
| case MoqtMessageType::kRequestOk: |
| return parse(&MoqtControlMessageParser::ProcessRequestOk); |
| case MoqtMessageType::kRequestError: |
| return parse(&MoqtControlMessageParser::ProcessRequestError); |
| case MoqtMessageType::kSubscribe: |
| return parse(&MoqtControlMessageParser::ProcessSubscribe); |
| case MoqtMessageType::kSubscribeOk: |
| return parse(&MoqtControlMessageParser::ProcessSubscribeOk); |
| case MoqtMessageType::kUnsubscribe: |
| return parse(&MoqtControlMessageParser::ProcessUnsubscribe); |
| case MoqtMessageType::kPublishDone: |
| return parse(&MoqtControlMessageParser::ProcessPublishDone); |
| case MoqtMessageType::kRequestUpdate: |
| return parse(&MoqtControlMessageParser::ProcessRequestUpdate); |
| case MoqtMessageType::kPublishNamespace: |
| return parse(&MoqtControlMessageParser::ProcessPublishNamespace); |
| case MoqtMessageType::kPublishNamespaceDone: |
| return parse(&MoqtControlMessageParser::ProcessPublishNamespaceDone); |
| case MoqtMessageType::kNamespace: |
| return parse(&MoqtControlMessageParser::ProcessNamespace); |
| case MoqtMessageType::kNamespaceDone: |
| return parse(&MoqtControlMessageParser::ProcessNamespaceDone); |
| case MoqtMessageType::kPublishNamespaceCancel: |
| return parse(&MoqtControlMessageParser::ProcessPublishNamespaceCancel); |
| case MoqtMessageType::kTrackStatus: |
| return parse(&MoqtControlMessageParser::ProcessTrackStatus); |
| case MoqtMessageType::kGoAway: |
| return parse(&MoqtControlMessageParser::ProcessGoAway); |
| case MoqtMessageType::kSubscribeNamespace: |
| return parse(&MoqtControlMessageParser::ProcessSubscribeNamespace); |
| case MoqtMessageType::kMaxRequestId: |
| return parse(&MoqtControlMessageParser::ProcessMaxRequestId); |
| case MoqtMessageType::kFetch: |
| return parse(&MoqtControlMessageParser::ProcessFetch); |
| case MoqtMessageType::kFetchCancel: |
| return parse(&MoqtControlMessageParser::ProcessFetchCancel); |
| case MoqtMessageType::kFetchOk: |
| return parse(&MoqtControlMessageParser::ProcessFetchOk); |
| case MoqtMessageType::kRequestsBlocked: |
| return parse(&MoqtControlMessageParser::ProcessRequestsBlocked); |
| case MoqtMessageType::kPublish: |
| return parse(&MoqtControlMessageParser::ProcessPublish); |
| case MoqtMessageType::kObjectAck: |
| return parse(&MoqtControlMessageParser::ProcessObjectAck); |
| default: |
| return absl::InvalidArgumentError( |
| absl::StrCat("Unknown control message type 0x", |
| absl::Hex(static_cast<uint64_t>(message.type)))); |
| } |
| } |
| |
| private: |
| // Reads a TrackNamespace from the reader. Returns false if the namespace is |
| // too large. Sets a ParseError if the namespace is malformed. |
| absl::Status ReadTrackNamespace(quic::QuicDataReader& reader, |
| TrackNamespace& track_namespace) const; |
| // Reads a FullTrackName from the reader. Returns false if the name is too |
| // large. Sets a ParseError if the name is malformed. |
| absl::Status ReadFullTrackName(quic::QuicDataReader& reader, |
| FullTrackName& full_track_name) const; |
| absl::Status FillAndValidateSetupParameters( |
| const KeyValuePairList& in, SetupParameters& out, |
| MoqtMessageType message_type) const; |
| // |reader| points to the beginning of a KeyValuePairList. Returns false if |
| // there is any sort of error. (The function calls ParseError(), so the |
| // caller has no need to do so.) |
| absl::Status FillAndValidateMessageParameters(quic::QuicDataReader& reader, |
| MessageParameters& out) const; |
| |
| bool uses_web_transport_; |
| }; |
| |
| // Parses an MoQT datagram. Returns the payload bytes, or std::nullopt on error. |
| // The caller provides the whole datagram in `data`. The function puts the |
| // object metadata in `object_metadata`. |
| // If |use_default_priority| returns true, there was no reported |
| // publisher_priority and the caller should use the default for the SUBSCRIBE. |
| std::optional<absl::string_view> ParseDatagram(absl::string_view data, |
| MoqtObject& object_metadata, |
| bool& use_default_priority); |
| |
| // Parser for MoQT unidirectional data stream. |
| class QUICHE_EXPORT MoqtDataParser { |
| public: |
| // `stream` must outlive the parser. The parser does not configure itself as |
| // a listener for the read events of the stream; it is responsibility of the |
| // caller to do so via one of the read methods below. |
| explicit MoqtDataParser(webtransport::Stream* stream, |
| MoqtDataParserVisitor* visitor) |
| : stream_(*stream), visitor_(*visitor) {} |
| |
| // Reads all of the available objects on the stream. |
| void ReadAllData(); |
| |
| void ReadStreamType(); |
| void ReadTrackAlias(); |
| void ReadAtMostOneObject(); |
| |
| // Returns the type of the unidirectional stream, if already known. |
| std::optional<MoqtDataStreamType> stream_type() const { |
| if (next_input_ == kStreamType) { |
| return std::nullopt; |
| } |
| return type_; |
| } |
| |
| // Returns the track alias, if already known. |
| std::optional<uint64_t> track_alias() const { |
| return (next_input_ == kStreamType || next_input_ == kTrackAlias || |
| next_input_ == kRequestId) |
| ? std::optional<uint64_t>() |
| : metadata_.track_alias; |
| } |
| |
| void set_default_publisher_priority(MoqtPriority priority) { |
| default_publisher_priority_ = priority; |
| } |
| |
| private: |
| friend class test::MoqtDataParserPeer; |
| |
| // Current state of the parser. |
| enum NextInput { |
| kStreamType, |
| kTrackAlias, // SUBSCRIBE/PUBLISH only. |
| kRequestId, // FETCH only. |
| kSerializationFlags, // FETCH only. |
| kGroupId, |
| kSubgroupId, |
| kPublisherPriority, |
| kObjectId, |
| kExtensionSize, |
| kExtensionBody, |
| kObjectPayloadLength, |
| kStatus, |
| kData, |
| kAwaitingNextByte, // Can't determine status until the next byte arrives. |
| kPadding, |
| kFailed, |
| }; |
| |
| // If a StopCondition callback returns true, parsing will terminate. |
| using StopCondition = quiche::UnretainedCallback<bool()>; |
| |
| struct State { |
| NextInput next_input; |
| uint64_t payload_remaining; |
| |
| bool operator==(const State&) const = default; |
| }; |
| State state() const { return State{next_input_, payload_length_remaining_}; } |
| |
| void ReadDataUntil(StopCondition stop_condition); |
| |
| // Reads a single varint from the underlying stream. Triggers a parse error if |
| // a FIN has been encountered. |
| std::optional<uint64_t> ReadVarInt62NoFin(); |
| // Reads a single uint8 from the underlying stream. Triggers a parse error if |
| // a FIN has been encountered. |
| std::optional<uint8_t> ReadUint8NoFin(); |
| |
| // Advances the state machine of the parser to the next expected state. |
| [[nodiscard]] NextInput AdvanceParserState(); |
| // Reads the next available item from the stream. |
| void ParseNextItemFromStream(); |
| // Checks if we have encountered a FIN without data. If so, processes it and |
| // returns true. |
| bool CheckForFinWithoutData(); |
| |
| void ParseError(absl::string_view reason); |
| |
| webtransport::Stream& stream_; |
| MoqtDataParserVisitor& visitor_; |
| |
| bool no_more_data_ = false; // Fatal error or fin. No more parsing. |
| bool parsing_error_ = false; |
| bool contains_end_of_group_ = false; // True if the stream contains an |
| // implied END_OF_GROUP object. |
| MoqtPriority default_publisher_priority_; |
| |
| std::string buffered_message_; |
| |
| MoqtDataStreamType type_; |
| MoqtFetchSerialization fetch_serialization_; |
| NextInput next_input_ = kStreamType; |
| MoqtObject metadata_; |
| std::optional<uint64_t> last_object_id_; |
| size_t payload_length_remaining_ = 0; |
| size_t num_objects_read_ = 0; |
| |
| bool processing_ = false; // True if currently in ProcessData(), to prevent |
| // re-entrancy. |
| }; |
| |
| } // namespace moqt |
| |
| #endif // QUICHE_QUIC_MOQT_MOQT_PARSER_H_ |