blob: 34953e4a0003f9f7686d3146f3d0beeafc334201 [file] [log] [blame]
// Copyright (c) 2023 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// A parser for draft-ietf-moq-transport.
// TODO(vasilvv): possibly split this header into two.
#ifndef QUICHE_QUIC_MOQT_MOQT_PARSER_H_
#define QUICHE_QUIC_MOQT_MOQT_PARSER_H_
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>
#include "absl/strings/string_view.h"
#include "quiche/quic/core/quic_data_reader.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/common/platform/api/quiche_export.h"
namespace moqt {
namespace test {
class MoqtDataParserPeer;
}
class QUICHE_EXPORT MoqtControlParserVisitor {
public:
virtual ~MoqtControlParserVisitor() = default;
// All of these are called only when the entire message has arrived. The
// parser retains ownership of the memory.
virtual void OnClientSetupMessage(const MoqtClientSetup& message) = 0;
virtual void OnServerSetupMessage(const MoqtServerSetup& message) = 0;
virtual void OnSubscribeMessage(const MoqtSubscribe& message) = 0;
virtual void OnSubscribeOkMessage(const MoqtSubscribeOk& message) = 0;
virtual void OnSubscribeErrorMessage(const MoqtSubscribeError& message) = 0;
virtual void OnUnsubscribeMessage(const MoqtUnsubscribe& message) = 0;
virtual void OnSubscribeDoneMessage(const MoqtSubscribeDone& message) = 0;
virtual void OnSubscribeUpdateMessage(const MoqtSubscribeUpdate& message) = 0;
virtual void OnAnnounceMessage(const MoqtAnnounce& message) = 0;
virtual void OnAnnounceOkMessage(const MoqtAnnounceOk& message) = 0;
virtual void OnAnnounceErrorMessage(const MoqtAnnounceError& message) = 0;
virtual void OnAnnounceCancelMessage(const MoqtAnnounceCancel& message) = 0;
virtual void OnTrackStatusRequestMessage(
const MoqtTrackStatusRequest& message) = 0;
virtual void OnUnannounceMessage(const MoqtUnannounce& message) = 0;
virtual void OnTrackStatusMessage(const MoqtTrackStatus& message) = 0;
virtual void OnGoAwayMessage(const MoqtGoAway& message) = 0;
virtual void OnSubscribeAnnouncesMessage(
const MoqtSubscribeAnnounces& message) = 0;
virtual void OnSubscribeAnnouncesOkMessage(
const MoqtSubscribeAnnouncesOk& message) = 0;
virtual void OnSubscribeAnnouncesErrorMessage(
const MoqtSubscribeAnnouncesError& message) = 0;
virtual void OnUnsubscribeAnnouncesMessage(
const MoqtUnsubscribeAnnounces& message) = 0;
virtual void OnMaxSubscribeIdMessage(const MoqtMaxSubscribeId& message) = 0;
virtual void OnFetchMessage(const MoqtFetch& message) = 0;
virtual void OnFetchCancelMessage(const MoqtFetchCancel& message) = 0;
virtual void OnFetchOkMessage(const MoqtFetchOk& message) = 0;
virtual void OnFetchErrorMessage(const MoqtFetchError& message) = 0;
virtual void OnObjectAckMessage(const MoqtObjectAck& message) = 0;
virtual void OnParsingError(MoqtError code, absl::string_view reason) = 0;
};
class MoqtDataParserVisitor {
public:
virtual ~MoqtDataParserVisitor() = default;
// If |end_of_message| is true, |payload| contains the last bytes of the
// OBJECT payload. If not, there will be subsequent calls with further payload
// data. The parser retains ownership of |message| and |payload|, so the
// visitor needs to copy anything it wants to retain.
virtual void OnObjectMessage(const MoqtObject& message,
absl::string_view payload,
bool end_of_message) = 0;
virtual void OnParsingError(MoqtError code, absl::string_view reason) = 0;
};
class QUICHE_EXPORT MoqtControlParser {
public:
MoqtControlParser(bool uses_web_transport, MoqtControlParserVisitor& visitor)
: visitor_(visitor), uses_web_transport_(uses_web_transport) {}
~MoqtControlParser() = default;
// Take a buffer from the transport in |data|. Parse each complete message and
// call the appropriate visitor function. If |fin| is true, there
// is no more data arriving on the stream, so the parser will deliver any
// message encoded as to run to the end of the stream.
// All bytes can be freed. Calls OnParsingError() when there is a parsing
// error.
// Any calls after sending |fin| = true will be ignored.
// TODO(martinduke): Figure out what has to happen if the message arrives via
// datagram rather than a stream.
void ProcessData(absl::string_view data, bool fin);
private:
// The central switch statement to dispatch a message to the correct
// Process* function. Returns 0 if it could not parse the full messsage
// (except for object payload). Otherwise, returns the number of bytes
// processed.
size_t ProcessMessage(absl::string_view data);
// The Process* functions parse the serialized data into the appropriate
// structs, and call the relevant visitor function for further action. Returns
// the number of bytes consumed if the message is complete; returns 0
// otherwise.
size_t ProcessClientSetup(quic::QuicDataReader& reader);
size_t ProcessServerSetup(quic::QuicDataReader& reader);
size_t ProcessSubscribe(quic::QuicDataReader& reader);
size_t ProcessSubscribeOk(quic::QuicDataReader& reader);
size_t ProcessSubscribeError(quic::QuicDataReader& reader);
size_t ProcessUnsubscribe(quic::QuicDataReader& reader);
size_t ProcessSubscribeDone(quic::QuicDataReader& reader);
size_t ProcessSubscribeUpdate(quic::QuicDataReader& reader);
size_t ProcessAnnounce(quic::QuicDataReader& reader);
size_t ProcessAnnounceOk(quic::QuicDataReader& reader);
size_t ProcessAnnounceError(quic::QuicDataReader& reader);
size_t ProcessAnnounceCancel(quic::QuicDataReader& reader);
size_t ProcessTrackStatusRequest(quic::QuicDataReader& reader);
size_t ProcessUnannounce(quic::QuicDataReader& reader);
size_t ProcessTrackStatus(quic::QuicDataReader& reader);
size_t ProcessGoAway(quic::QuicDataReader& reader);
size_t ProcessSubscribeAnnounces(quic::QuicDataReader& reader);
size_t ProcessSubscribeAnnouncesOk(quic::QuicDataReader& reader);
size_t ProcessSubscribeAnnouncesError(quic::QuicDataReader& reader);
size_t ProcessUnsubscribeAnnounces(quic::QuicDataReader& reader);
size_t ProcessMaxSubscribeId(quic::QuicDataReader& reader);
size_t ProcessFetch(quic::QuicDataReader& reader);
size_t ProcessFetchCancel(quic::QuicDataReader& reader);
size_t ProcessFetchOk(quic::QuicDataReader& reader);
size_t ProcessFetchError(quic::QuicDataReader& reader);
size_t ProcessObjectAck(quic::QuicDataReader& reader);
// If |error| is not provided, assumes kProtocolViolation.
void ParseError(absl::string_view reason);
void ParseError(MoqtError error, absl::string_view reason);
// Reads an integer whose length is specified by a preceding VarInt62 and
// returns it in |result|. Returns false if parsing fails.
bool ReadVarIntPieceVarInt62(quic::QuicDataReader& reader, uint64_t& result);
// Read a parameter and return the value as a string_view. Returns false if
// |reader| does not have enough data.
bool ReadParameter(quic::QuicDataReader& reader, uint64_t& type,
absl::string_view& value);
// Reads MoqtSubscribeParameter from one of the message types that supports
// it. The cursor in |reader| should point to the "number of parameters"
// field in the message. The cursor will move to the end of the parameters.
// Returns false if it could not parse the full message, in which case the
// cursor in |reader| should not be used.
bool ReadSubscribeParameters(quic::QuicDataReader& reader,
MoqtSubscribeParameters& params);
// Convert a string view to a varint. Throws an error and returns false if the
// string_view is not exactly the right length.
bool StringViewToVarInt(absl::string_view& sv, uint64_t& vi);
// Parses a message that a track namespace but not name. The last element of
// |full_track_name| will be set to the empty string. Returns false if it
// could not parse the full namespace field.
bool ReadTrackNamespace(quic::QuicDataReader& reader,
FullTrackName& full_track_name);
MoqtControlParserVisitor& visitor_;
bool uses_web_transport_;
bool no_more_data_ = false; // Fatal error or fin. No more parsing.
bool parsing_error_ = false;
std::string buffered_message_;
bool processing_ = false; // True if currently in ProcessData(), to prevent
// re-entrancy.
};
// Parses an MoQT datagram. Returns the payload bytes, or empty string_view on
// error. The caller provides the whole datagram in `data`. The function puts
// the object metadata in `object_metadata`.
absl::string_view ParseDatagram(absl::string_view data,
MoqtObject& object_metadata);
// Parser for MoQT unidirectional data stream.
class QUICHE_EXPORT MoqtDataParser {
public:
explicit MoqtDataParser(MoqtDataParserVisitor* visitor)
: visitor_(*visitor) {}
~MoqtDataParser() = default;
// Take a buffer from the transport in |data|. Parse each complete message and
// call the appropriate visitor function. If |fin| is true, there
// is no more data arriving on the stream, so the parser will deliver any
// message encoded as to run to the end of the stream.
// All bytes can be freed. Calls OnParsingError() when there is a parsing
// error.
void ProcessData(absl::string_view data, bool fin);
// Alters `chunk_size_` value (see discussion below). Primarily intended to
// be used for testing.
void set_chunk_size(size_t size) { chunk_size_ = size; }
std::optional<MoqtDataStreamType> stream_type() const { return type_; }
private:
friend class test::MoqtDataParserPeer;
// If there is buffered data from the previous attempt at parsing it, new data
// will be added in `chunk_size_`-sized chunks.
constexpr static size_t kDefaultChunkSize = 64;
// Current state of the parser.
enum NextInput {
// Nothing has been read yet; the next thing to be read is the stream type
// varint.
kStreamType,
// The next thing to be read is the stream header.
kHeader,
// The next thing to be read is the stream subheader for the given object.
kSubheader,
// The next thing to be read is the object payload.
kData,
// The next thing to be read (and ignored) is padding.
kPadding,
};
// Infers the current state of the parser.
NextInput GetNextInput() const {
if (!type_.has_value()) {
return kStreamType;
}
if (type_ == MoqtDataStreamType::kPadding) {
return kPadding;
}
if (!metadata_.has_value()) {
return kHeader;
}
if (payload_length_remaining_ > 0) {
return kData;
}
return kSubheader;
}
// Processes all that can be entirely processed, and returns the view for the
// data that needs to be buffered.
absl::string_view ProcessDataInner(absl::string_view data);
void ParseError(absl::string_view reason);
MoqtDataParserVisitor& visitor_;
size_t chunk_size_ = kDefaultChunkSize;
bool no_more_data_ = false; // Fatal error or fin. No more parsing.
bool parsing_error_ = false;
std::string buffered_message_;
// The three variables below implicitly drive the state machine; see
// `GetNextInput()` for how the state is derived.
std::optional<MoqtDataStreamType> type_ = std::nullopt;
std::optional<MoqtObject> metadata_ = std::nullopt;
size_t payload_length_remaining_ = 0;
bool processing_ = false; // True if currently in ProcessData(), to prevent
// re-entrancy.
};
} // namespace moqt
#endif // QUICHE_QUIC_MOQT_MOQT_PARSER_H_