blob: 9c287ba14e7699fa5afedfcd12da1e9a4e122a8e [file] [log] [blame]
// Copyright (c) 2023 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// A parser for draft-ietf-moq-transport.
// TODO(vasilvv): possibly split this header into two.
#ifndef QUICHE_QUIC_MOQT_MOQT_PARSER_H_
#define QUICHE_QUIC_MOQT_MOQT_PARSER_H_
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>
#include <vector>
#include "absl/strings/string_view.h"
#include "quiche/quic/core/quic_data_reader.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/common/platform/api/quiche_export.h"
#include "quiche/common/quiche_callbacks.h"
#include "quiche/common/quiche_stream.h"
namespace moqt {
namespace test {
class MoqtDataParserPeer;
}
class QUICHE_EXPORT MoqtControlParserVisitor {
public:
virtual ~MoqtControlParserVisitor() = default;
// All of these are called only when the entire message has arrived. The
// parser retains ownership of the memory.
virtual void OnClientSetupMessage(const MoqtClientSetup& message) = 0;
virtual void OnServerSetupMessage(const MoqtServerSetup& message) = 0;
virtual void OnSubscribeMessage(const MoqtSubscribe& message) = 0;
virtual void OnSubscribeOkMessage(const MoqtSubscribeOk& message) = 0;
virtual void OnSubscribeErrorMessage(const MoqtSubscribeError& message) = 0;
virtual void OnUnsubscribeMessage(const MoqtUnsubscribe& message) = 0;
virtual void OnSubscribeDoneMessage(const MoqtSubscribeDone& message) = 0;
virtual void OnSubscribeUpdateMessage(const MoqtSubscribeUpdate& message) = 0;
virtual void OnAnnounceMessage(const MoqtAnnounce& message) = 0;
virtual void OnAnnounceOkMessage(const MoqtAnnounceOk& message) = 0;
virtual void OnAnnounceErrorMessage(const MoqtAnnounceError& message) = 0;
virtual void OnUnannounceMessage(const MoqtUnannounce& message) = 0;
virtual void OnAnnounceCancelMessage(const MoqtAnnounceCancel& message) = 0;
virtual void OnTrackStatusMessage(const MoqtTrackStatus& message) = 0;
virtual void OnTrackStatusOkMessage(const MoqtTrackStatusOk& message) = 0;
virtual void OnTrackStatusErrorMessage(
const MoqtTrackStatusError& message) = 0;
virtual void OnGoAwayMessage(const MoqtGoAway& message) = 0;
virtual void OnSubscribeNamespaceMessage(
const MoqtSubscribeNamespace& message) = 0;
virtual void OnSubscribeNamespaceOkMessage(
const MoqtSubscribeNamespaceOk& message) = 0;
virtual void OnSubscribeNamespaceErrorMessage(
const MoqtSubscribeNamespaceError& message) = 0;
virtual void OnUnsubscribeNamespaceMessage(
const MoqtUnsubscribeNamespace& message) = 0;
virtual void OnMaxRequestIdMessage(const MoqtMaxRequestId& message) = 0;
virtual void OnFetchMessage(const MoqtFetch& message) = 0;
virtual void OnFetchCancelMessage(const MoqtFetchCancel& message) = 0;
virtual void OnFetchOkMessage(const MoqtFetchOk& message) = 0;
virtual void OnFetchErrorMessage(const MoqtFetchError& message) = 0;
virtual void OnRequestsBlockedMessage(const MoqtRequestsBlocked& message) = 0;
virtual void OnPublishMessage(const MoqtPublish& message) = 0;
virtual void OnPublishOkMessage(const MoqtPublishOk& message) = 0;
virtual void OnPublishErrorMessage(const MoqtPublishError& message) = 0;
virtual void OnObjectAckMessage(const MoqtObjectAck& message) = 0;
virtual void OnParsingError(MoqtError code, absl::string_view reason) = 0;
};
class MoqtDataParserVisitor {
public:
virtual ~MoqtDataParserVisitor() = default;
// If |end_of_message| is true, |payload| contains the last bytes of the
// OBJECT payload. If not, there will be subsequent calls with further payload
// data. The parser retains ownership of |message| and |payload|, so the
// visitor needs to copy anything it wants to retain.
// If `message.object_status` == `kNormal`, the status must not be used until
// `end_of_message` is true, since a FIN can change the status.
virtual void OnObjectMessage(const MoqtObject& message,
absl::string_view payload,
bool end_of_message) = 0;
virtual void OnParsingError(MoqtError code, absl::string_view reason) = 0;
};
class QUICHE_EXPORT MoqtControlParser {
public:
MoqtControlParser(bool uses_web_transport, quiche::ReadStream* stream,
MoqtControlParserVisitor& visitor)
: visitor_(visitor),
stream_(*stream),
uses_web_transport_(uses_web_transport) {}
~MoqtControlParser() = default;
void ReadAndDispatchMessages();
private:
// The central switch statement to dispatch a message to the correct
// Process* function. Returns 0 if it could not parse the full messsage
// (except for object payload). Otherwise, returns the number of bytes
// processed.
size_t ProcessMessage(absl::string_view data, MoqtMessageType message_type);
// The Process* functions parse the serialized data into the appropriate
// structs, and call the relevant visitor function for further action. Returns
// the number of bytes consumed if the message is complete; returns 0
// otherwise.
size_t ProcessClientSetup(quic::QuicDataReader& reader);
size_t ProcessServerSetup(quic::QuicDataReader& reader);
// Subscribe formats are used for TrackStatus as well, so take the message
// type as an argument, defaulting to the subscribe version.
size_t ProcessSubscribe(
quic::QuicDataReader& reader,
MoqtMessageType message_type = MoqtMessageType::kSubscribe);
size_t ProcessSubscribeOk(
quic::QuicDataReader& reader,
MoqtMessageType message_type = MoqtMessageType::kSubscribeOk);
size_t ProcessSubscribeError(
quic::QuicDataReader& reader,
MoqtMessageType message_type = MoqtMessageType::kSubscribeError);
size_t ProcessUnsubscribe(quic::QuicDataReader& reader);
size_t ProcessSubscribeDone(quic::QuicDataReader& reader);
size_t ProcessSubscribeUpdate(quic::QuicDataReader& reader);
size_t ProcessAnnounce(quic::QuicDataReader& reader);
size_t ProcessAnnounceOk(quic::QuicDataReader& reader);
size_t ProcessAnnounceError(quic::QuicDataReader& reader);
size_t ProcessUnannounce(quic::QuicDataReader& reader);
size_t ProcessAnnounceCancel(quic::QuicDataReader& reader);
size_t ProcessTrackStatus(quic::QuicDataReader& reader);
size_t ProcessTrackStatusOk(quic::QuicDataReader& reader);
size_t ProcessTrackStatusError(quic::QuicDataReader& reader);
size_t ProcessGoAway(quic::QuicDataReader& reader);
size_t ProcessSubscribeNamespace(quic::QuicDataReader& reader);
size_t ProcessSubscribeNamespaceOk(quic::QuicDataReader& reader);
size_t ProcessSubscribeNamespaceError(quic::QuicDataReader& reader);
size_t ProcessUnsubscribeNamespace(quic::QuicDataReader& reader);
size_t ProcessMaxRequestId(quic::QuicDataReader& reader);
size_t ProcessFetch(quic::QuicDataReader& reader);
size_t ProcessFetchCancel(quic::QuicDataReader& reader);
size_t ProcessFetchOk(quic::QuicDataReader& reader);
size_t ProcessFetchError(quic::QuicDataReader& reader);
size_t ProcessRequestsBlocked(quic::QuicDataReader& reader);
size_t ProcessPublish(quic::QuicDataReader& reader);
size_t ProcessPublishOk(quic::QuicDataReader& reader);
size_t ProcessPublishError(quic::QuicDataReader& reader);
size_t ProcessObjectAck(quic::QuicDataReader& reader);
// If |error| is not provided, assumes kProtocolViolation.
void ParseError(absl::string_view reason);
void ParseError(MoqtError error, absl::string_view reason);
// Reads a TrackNamespace from the reader. Returns false if the namespace is
// too large. Sets a ParseError if the namespace is malformed.
bool ReadTrackNamespace(quic::QuicDataReader& reader,
TrackNamespace& track_namespace);
// Reads a FullTrackName from the reader. Returns false if the name is too
// large. Sets a ParseError if the name is malformed.
bool ReadFullTrackName(quic::QuicDataReader& reader,
FullTrackName& full_track_name);
// Translates raw key/value pairs into semantically meaningful formats.
// Returns false if the parameters contain a protocol violation.
bool KeyValuePairListToMoqtSessionParameters(
const KeyValuePairList& parameters, MoqtSessionParameters& out);
bool KeyValuePairListToVersionSpecificParameters(
const KeyValuePairList& parameters, VersionSpecificParameters& out);
bool ParseAuthTokenParameter(absl::string_view field,
std::vector<AuthToken>& out);
MoqtControlParserVisitor& visitor_;
quiche::ReadStream& stream_;
bool uses_web_transport_;
bool no_more_data_ = false; // Fatal error or fin. No more parsing.
bool parsing_error_ = false;
std::optional<uint64_t> message_type_;
std::optional<uint16_t> message_size_;
uint64_t max_auth_token_cache_size_ = 0;
uint64_t auth_token_cache_size_ = 0;
bool processing_ = false; // True if currently in ProcessData(), to prevent
// re-entrancy.
};
// Parses an MoQT datagram. Returns the payload bytes, or std::nullopt on error.
// The caller provides the whole datagram in `data`. The function puts the
// object metadata in `object_metadata`.
std::optional<absl::string_view> ParseDatagram(absl::string_view data,
MoqtObject& object_metadata);
// Parser for MoQT unidirectional data stream.
class QUICHE_EXPORT MoqtDataParser {
public:
// `stream` must outlive the parser. The parser does not configure itself as
// a listener for the read events of the stream; it is responsibility of the
// caller to do so via one of the read methods below.
explicit MoqtDataParser(quiche::ReadStream* stream,
MoqtDataParserVisitor* visitor)
: stream_(*stream), visitor_(*visitor) {}
// Reads all of the available objects on the stream.
void ReadAllData();
void ReadStreamType();
void ReadTrackAlias();
void ReadAtMostOneObject();
// Returns the type of the unidirectional stream, if already known.
std::optional<MoqtDataStreamType> stream_type() const { return type_; }
// Returns the track alias, if already known.
std::optional<uint64_t> track_alias() const {
return (next_input_ == kStreamType || next_input_ == kTrackAlias)
? std::optional<uint64_t>()
: metadata_.track_alias;
}
private:
friend class test::MoqtDataParserPeer;
// Current state of the parser.
enum NextInput {
kStreamType,
kTrackAlias,
kGroupId,
kSubgroupId,
kPublisherPriority,
kObjectId,
kExtensionSize,
kExtensionBody,
kObjectPayloadLength,
kStatus,
kData,
kAwaitingNextByte, // Can't determine status until the next byte arrives.
kPadding,
kFailed,
};
// If a StopCondition callback returns true, parsing will terminate.
using StopCondition = quiche::UnretainedCallback<bool()>;
struct State {
NextInput next_input;
uint64_t payload_remaining;
bool operator==(const State&) const = default;
};
State state() const { return State{next_input_, payload_length_remaining_}; }
void ReadDataUntil(StopCondition stop_condition);
// Reads a single varint from the underlying stream. Triggers a parse error if
// a FIN has been encountered.
std::optional<uint64_t> ReadVarInt62NoFin();
// Reads a single uint8 from the underlying stream. Triggers a parse error if
// a FIN has been encountered.
std::optional<uint8_t> ReadUint8NoFin();
// Advances the state machine of the parser to the next expected state.
void AdvanceParserState();
// Reads the next available item from the stream.
void ParseNextItemFromStream();
// Checks if we have encountered a FIN without data. If so, processes it and
// returns true.
bool CheckForFinWithoutData();
void ParseError(absl::string_view reason);
quiche::ReadStream& stream_;
MoqtDataParserVisitor& visitor_;
bool no_more_data_ = false; // Fatal error or fin. No more parsing.
bool parsing_error_ = false;
bool contains_end_of_group_ = false; // True if the stream contains an
// implied END_OF_GROUP object.
std::string buffered_message_;
std::optional<MoqtDataStreamType> type_ = std::nullopt;
NextInput next_input_ = kStreamType;
MoqtObject metadata_;
std::optional<uint64_t> last_object_id_;
size_t payload_length_remaining_ = 0;
size_t num_objects_read_ = 0;
bool processing_ = false; // True if currently in ProcessData(), to prevent
// re-entrancy.
};
} // namespace moqt
#endif // QUICHE_QUIC_MOQT_MOQT_PARSER_H_