|  | // Copyright (c) 2023 The Chromium Authors. All rights reserved. | 
|  | // Use of this source code is governed by a BSD-style license that can be | 
|  | // found in the LICENSE file. | 
|  |  | 
|  | // A parser for draft-ietf-moq-transport. | 
|  | // TODO(vasilvv): possibly split this header into two. | 
|  |  | 
|  | #ifndef QUICHE_QUIC_MOQT_MOQT_PARSER_H_ | 
|  | #define QUICHE_QUIC_MOQT_MOQT_PARSER_H_ | 
|  |  | 
|  | #include <cstddef> | 
|  | #include <cstdint> | 
|  | #include <optional> | 
|  | #include <string> | 
|  | #include <vector> | 
|  |  | 
|  | #include "absl/strings/string_view.h" | 
|  | #include "quiche/quic/core/quic_data_reader.h" | 
|  | #include "quiche/quic/moqt/moqt_messages.h" | 
|  | #include "quiche/common/platform/api/quiche_export.h" | 
|  | #include "quiche/common/quiche_callbacks.h" | 
|  | #include "quiche/common/quiche_stream.h" | 
|  |  | 
|  | namespace moqt { | 
|  |  | 
|  | namespace test { | 
|  | class MoqtDataParserPeer; | 
|  | } | 
|  |  | 
|  | class QUICHE_EXPORT MoqtControlParserVisitor { | 
|  | public: | 
|  | virtual ~MoqtControlParserVisitor() = default; | 
|  |  | 
|  | // All of these are called only when the entire message has arrived. The | 
|  | // parser retains ownership of the memory. | 
|  | virtual void OnClientSetupMessage(const MoqtClientSetup& message) = 0; | 
|  | virtual void OnServerSetupMessage(const MoqtServerSetup& message) = 0; | 
|  | virtual void OnSubscribeMessage(const MoqtSubscribe& message) = 0; | 
|  | virtual void OnSubscribeOkMessage(const MoqtSubscribeOk& message) = 0; | 
|  | virtual void OnSubscribeErrorMessage(const MoqtSubscribeError& message) = 0; | 
|  | virtual void OnUnsubscribeMessage(const MoqtUnsubscribe& message) = 0; | 
|  | virtual void OnSubscribeDoneMessage(const MoqtSubscribeDone& message) = 0; | 
|  | virtual void OnSubscribeUpdateMessage(const MoqtSubscribeUpdate& message) = 0; | 
|  | virtual void OnAnnounceMessage(const MoqtAnnounce& message) = 0; | 
|  | virtual void OnAnnounceOkMessage(const MoqtAnnounceOk& message) = 0; | 
|  | virtual void OnAnnounceErrorMessage(const MoqtAnnounceError& message) = 0; | 
|  | virtual void OnUnannounceMessage(const MoqtUnannounce& message) = 0; | 
|  | virtual void OnAnnounceCancelMessage(const MoqtAnnounceCancel& message) = 0; | 
|  | virtual void OnTrackStatusMessage(const MoqtTrackStatus& message) = 0; | 
|  | virtual void OnTrackStatusOkMessage(const MoqtTrackStatusOk& message) = 0; | 
|  | virtual void OnTrackStatusErrorMessage( | 
|  | const MoqtTrackStatusError& message) = 0; | 
|  | virtual void OnGoAwayMessage(const MoqtGoAway& message) = 0; | 
|  | virtual void OnSubscribeNamespaceMessage( | 
|  | const MoqtSubscribeNamespace& message) = 0; | 
|  | virtual void OnSubscribeNamespaceOkMessage( | 
|  | const MoqtSubscribeNamespaceOk& message) = 0; | 
|  | virtual void OnSubscribeNamespaceErrorMessage( | 
|  | const MoqtSubscribeNamespaceError& message) = 0; | 
|  | virtual void OnUnsubscribeNamespaceMessage( | 
|  | const MoqtUnsubscribeNamespace& message) = 0; | 
|  | virtual void OnMaxRequestIdMessage(const MoqtMaxRequestId& message) = 0; | 
|  | virtual void OnFetchMessage(const MoqtFetch& message) = 0; | 
|  | virtual void OnFetchCancelMessage(const MoqtFetchCancel& message) = 0; | 
|  | virtual void OnFetchOkMessage(const MoqtFetchOk& message) = 0; | 
|  | virtual void OnFetchErrorMessage(const MoqtFetchError& message) = 0; | 
|  | virtual void OnRequestsBlockedMessage(const MoqtRequestsBlocked& message) = 0; | 
|  | virtual void OnPublishMessage(const MoqtPublish& message) = 0; | 
|  | virtual void OnPublishOkMessage(const MoqtPublishOk& message) = 0; | 
|  | virtual void OnPublishErrorMessage(const MoqtPublishError& message) = 0; | 
|  | virtual void OnObjectAckMessage(const MoqtObjectAck& message) = 0; | 
|  |  | 
|  | virtual void OnParsingError(MoqtError code, absl::string_view reason) = 0; | 
|  | }; | 
|  |  | 
|  | class MoqtDataParserVisitor { | 
|  | public: | 
|  | virtual ~MoqtDataParserVisitor() = default; | 
|  |  | 
|  | // If |end_of_message| is true, |payload| contains the last bytes of the | 
|  | // OBJECT payload. If not, there will be subsequent calls with further payload | 
|  | // data. The parser retains ownership of |message| and |payload|, so the | 
|  | // visitor needs to copy anything it wants to retain. | 
|  | // If `message.object_status` == `kNormal`, the status must not be used until | 
|  | // `end_of_message` is true, since a FIN can change the status. | 
|  | virtual void OnObjectMessage(const MoqtObject& message, | 
|  | absl::string_view payload, | 
|  | bool end_of_message) = 0; | 
|  |  | 
|  | virtual void OnParsingError(MoqtError code, absl::string_view reason) = 0; | 
|  | }; | 
|  |  | 
|  | class QUICHE_EXPORT MoqtControlParser { | 
|  | public: | 
|  | MoqtControlParser(bool uses_web_transport, quiche::ReadStream* stream, | 
|  | MoqtControlParserVisitor& visitor) | 
|  | : visitor_(visitor), | 
|  | stream_(*stream), | 
|  | uses_web_transport_(uses_web_transport) {} | 
|  | ~MoqtControlParser() = default; | 
|  |  | 
|  | void ReadAndDispatchMessages(); | 
|  |  | 
|  | private: | 
|  | // The central switch statement to dispatch a message to the correct | 
|  | // Process* function. Returns 0 if it could not parse the full messsage | 
|  | // (except for object payload). Otherwise, returns the number of bytes | 
|  | // processed. | 
|  | size_t ProcessMessage(absl::string_view data, MoqtMessageType message_type); | 
|  |  | 
|  | // The Process* functions parse the serialized data into the appropriate | 
|  | // structs, and call the relevant visitor function for further action. Returns | 
|  | // the number of bytes consumed if the message is complete; returns 0 | 
|  | // otherwise. | 
|  | size_t ProcessClientSetup(quic::QuicDataReader& reader); | 
|  | size_t ProcessServerSetup(quic::QuicDataReader& reader); | 
|  | // Subscribe formats are used for TrackStatus as well, so take the message | 
|  | // type as an argument, defaulting to the subscribe version. | 
|  | size_t ProcessSubscribe( | 
|  | quic::QuicDataReader& reader, | 
|  | MoqtMessageType message_type = MoqtMessageType::kSubscribe); | 
|  | size_t ProcessSubscribeOk( | 
|  | quic::QuicDataReader& reader, | 
|  | MoqtMessageType message_type = MoqtMessageType::kSubscribeOk); | 
|  | size_t ProcessSubscribeError( | 
|  | quic::QuicDataReader& reader, | 
|  | MoqtMessageType message_type = MoqtMessageType::kSubscribeError); | 
|  | size_t ProcessUnsubscribe(quic::QuicDataReader& reader); | 
|  | size_t ProcessSubscribeDone(quic::QuicDataReader& reader); | 
|  | size_t ProcessSubscribeUpdate(quic::QuicDataReader& reader); | 
|  | size_t ProcessAnnounce(quic::QuicDataReader& reader); | 
|  | size_t ProcessAnnounceOk(quic::QuicDataReader& reader); | 
|  | size_t ProcessAnnounceError(quic::QuicDataReader& reader); | 
|  | size_t ProcessUnannounce(quic::QuicDataReader& reader); | 
|  | size_t ProcessAnnounceCancel(quic::QuicDataReader& reader); | 
|  | size_t ProcessTrackStatus(quic::QuicDataReader& reader); | 
|  | size_t ProcessTrackStatusOk(quic::QuicDataReader& reader); | 
|  | size_t ProcessTrackStatusError(quic::QuicDataReader& reader); | 
|  | size_t ProcessGoAway(quic::QuicDataReader& reader); | 
|  | size_t ProcessSubscribeNamespace(quic::QuicDataReader& reader); | 
|  | size_t ProcessSubscribeNamespaceOk(quic::QuicDataReader& reader); | 
|  | size_t ProcessSubscribeNamespaceError(quic::QuicDataReader& reader); | 
|  | size_t ProcessUnsubscribeNamespace(quic::QuicDataReader& reader); | 
|  | size_t ProcessMaxRequestId(quic::QuicDataReader& reader); | 
|  | size_t ProcessFetch(quic::QuicDataReader& reader); | 
|  | size_t ProcessFetchCancel(quic::QuicDataReader& reader); | 
|  | size_t ProcessFetchOk(quic::QuicDataReader& reader); | 
|  | size_t ProcessFetchError(quic::QuicDataReader& reader); | 
|  | size_t ProcessRequestsBlocked(quic::QuicDataReader& reader); | 
|  | size_t ProcessPublish(quic::QuicDataReader& reader); | 
|  | size_t ProcessPublishOk(quic::QuicDataReader& reader); | 
|  | size_t ProcessPublishError(quic::QuicDataReader& reader); | 
|  | size_t ProcessObjectAck(quic::QuicDataReader& reader); | 
|  |  | 
|  | // If |error| is not provided, assumes kProtocolViolation. | 
|  | void ParseError(absl::string_view reason); | 
|  | void ParseError(MoqtError error, absl::string_view reason); | 
|  |  | 
|  | // Reads a TrackNamespace from the reader. Returns false if the namespace is | 
|  | // too large. Sets a ParseError if the namespace is malformed. | 
|  | bool ReadTrackNamespace(quic::QuicDataReader& reader, | 
|  | TrackNamespace& track_namespace); | 
|  | // Reads a FullTrackName from the reader. Returns false if the name is too | 
|  | // large. Sets a ParseError if the name is malformed. | 
|  | bool ReadFullTrackName(quic::QuicDataReader& reader, | 
|  | FullTrackName& full_track_name); | 
|  | // Translates raw key/value pairs into semantically meaningful formats. | 
|  | // Returns false if the parameters contain a protocol violation. | 
|  | bool KeyValuePairListToMoqtSessionParameters( | 
|  | const KeyValuePairList& parameters, MoqtSessionParameters& out); | 
|  | bool KeyValuePairListToVersionSpecificParameters( | 
|  | const KeyValuePairList& parameters, VersionSpecificParameters& out); | 
|  | bool ParseAuthTokenParameter(absl::string_view field, | 
|  | std::vector<AuthToken>& out); | 
|  |  | 
|  | MoqtControlParserVisitor& visitor_; | 
|  | quiche::ReadStream& stream_; | 
|  | bool uses_web_transport_; | 
|  | bool no_more_data_ = false;  // Fatal error or fin. No more parsing. | 
|  | bool parsing_error_ = false; | 
|  |  | 
|  | std::optional<uint64_t> message_type_; | 
|  | std::optional<uint16_t> message_size_; | 
|  |  | 
|  | uint64_t max_auth_token_cache_size_ = 0; | 
|  | uint64_t auth_token_cache_size_ = 0; | 
|  | bool processing_ = false;  // True if currently in ProcessData(), to prevent | 
|  | // re-entrancy. | 
|  | }; | 
|  |  | 
|  | // Parses an MoQT datagram. Returns the payload bytes, or std::nullopt on error. | 
|  | // The caller provides the whole datagram in `data`.  The function puts the | 
|  | // object metadata in `object_metadata`. | 
|  | std::optional<absl::string_view> ParseDatagram(absl::string_view data, | 
|  | MoqtObject& object_metadata); | 
|  |  | 
|  | // Parser for MoQT unidirectional data stream. | 
|  | class QUICHE_EXPORT MoqtDataParser { | 
|  | public: | 
|  | // `stream` must outlive the parser.  The parser does not configure itself as | 
|  | // a listener for the read events of the stream; it is responsibility of the | 
|  | // caller to do so via one of the read methods below. | 
|  | explicit MoqtDataParser(quiche::ReadStream* stream, | 
|  | MoqtDataParserVisitor* visitor) | 
|  | : stream_(*stream), visitor_(*visitor) {} | 
|  |  | 
|  | // Reads all of the available objects on the stream. | 
|  | void ReadAllData(); | 
|  |  | 
|  | void ReadStreamType(); | 
|  | void ReadTrackAlias(); | 
|  | void ReadAtMostOneObject(); | 
|  |  | 
|  | // Returns the type of the unidirectional stream, if already known. | 
|  | std::optional<MoqtDataStreamType> stream_type() const { return type_; } | 
|  |  | 
|  | // Returns the track alias, if already known. | 
|  | std::optional<uint64_t> track_alias() const { | 
|  | return (next_input_ == kStreamType || next_input_ == kTrackAlias) | 
|  | ? std::optional<uint64_t>() | 
|  | : metadata_.track_alias; | 
|  | } | 
|  |  | 
|  | private: | 
|  | friend class test::MoqtDataParserPeer; | 
|  |  | 
|  | // Current state of the parser. | 
|  | enum NextInput { | 
|  | kStreamType, | 
|  | kTrackAlias, | 
|  | kGroupId, | 
|  | kSubgroupId, | 
|  | kPublisherPriority, | 
|  | kObjectId, | 
|  | kExtensionSize, | 
|  | kExtensionBody, | 
|  | kObjectPayloadLength, | 
|  | kStatus, | 
|  | kData, | 
|  | kAwaitingNextByte,  // Can't determine status until the next byte arrives. | 
|  | kPadding, | 
|  | kFailed, | 
|  | }; | 
|  |  | 
|  | // If a StopCondition callback returns true, parsing will terminate. | 
|  | using StopCondition = quiche::UnretainedCallback<bool()>; | 
|  |  | 
|  | struct State { | 
|  | NextInput next_input; | 
|  | uint64_t payload_remaining; | 
|  |  | 
|  | bool operator==(const State&) const = default; | 
|  | }; | 
|  | State state() const { return State{next_input_, payload_length_remaining_}; } | 
|  |  | 
|  | void ReadDataUntil(StopCondition stop_condition); | 
|  |  | 
|  | // Reads a single varint from the underlying stream. Triggers a parse error if | 
|  | // a FIN has been encountered. | 
|  | std::optional<uint64_t> ReadVarInt62NoFin(); | 
|  | // Reads a single uint8 from the underlying stream. Triggers a parse error if | 
|  | // a FIN has been encountered. | 
|  | std::optional<uint8_t> ReadUint8NoFin(); | 
|  |  | 
|  | // Advances the state machine of the parser to the next expected state. | 
|  | void AdvanceParserState(); | 
|  | // Reads the next available item from the stream. | 
|  | void ParseNextItemFromStream(); | 
|  | // Checks if we have encountered a FIN without data.  If so, processes it and | 
|  | // returns true. | 
|  | bool CheckForFinWithoutData(); | 
|  |  | 
|  | void ParseError(absl::string_view reason); | 
|  |  | 
|  | quiche::ReadStream& stream_; | 
|  | MoqtDataParserVisitor& visitor_; | 
|  |  | 
|  | bool no_more_data_ = false;  // Fatal error or fin. No more parsing. | 
|  | bool parsing_error_ = false; | 
|  | bool contains_end_of_group_ = false;  // True if the stream contains an | 
|  | // implied END_OF_GROUP object. | 
|  |  | 
|  | std::string buffered_message_; | 
|  |  | 
|  | std::optional<MoqtDataStreamType> type_ = std::nullopt; | 
|  | NextInput next_input_ = kStreamType; | 
|  | MoqtObject metadata_; | 
|  | std::optional<uint64_t> last_object_id_; | 
|  | size_t payload_length_remaining_ = 0; | 
|  | size_t num_objects_read_ = 0; | 
|  |  | 
|  | bool processing_ = false;  // True if currently in ProcessData(), to prevent | 
|  | // re-entrancy. | 
|  | }; | 
|  |  | 
|  | }  // namespace moqt | 
|  |  | 
|  | #endif  // QUICHE_QUIC_MOQT_MOQT_PARSER_H_ |