// Copyright (c) 2023 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "quiche/quic/moqt/moqt_parser.h"

#include <algorithm>
#include <array>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <optional>
#include <string>

#include "absl/base/casts.h"
#include "absl/cleanup/cleanup.h"
#include "absl/container/fixed_array.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/types/span.h"
#include "quiche/quic/core/quic_data_reader.h"
#include "quiche/quic/core/quic_time.h"
#include "quiche/quic/core/quic_types.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/common/platform/api/quiche_bug_tracker.h"
#include "quiche/common/platform/api/quiche_logging.h"
#include "quiche/common/quiche_data_reader.h"
#include "quiche/common/quiche_stream.h"

namespace moqt {

namespace {

bool ParseDeliveryOrder(uint8_t raw_value,
                        std::optional<MoqtDeliveryOrder>& output) {
  switch (raw_value) {
    case 0x00:
      output = std::nullopt;
      return true;
    case 0x01:
      output = MoqtDeliveryOrder::kAscending;
      return true;
    case 0x02:
      output = MoqtDeliveryOrder::kDescending;
      return true;
    default:
      return false;
  }
}

uint64_t SignedVarintUnserializedForm(uint64_t value) {
  if (value & 0x01) {
    return -(value >> 1);
  }
  return value >> 1;
}

bool IsAllowedStreamType(uint64_t value) {
  constexpr std::array kAllowedStreamTypes = {
      MoqtDataStreamType::kStreamHeaderSubgroup,
      MoqtDataStreamType::kStreamHeaderFetch, MoqtDataStreamType::kPadding};
  for (MoqtDataStreamType type : kAllowedStreamTypes) {
    if (static_cast<uint64_t>(type) == value) {
      return true;
    }
  }
  return false;
}

std::optional<uint64_t> ReadVarInt62FromStream(quiche::ReadStream& stream,
                                               bool& fin_read) {
  fin_read = false;

  quiche::ReadStream::PeekResult peek_result = stream.PeekNextReadableRegion();
  if (peek_result.peeked_data.empty()) {
    if (peek_result.fin_next) {
      fin_read = stream.SkipBytes(0);
      QUICHE_DCHECK(fin_read);
    }
    return std::nullopt;
  }
  char first_byte = peek_result.peeked_data[0];
  size_t varint_size =
      1 << ((absl::bit_cast<uint8_t>(first_byte) & 0b11000000) >> 6);
  if (stream.ReadableBytes() < varint_size) {
    return std::nullopt;
  }

  char buffer[8];
  absl::Span<char> bytes_to_read =
      absl::MakeSpan(buffer).subspan(0, varint_size);
  quiche::ReadStream::ReadResult read_result = stream.Read(bytes_to_read);
  QUICHE_DCHECK_EQ(read_result.bytes_read, varint_size);
  fin_read = read_result.fin;

  quiche::QuicheDataReader reader(buffer, read_result.bytes_read);
  uint64_t result;
  bool success = reader.ReadVarInt62(&result);
  QUICHE_DCHECK(success);
  QUICHE_DCHECK(reader.IsDoneReading());
  return result;
}

// Reads from |reader| to list. Returns false if there is a read error.
bool ParseKeyValuePairList(quic::QuicDataReader& reader,
                           KeyValuePairList& list) {
  list.clear();
  uint64_t num_params;
  if (!reader.ReadVarInt62(&num_params)) {
    return false;
  }
  for (uint64_t i = 0; i < num_params; ++i) {
    uint64_t type;
    if (!reader.ReadVarInt62(&type)) {
      return false;
    }
    if (type % 2 == 1) {
      absl::string_view bytes;
      if (!reader.ReadStringPieceVarInt62(&bytes)) {
        return false;
      }
      list.insert(type, bytes);
      continue;
    }
    uint64_t value;
    if (!reader.ReadVarInt62(&value)) {
      return false;
    }
    list.insert(type, value);
  }
  return true;
}

void KeyValuePairListToMoqtSessionParameters(const KeyValuePairList& parameters,
                                             MoqtSessionParameters& out) {
  parameters.ForEach(
      [&](uint64_t key, uint64_t value) {
        SetupParameter parameter = static_cast<SetupParameter>(key);
        switch (parameter) {
          case SetupParameter::kMaxRequestId:
            out.max_request_id = value;
            break;
          case SetupParameter::kMaxAuthTokenCacheSize:
            out.max_auth_token_cache_size = value;
            break;
          case SetupParameter::kSupportObjectAcks:
            out.support_object_acks = (value == 1);
            break;
          default:
            break;
        }
        return true;
      },
      [&](uint64_t key, absl::string_view value) {
        SetupParameter parameter = static_cast<SetupParameter>(key);
        switch (parameter) {
          case SetupParameter::kPath:
            out.path = value;
            break;
          default:
            break;
        }
        return true;
      });
}

}  // namespace

void MoqtControlParser::ReadAndDispatchMessages() {
  if (no_more_data_) {
    ParseError("Data after end of stream");
    return;
  }
  if (processing_) {
    return;
  }
  processing_ = true;
  auto on_return = absl::MakeCleanup([&] { processing_ = false; });
  while (!no_more_data_) {
    bool fin_read = false;
    // Read the message type.
    if (!message_type_.has_value()) {
      message_type_ = ReadVarInt62FromStream(stream_, fin_read);
      if (fin_read) {
        ParseError("FIN on control stream");
        return;
      }
      if (!message_type_.has_value()) {
        return;
      }
    }
    QUICHE_DCHECK(message_type_.has_value());

    // Read the message length.
    if (!message_size_.has_value()) {
      if (stream_.ReadableBytes() < 2) {
        return;
      }
      std::array<char, 2> size_bytes;
      quiche::ReadStream::ReadResult result =
          stream_.Read(absl::MakeSpan(size_bytes));
      if (result.bytes_read != 2) {
        ParseError(MoqtError::kInternalError,
                   "Stream returned incorrect ReadableBytes");
        return;
      }
      if (result.fin) {
        ParseError("FIN on control stream");
        return;
      }
      message_size_ = static_cast<uint16_t>(size_bytes[0]) << 8 |
                      static_cast<uint16_t>(size_bytes[1]);
      if (*message_size_ > kMaxMessageHeaderSize) {
        ParseError(MoqtError::kInternalError,
                   absl::StrCat("Cannot parse control messages more than ",
                                kMaxMessageHeaderSize, " bytes"));
        return;
      }
    }
    QUICHE_DCHECK(message_size_.has_value());

    // Read the message if it's fully received.
    //
    // CAUTION: if the flow control windows are too low, and
    // kMaxMessageHeaderSize is too high, this will cause a deadlock.
    if (stream_.ReadableBytes() < *message_size_) {
      return;
    }
    absl::FixedArray<char> message(*message_size_);
    quiche::ReadStream::ReadResult result =
        stream_.Read(absl::MakeSpan(message));
    if (result.bytes_read != *message_size_) {
      ParseError("Stream returned incorrect ReadableBytes");
      return;
    }
    if (result.fin) {
      ParseError("FIN on control stream");
      return;
    }

    ProcessMessage(absl::string_view(message.data(), message.size()),
                   static_cast<MoqtMessageType>(*message_type_));
    message_type_.reset();
    message_size_.reset();
  }
}

size_t MoqtControlParser::ProcessMessage(absl::string_view data,
                                         MoqtMessageType message_type) {
  quic::QuicDataReader reader(data);
  size_t bytes_read;
  switch (message_type) {
    case MoqtMessageType::kClientSetup:
      bytes_read = ProcessClientSetup(reader);
      break;
    case MoqtMessageType::kServerSetup:
      bytes_read = ProcessServerSetup(reader);
      break;
    case MoqtMessageType::kSubscribe:
      bytes_read = ProcessSubscribe(reader);
      break;
    case MoqtMessageType::kSubscribeOk:
      bytes_read = ProcessSubscribeOk(reader);
      break;
    case MoqtMessageType::kSubscribeError:
      bytes_read = ProcessSubscribeError(reader);
      break;
    case MoqtMessageType::kUnsubscribe:
      bytes_read = ProcessUnsubscribe(reader);
      break;
    case MoqtMessageType::kSubscribeDone:
      bytes_read = ProcessSubscribeDone(reader);
      break;
    case MoqtMessageType::kSubscribeUpdate:
      bytes_read = ProcessSubscribeUpdate(reader);
      break;
    case MoqtMessageType::kAnnounce:
      bytes_read = ProcessAnnounce(reader);
      break;
    case MoqtMessageType::kAnnounceOk:
      bytes_read = ProcessAnnounceOk(reader);
      break;
    case MoqtMessageType::kAnnounceError:
      bytes_read = ProcessAnnounceError(reader);
      break;
    case MoqtMessageType::kAnnounceCancel:
      bytes_read = ProcessAnnounceCancel(reader);
      break;
    case MoqtMessageType::kTrackStatusRequest:
      bytes_read = ProcessTrackStatusRequest(reader);
      break;
    case MoqtMessageType::kUnannounce:
      bytes_read = ProcessUnannounce(reader);
      break;
    case MoqtMessageType::kTrackStatus:
      bytes_read = ProcessTrackStatus(reader);
      break;
    case MoqtMessageType::kGoAway:
      bytes_read = ProcessGoAway(reader);
      break;
    case MoqtMessageType::kSubscribeAnnounces:
      bytes_read = ProcessSubscribeAnnounces(reader);
      break;
    case MoqtMessageType::kSubscribeAnnouncesOk:
      bytes_read = ProcessSubscribeAnnouncesOk(reader);
      break;
    case MoqtMessageType::kSubscribeAnnouncesError:
      bytes_read = ProcessSubscribeAnnouncesError(reader);
      break;
    case MoqtMessageType::kUnsubscribeAnnounces:
      bytes_read = ProcessUnsubscribeAnnounces(reader);
      break;
    case MoqtMessageType::kMaxRequestId:
      bytes_read = ProcessMaxRequestId(reader);
      break;
    case MoqtMessageType::kFetch:
      bytes_read = ProcessFetch(reader);
      break;
    case MoqtMessageType::kFetchCancel:
      bytes_read = ProcessFetchCancel(reader);
      break;
    case MoqtMessageType::kFetchOk:
      bytes_read = ProcessFetchOk(reader);
      break;
    case MoqtMessageType::kFetchError:
      bytes_read = ProcessFetchError(reader);
      break;
    case MoqtMessageType::kRequestsBlocked:
      bytes_read = ProcessRequestsBlocked(reader);
      break;
    case moqt::MoqtMessageType::kObjectAck:
      bytes_read = ProcessObjectAck(reader);
      break;
    default:
      ParseError("Unknown message type");
      bytes_read = 0;
      break;
  }
  if (bytes_read != data.size() || bytes_read == 0) {
    ParseError("Message length does not match payload length");
    return 0;
  }
  return bytes_read;
}

size_t MoqtControlParser::ProcessClientSetup(quic::QuicDataReader& reader) {
  MoqtClientSetup setup;
  setup.parameters.using_webtrans = uses_web_transport_;
  setup.parameters.perspective = quic::Perspective::IS_CLIENT;
  uint64_t number_of_supported_versions;
  if (!reader.ReadVarInt62(&number_of_supported_versions)) {
    return 0;
  }
  uint64_t version;
  for (uint64_t i = 0; i < number_of_supported_versions; ++i) {
    if (!reader.ReadVarInt62(&version)) {
      return 0;
    }
    setup.supported_versions.push_back(static_cast<MoqtVersion>(version));
  }
  KeyValuePairList parameters;
  if (!ParseKeyValuePairList(reader, parameters)) {
    return 0;
  }
  MoqtError error = ValidateSetupParameters(parameters, uses_web_transport_,
                                            quic::Perspective::IS_SERVER);
  if (error != MoqtError::kNoError) {
    ParseError(error, "Client SETUP contains invalid parameters");
    return 0;
  }
  KeyValuePairListToMoqtSessionParameters(parameters, setup.parameters);
  // TODO(martinduke): Validate construction of the PATH (Sec 8.3.2.1)
  visitor_.OnClientSetupMessage(setup);
  return reader.PreviouslyReadPayload().length();
}

size_t MoqtControlParser::ProcessServerSetup(quic::QuicDataReader& reader) {
  MoqtServerSetup setup;
  setup.parameters.using_webtrans = uses_web_transport_;
  setup.parameters.perspective = quic::Perspective::IS_SERVER;
  uint64_t version;
  if (!reader.ReadVarInt62(&version)) {
    return 0;
  }
  setup.selected_version = static_cast<MoqtVersion>(version);
  KeyValuePairList parameters;
  if (!ParseKeyValuePairList(reader, parameters)) {
    return 0;
  }
  MoqtError error = ValidateSetupParameters(parameters, uses_web_transport_,
                                            quic::Perspective::IS_CLIENT);
  if (error != MoqtError::kNoError) {
    ParseError(error, "Server SETUP contains invalid parameters");
    return 0;
  }
  KeyValuePairListToMoqtSessionParameters(parameters, setup.parameters);
  visitor_.OnServerSetupMessage(setup);
  return reader.PreviouslyReadPayload().length();
}

size_t MoqtControlParser::ProcessSubscribe(quic::QuicDataReader& reader) {
  MoqtSubscribe subscribe;
  uint64_t filter, group, object;
  uint8_t group_order, forward;
  absl::string_view track_name;
  if (!reader.ReadVarInt62(&subscribe.request_id) ||
      !reader.ReadVarInt62(&subscribe.track_alias) ||
      !ReadTrackNamespace(reader, subscribe.full_track_name) ||
      !reader.ReadStringPieceVarInt62(&track_name) ||
      !reader.ReadUInt8(&subscribe.subscriber_priority) ||
      !reader.ReadUInt8(&group_order) || !reader.ReadUInt8(&forward) ||
      !reader.ReadVarInt62(&filter)) {
    return 0;
  }
  subscribe.full_track_name.AddElement(track_name);
  if (!ParseDeliveryOrder(group_order, subscribe.group_order)) {
    ParseError("Invalid group order value in SUBSCRIBE");
    return 0;
  }
  if (forward > 1) {
    ParseError("Invalid forward value in SUBSCRIBE");
    return 0;
  }
  subscribe.forward = (forward == 1);
  subscribe.filter_type = static_cast<MoqtFilterType>(filter);
  switch (subscribe.filter_type) {
    case MoqtFilterType::kNextGroupStart:
    case MoqtFilterType::kLatestObject:
      break;
    case MoqtFilterType::kAbsoluteStart:
    case MoqtFilterType::kAbsoluteRange:
      if (!reader.ReadVarInt62(&group) || !reader.ReadVarInt62(&object)) {
        return 0;
      }
      subscribe.start = Location(group, object);
      if (subscribe.filter_type == MoqtFilterType::kAbsoluteStart) {
        break;
      }
      if (!reader.ReadVarInt62(&group)) {
        return 0;
      }
      subscribe.end_group = group;
      if (*subscribe.end_group < subscribe.start->group) {
        ParseError("End group is less than start group");
        return 0;
      }
      break;
    default:
      ParseError("Invalid filter type");
      return 0;
  }
  KeyValuePairList parameters;
  if (!ParseKeyValuePairList(reader, parameters)) {
    return 0;
  }
  if (!ValidateVersionSpecificParameters(parameters,
                                         MoqtMessageType::kSubscribe)) {
    ParseError("SUBSCRIBE contains invalid parameters");
    return 0;
  }
  if (!KeyValuePairListToVersionSpecificParameters(parameters,
                                                   subscribe.parameters)) {
    return 0;
  }
  visitor_.OnSubscribeMessage(subscribe);
  return reader.PreviouslyReadPayload().length();
}

size_t MoqtControlParser::ProcessSubscribeOk(quic::QuicDataReader& reader) {
  MoqtSubscribeOk subscribe_ok;
  uint64_t milliseconds;
  uint8_t group_order;
  uint8_t content_exists;
  if (!reader.ReadVarInt62(&subscribe_ok.subscribe_id) ||
      !reader.ReadVarInt62(&milliseconds) || !reader.ReadUInt8(&group_order) ||
      !reader.ReadUInt8(&content_exists)) {
    return 0;
  }
  if (content_exists > 1) {
    ParseError("SUBSCRIBE_OK ContentExists has invalid value");
    return 0;
  }
  if (group_order != 0x01 && group_order != 0x02) {
    ParseError("Invalid group order value in SUBSCRIBE_OK");
    return 0;
  }
  subscribe_ok.expires = quic::QuicTimeDelta::FromMilliseconds(milliseconds);
  subscribe_ok.group_order = static_cast<MoqtDeliveryOrder>(group_order);
  if (content_exists) {
    subscribe_ok.largest_id = Location();
    if (!reader.ReadVarInt62(&subscribe_ok.largest_id->group) ||
        !reader.ReadVarInt62(&subscribe_ok.largest_id->object)) {
      return 0;
    }
  }
  KeyValuePairList parameters;
  if (!ParseKeyValuePairList(reader, parameters)) {
    return 0;
  }
  if (!ValidateVersionSpecificParameters(parameters,
                                         MoqtMessageType::kSubscribeOk)) {
    ParseError("SUBSCRIBE_OK contains invalid parameters");
    return 0;
  }
  if (!KeyValuePairListToVersionSpecificParameters(parameters,
                                                   subscribe_ok.parameters)) {
    return 0;
  }
  visitor_.OnSubscribeOkMessage(subscribe_ok);
  return reader.PreviouslyReadPayload().length();
}

size_t MoqtControlParser::ProcessSubscribeError(quic::QuicDataReader& reader) {
  MoqtSubscribeError subscribe_error;
  uint64_t error_code;
  if (!reader.ReadVarInt62(&subscribe_error.subscribe_id) ||
      !reader.ReadVarInt62(&error_code) ||
      !reader.ReadStringVarInt62(subscribe_error.reason_phrase) ||
      !reader.ReadVarInt62(&subscribe_error.track_alias)) {
    return 0;
  }
  subscribe_error.error_code = static_cast<SubscribeErrorCode>(error_code);
  visitor_.OnSubscribeErrorMessage(subscribe_error);
  return reader.PreviouslyReadPayload().length();
}

size_t MoqtControlParser::ProcessUnsubscribe(quic::QuicDataReader& reader) {
  MoqtUnsubscribe unsubscribe;
  if (!reader.ReadVarInt62(&unsubscribe.subscribe_id)) {
    return 0;
  }
  visitor_.OnUnsubscribeMessage(unsubscribe);
  return reader.PreviouslyReadPayload().length();
}

size_t MoqtControlParser::ProcessSubscribeDone(quic::QuicDataReader& reader) {
  MoqtSubscribeDone subscribe_done;
  uint64_t value;
  if (!reader.ReadVarInt62(&subscribe_done.subscribe_id) ||
      !reader.ReadVarInt62(&value) ||
      !reader.ReadVarInt62(&subscribe_done.stream_count) ||
      !reader.ReadStringVarInt62(subscribe_done.reason_phrase)) {
    return 0;
  }
  subscribe_done.status_code = static_cast<SubscribeDoneCode>(value);
  visitor_.OnSubscribeDoneMessage(subscribe_done);
  return reader.PreviouslyReadPayload().length();
}

size_t MoqtControlParser::ProcessSubscribeUpdate(quic::QuicDataReader& reader) {
  MoqtSubscribeUpdate subscribe_update;
  uint64_t start_group, start_object, end_group;
  if (!reader.ReadVarInt62(&subscribe_update.subscribe_id) ||
      !reader.ReadVarInt62(&start_group) ||
      !reader.ReadVarInt62(&start_object) || !reader.ReadVarInt62(&end_group) ||
      !reader.ReadUInt8(&subscribe_update.subscriber_priority)) {
    return 0;
  }
  KeyValuePairList parameters;
  if (!ParseKeyValuePairList(reader, parameters)) {
    return 0;
  }
  if (!ValidateVersionSpecificParameters(parameters,
                                         MoqtMessageType::kSubscribeUpdate)) {
    ParseError("SUBSCRIBE_UPDATE contains invalid parameters");
    return 0;
  }
  if (!KeyValuePairListToVersionSpecificParameters(
          parameters, subscribe_update.parameters)) {
    return 0;
  }
  subscribe_update.start = Location(start_group, start_object);
  if (end_group > 0) {
    subscribe_update.end_group = end_group - 1;
    if (subscribe_update.end_group < start_group) {
      ParseError("End group is less than start group");
      return 0;
    }
  }
  visitor_.OnSubscribeUpdateMessage(subscribe_update);
  return reader.PreviouslyReadPayload().length();
}

size_t MoqtControlParser::ProcessAnnounce(quic::QuicDataReader& reader) {
  MoqtAnnounce announce;
  if (!ReadTrackNamespace(reader, announce.track_namespace)) {
    return 0;
  }
  KeyValuePairList parameters;
  if (!ParseKeyValuePairList(reader, parameters)) {
    return 0;
  }
  if (!ValidateVersionSpecificParameters(parameters,
                                         MoqtMessageType::kAnnounce)) {
    ParseError("ANNOUNCE contains invalid parameters");
    return 0;
  }
  if (!KeyValuePairListToVersionSpecificParameters(parameters,
                                                   announce.parameters)) {
    return 0;
  }
  visitor_.OnAnnounceMessage(announce);
  return reader.PreviouslyReadPayload().length();
}

size_t MoqtControlParser::ProcessAnnounceOk(quic::QuicDataReader& reader) {
  MoqtAnnounceOk announce_ok;
  if (!ReadTrackNamespace(reader, announce_ok.track_namespace)) {
    return 0;
  }
  visitor_.OnAnnounceOkMessage(announce_ok);
  return reader.PreviouslyReadPayload().length();
}

size_t MoqtControlParser::ProcessAnnounceError(quic::QuicDataReader& reader) {
  MoqtAnnounceError announce_error;
  if (!ReadTrackNamespace(reader, announce_error.track_namespace)) {
    return 0;
  }
  uint64_t error_code;
  if (!reader.ReadVarInt62(&error_code) ||
      !reader.ReadStringVarInt62(announce_error.reason_phrase)) {
    return 0;
  }
  announce_error.error_code = static_cast<SubscribeErrorCode>(error_code);
  visitor_.OnAnnounceErrorMessage(announce_error);
  return reader.PreviouslyReadPayload().length();
}

size_t MoqtControlParser::ProcessAnnounceCancel(quic::QuicDataReader& reader) {
  MoqtAnnounceCancel announce_cancel;
  if (!ReadTrackNamespace(reader, announce_cancel.track_namespace)) {
    return 0;
  }
  uint64_t error_code;
  if (!reader.ReadVarInt62(&error_code) ||
      !reader.ReadStringVarInt62(announce_cancel.reason_phrase)) {
    return 0;
  }
  announce_cancel.error_code = static_cast<SubscribeErrorCode>(error_code);
  visitor_.OnAnnounceCancelMessage(announce_cancel);
  return reader.PreviouslyReadPayload().length();
}

size_t MoqtControlParser::ProcessTrackStatusRequest(
    quic::QuicDataReader& reader) {
  MoqtTrackStatusRequest track_status_request;
  if (!ReadTrackNamespace(reader, track_status_request.full_track_name)) {
    return 0;
  }
  absl::string_view name;
  if (!reader.ReadStringPieceVarInt62(&name)) {
    return 0;
  }
  track_status_request.full_track_name.AddElement(name);
  KeyValuePairList parameters;
  if (!ParseKeyValuePairList(reader, parameters)) {
    return 0;
  }
  if (!ValidateVersionSpecificParameters(
          parameters, MoqtMessageType::kTrackStatusRequest)) {
    ParseError("TRACK_STATUS_REQUEST message contains invalid parameters");
    return 0;
  }
  if (!KeyValuePairListToVersionSpecificParameters(
          parameters, track_status_request.parameters)) {
    return 0;
  }
  visitor_.OnTrackStatusRequestMessage(track_status_request);
  return reader.PreviouslyReadPayload().length();
}

size_t MoqtControlParser::ProcessUnannounce(quic::QuicDataReader& reader) {
  MoqtUnannounce unannounce;
  if (!ReadTrackNamespace(reader, unannounce.track_namespace)) {
    return 0;
  }
  visitor_.OnUnannounceMessage(unannounce);
  return reader.PreviouslyReadPayload().length();
}

size_t MoqtControlParser::ProcessTrackStatus(quic::QuicDataReader& reader) {
  MoqtTrackStatus track_status;
  if (!ReadTrackNamespace(reader, track_status.full_track_name)) {
    return 0;
  }
  absl::string_view name;
  if (!reader.ReadStringPieceVarInt62(&name)) {
    return 0;
  }
  track_status.full_track_name.AddElement(name);
  uint64_t value;
  if (!reader.ReadVarInt62(&value) ||
      !reader.ReadVarInt62(&track_status.last_group) ||
      !reader.ReadVarInt62(&track_status.last_object)) {
    return 0;
  }
  track_status.status_code = static_cast<MoqtTrackStatusCode>(value);
  KeyValuePairList parameters;
  if (!ParseKeyValuePairList(reader, parameters)) {
    return 0;
  }
  if (!ValidateVersionSpecificParameters(parameters,
                                         MoqtMessageType::kTrackStatus)) {
    ParseError("TRACK_STATUS message contains invalid parameters");
    return 0;
  }
  if (!KeyValuePairListToVersionSpecificParameters(parameters,
                                                   track_status.parameters)) {
    return 0;
  }
  visitor_.OnTrackStatusMessage(track_status);
  return reader.PreviouslyReadPayload().length();
}

size_t MoqtControlParser::ProcessGoAway(quic::QuicDataReader& reader) {
  MoqtGoAway goaway;
  if (!reader.ReadStringVarInt62(goaway.new_session_uri)) {
    return 0;
  }
  visitor_.OnGoAwayMessage(goaway);
  return reader.PreviouslyReadPayload().length();
}

size_t MoqtControlParser::ProcessSubscribeAnnounces(
    quic::QuicDataReader& reader) {
  MoqtSubscribeAnnounces subscribe_announces;
  if (!ReadTrackNamespace(reader, subscribe_announces.track_namespace)) {
    return 0;
  }
  KeyValuePairList parameters;
  if (!ParseKeyValuePairList(reader, parameters)) {
    return 0;
  }
  if (!ValidateVersionSpecificParameters(
          parameters, MoqtMessageType::kSubscribeAnnounces)) {
    ParseError("SUBSCRIBE_ANNOUNCES message contains invalid parameters");
    return 0;
  }
  if (!KeyValuePairListToVersionSpecificParameters(
          parameters, subscribe_announces.parameters)) {
    return 0;
  }
  visitor_.OnSubscribeAnnouncesMessage(subscribe_announces);
  return reader.PreviouslyReadPayload().length();
}

size_t MoqtControlParser::ProcessSubscribeAnnouncesOk(
    quic::QuicDataReader& reader) {
  MoqtSubscribeAnnouncesOk subscribe_namespace_ok;
  if (!ReadTrackNamespace(reader, subscribe_namespace_ok.track_namespace)) {
    return 0;
  }
  visitor_.OnSubscribeAnnouncesOkMessage(subscribe_namespace_ok);
  return reader.PreviouslyReadPayload().length();
}

size_t MoqtControlParser::ProcessSubscribeAnnouncesError(
    quic::QuicDataReader& reader) {
  MoqtSubscribeAnnouncesError subscribe_namespace_error;
  uint64_t error_code;
  if (!ReadTrackNamespace(reader, subscribe_namespace_error.track_namespace) ||
      !reader.ReadVarInt62(&error_code) ||
      !reader.ReadStringVarInt62(subscribe_namespace_error.reason_phrase)) {
    return 0;
  }
  subscribe_namespace_error.error_code =
      static_cast<SubscribeErrorCode>(error_code);
  visitor_.OnSubscribeAnnouncesErrorMessage(subscribe_namespace_error);
  return reader.PreviouslyReadPayload().length();
}

size_t MoqtControlParser::ProcessUnsubscribeAnnounces(
    quic::QuicDataReader& reader) {
  MoqtUnsubscribeAnnounces unsubscribe_namespace;
  if (!ReadTrackNamespace(reader, unsubscribe_namespace.track_namespace)) {
    return 0;
  }
  visitor_.OnUnsubscribeAnnouncesMessage(unsubscribe_namespace);
  return reader.PreviouslyReadPayload().length();
}

size_t MoqtControlParser::ProcessMaxRequestId(quic::QuicDataReader& reader) {
  MoqtMaxRequestId max_request_id;
  if (!reader.ReadVarInt62(&max_request_id.max_request_id)) {
    return 0;
  }
  visitor_.OnMaxRequestIdMessage(max_request_id);
  return reader.PreviouslyReadPayload().length();
}

size_t MoqtControlParser::ProcessFetch(quic::QuicDataReader& reader) {
  MoqtFetch fetch;
  absl::string_view track_name;
  uint8_t group_order;
  uint64_t end_object;
  uint64_t type;
  if (!reader.ReadVarInt62(&fetch.fetch_id) ||
      !reader.ReadUInt8(&fetch.subscriber_priority) ||
      !reader.ReadUInt8(&group_order) || !reader.ReadVarInt62(&type)) {
    return 0;
  }
  if (!ParseDeliveryOrder(group_order, fetch.group_order)) {
    ParseError("Invalid group order value in FETCH message");
    return 0;
  }
  switch (static_cast<FetchType>(type)) {
    case FetchType::kJoining: {
      uint64_t joining_subscribe_id;
      uint64_t preceding_group_offset;
      if (!reader.ReadVarInt62(&joining_subscribe_id) ||
          !reader.ReadVarInt62(&preceding_group_offset)) {
        return 0;
      }
      fetch.joining_fetch =
          JoiningFetch{joining_subscribe_id, preceding_group_offset};
      break;
    }
    case FetchType::kStandalone: {
      fetch.joining_fetch = std::nullopt;
      if (!ReadTrackNamespace(reader, fetch.full_track_name) ||
          !reader.ReadStringPieceVarInt62(&track_name) ||
          !reader.ReadVarInt62(&fetch.start_object.group) ||
          !reader.ReadVarInt62(&fetch.start_object.object) ||
          !reader.ReadVarInt62(&fetch.end_group) ||
          !reader.ReadVarInt62(&end_object)) {
        return 0;
      }
      // Elements that have to be translated from the literal value.
      fetch.full_track_name.AddElement(track_name);
      fetch.end_object =
          end_object == 0 ? std::optional<uint64_t>() : (end_object - 1);
      if (fetch.end_group < fetch.start_object.group ||
          (fetch.end_group == fetch.start_object.group &&
           fetch.end_object.has_value() &&
           *fetch.end_object < fetch.start_object.object)) {
        ParseError("End object comes before start object in FETCH");
        return 0;
      }
      break;
    }
    default:
      ParseError("Invalid FETCH type");
      return 0;
  }
  KeyValuePairList parameters;
  if (!ParseKeyValuePairList(reader, parameters)) {
    return 0;
  }
  if (!ValidateVersionSpecificParameters(parameters, MoqtMessageType::kFetch)) {
    ParseError("FETCH message contains invalid parameters");
    return 0;
  }
  if (!KeyValuePairListToVersionSpecificParameters(parameters,
                                                   fetch.parameters)) {
    return 0;
  };
  visitor_.OnFetchMessage(fetch);
  return reader.PreviouslyReadPayload().length();
}

size_t MoqtControlParser::ProcessFetchCancel(quic::QuicDataReader& reader) {
  MoqtFetchCancel fetch_cancel;
  if (!reader.ReadVarInt62(&fetch_cancel.subscribe_id)) {
    return 0;
  }
  visitor_.OnFetchCancelMessage(fetch_cancel);
  return reader.PreviouslyReadPayload().length();
}

size_t MoqtControlParser::ProcessFetchOk(quic::QuicDataReader& reader) {
  MoqtFetchOk fetch_ok;
  uint8_t group_order;
  KeyValuePairList parameters;
  if (!reader.ReadVarInt62(&fetch_ok.subscribe_id) ||
      !reader.ReadUInt8(&group_order) ||
      !reader.ReadVarInt62(&fetch_ok.largest_id.group) ||
      !reader.ReadVarInt62(&fetch_ok.largest_id.object) ||
      !ParseKeyValuePairList(reader, parameters)) {
    return 0;
  }
  if (group_order != 0x01 && group_order != 0x02) {
    ParseError("Invalid group order value in FETCH_OK");
    return 0;
  }
  if (!ValidateVersionSpecificParameters(parameters,
                                         MoqtMessageType::kFetchOk)) {
    ParseError("FETCH_OK message contains invalid parameters");
    return 0;
  }
  fetch_ok.group_order = static_cast<MoqtDeliveryOrder>(group_order);
  if (!KeyValuePairListToVersionSpecificParameters(parameters,
                                                   fetch_ok.parameters)) {
    return 0;
  }
  visitor_.OnFetchOkMessage(fetch_ok);
  return reader.PreviouslyReadPayload().length();
}

size_t MoqtControlParser::ProcessFetchError(quic::QuicDataReader& reader) {
  MoqtFetchError fetch_error;
  uint64_t error_code;
  if (!reader.ReadVarInt62(&fetch_error.subscribe_id) ||
      !reader.ReadVarInt62(&error_code) ||
      !reader.ReadStringVarInt62(fetch_error.reason_phrase)) {
    return 0;
  }
  fetch_error.error_code = static_cast<SubscribeErrorCode>(error_code);
  visitor_.OnFetchErrorMessage(fetch_error);
  return reader.PreviouslyReadPayload().length();
}

size_t MoqtControlParser::ProcessRequestsBlocked(quic::QuicDataReader& reader) {
  MoqtRequestsBlocked requests_blocked;
  if (!reader.ReadVarInt62(&requests_blocked.max_request_id)) {
    return 0;
  }
  visitor_.OnRequestsBlockedMessage(requests_blocked);
  return reader.PreviouslyReadPayload().length();
}

size_t MoqtControlParser::ProcessObjectAck(quic::QuicDataReader& reader) {
  MoqtObjectAck object_ack;
  uint64_t raw_delta;
  if (!reader.ReadVarInt62(&object_ack.subscribe_id) ||
      !reader.ReadVarInt62(&object_ack.group_id) ||
      !reader.ReadVarInt62(&object_ack.object_id) ||
      !reader.ReadVarInt62(&raw_delta)) {
    return 0;
  }
  object_ack.delta_from_deadline = quic::QuicTimeDelta::FromMicroseconds(
      SignedVarintUnserializedForm(raw_delta));
  visitor_.OnObjectAckMessage(object_ack);
  return reader.PreviouslyReadPayload().length();
}

void MoqtControlParser::ParseError(absl::string_view reason) {
  ParseError(MoqtError::kProtocolViolation, reason);
}

void MoqtControlParser::ParseError(MoqtError error_code,
                                   absl::string_view reason) {
  if (parsing_error_) {
    return;  // Don't send multiple parse errors.
  }
  no_more_data_ = true;
  parsing_error_ = true;
  visitor_.OnParsingError(error_code, reason);
}

bool MoqtControlParser::ReadTrackNamespace(quic::QuicDataReader& reader,
                                           FullTrackName& full_track_name) {
  QUICHE_DCHECK(full_track_name.empty());
  uint64_t num_elements;
  if (!reader.ReadVarInt62(&num_elements)) {
    return false;
  }
  if (num_elements == 0 || num_elements > kMaxNamespaceElements) {
    ParseError(MoqtError::kProtocolViolation,
               "Invalid number of namespace elements");
    return false;
  }
  for (uint64_t i = 0; i < num_elements; ++i) {
    absl::string_view element;
    if (!reader.ReadStringPieceVarInt62(&element)) {
      return false;
    }
    full_track_name.AddElement(element);
  }
  return true;
}

// Returns false if there is a protocol violation.
bool MoqtControlParser::KeyValuePairListToVersionSpecificParameters(
    const KeyValuePairList& parameters, VersionSpecificParameters& out) {
  return parameters.ForEach(
      [&](uint64_t key, uint64_t value) {
        VersionSpecificParameter parameter =
            static_cast<VersionSpecificParameter>(key);
        switch (parameter) {
          case VersionSpecificParameter::kDeliveryTimeout:
            out.delivery_timeout = quic::QuicTimeDelta::FromMilliseconds(value);
            break;
          case VersionSpecificParameter::kMaxCacheDuration:
            out.max_cache_duration =
                quic::QuicTimeDelta::FromMilliseconds(value);
            break;
          case VersionSpecificParameter::kOackWindowSize:
            out.oack_window_size = quic::QuicTimeDelta::FromMicroseconds(value);
            break;
          default:
            break;
        }
        return true;
      },
      [&](uint64_t key, absl::string_view value) {
        VersionSpecificParameter parameter =
            static_cast<VersionSpecificParameter>(key);
        switch (parameter) {
          case VersionSpecificParameter::kAuthorizationToken:
            if (!ParseAuthTokenParameter(value, out)) {
              return false;
            }
            break;
          default:
            break;
        }
        return true;
      });
}

bool MoqtControlParser::ParseAuthTokenParameter(
    absl::string_view field, VersionSpecificParameters& out) {
  quic::QuicDataReader reader(field);
  AuthTokenType token_type;
  absl::string_view token;
  uint64_t value;
  if (!reader.ReadVarInt62(&value) || value > AuthTokenAliasType::kMaxValue) {
    ParseError(MoqtError::kKeyValueFormattingError,
               "Invalid Authorization Token Alias type");
    return false;
  }
  AuthTokenAliasType alias_type = static_cast<AuthTokenAliasType>(value);
  switch (alias_type) {
    case AuthTokenAliasType::kUseValue:
      if (!reader.ReadVarInt62(&value)) {
        ParseError(MoqtError::kKeyValueFormattingError,
                   "Malformed Authorization Token Parameter");
        return false;
      }
      if (value > AuthTokenType::kMaxAuthTokenType) {
        ParseError(MoqtError::kKeyValueFormattingError,
                   "Invalid Authorization Token Type");
        return false;
      }
      token_type = static_cast<AuthTokenType>(value);
      token = reader.PeekRemainingPayload();
      break;
    case AuthTokenAliasType::kUseAlias:
      if (!reader.ReadVarInt62(&value)) {
        ParseError(MoqtError::kKeyValueFormattingError,
                   "Malformed Authorization Token Parameter");
        return false;
      }
      // TODO: Implement support for cache_size > 0
      ParseError(MoqtError::kKeyValueFormattingError,
                 "Unknown Auth Token Alias");
      return false;
    case AuthTokenAliasType::kRegister:
      if (!reader.ReadVarInt62(&value)) {
        ParseError(MoqtError::kKeyValueFormattingError,
                   "Malformed Authorization Token Parameter");
        return false;
      }
      if (!reader.ReadVarInt62(&value)) {
        ParseError(MoqtError::kKeyValueFormattingError,
                   "Malformed Authorization Token Parameter");
        return false;
      }
      token_type = static_cast<AuthTokenType>(value);
      token = reader.PeekRemainingPayload();
      if (auth_token_cache_size_ + sizeof(uint64_t) + token.length() >
          max_auth_token_cache_size_) {
        ParseError(MoqtError::kAuthTokenCacheOverflow,
                   "Too many authorization token tags");
        return false;
      }
      break;
      // TODO: Add to the cache.
      // TODO: Check if the alias is already in use.
      QUICHE_NOTREACHED();
      break;
    case AuthTokenAliasType::kDelete:
      if (!reader.ReadVarInt62(&value)) {
        ParseError(MoqtError::kKeyValueFormattingError,
                   "Malformed Authorization Token Parameter");
        return false;
      }
      // TODO: Implement support for cache_size > 0
      ParseError(MoqtError::kKeyValueFormattingError,
                 "Unknown Auth Token Alias");
      return false;
  }
  // Validate cache operations.
  out.authorization_token.push_back(AuthToken(token_type, token));
  return true;
}

void MoqtDataParser::ParseError(absl::string_view reason) {
  if (parsing_error_) {
    return;  // Don't send multiple parse errors.
  }
  next_input_ = kFailed;
  no_more_data_ = true;
  parsing_error_ = true;
  visitor_.OnParsingError(MoqtError::kProtocolViolation, reason);
}

std::optional<absl::string_view> ParseDatagram(absl::string_view data,
                                               MoqtObject& object_metadata) {
  uint64_t type_raw, object_status_raw;
  absl::string_view extensions;
  quic::QuicDataReader reader(data);
  if (!reader.ReadVarInt62(&type_raw) ||
      !reader.ReadVarInt62(&object_metadata.track_alias) ||
      !reader.ReadVarInt62(&object_metadata.group_id) ||
      !reader.ReadVarInt62(&object_metadata.object_id) ||
      !reader.ReadUInt8(&object_metadata.publisher_priority) ||
      !reader.ReadStringPieceVarInt62(&extensions)) {
    return std::nullopt;
  }
  object_metadata.extension_headers = std::string(extensions);
  if (static_cast<MoqtDatagramType>(type_raw) ==
      MoqtDatagramType::kObjectStatus) {
    object_metadata.payload_length = 0;
    if (!reader.ReadVarInt62(&object_status_raw)) {
      return std::nullopt;
    }
    object_metadata.object_status = IntegerToObjectStatus(object_status_raw);
    return "";
  }

  absl::string_view payload;
  if (!reader.ReadStringPieceVarInt62(&payload)) {
    return std::nullopt;
  }
  object_metadata.object_status = MoqtObjectStatus::kNormal;
  object_metadata.payload_length = payload.length();
  return payload;
}

void MoqtDataParser::ReadDataUntil(StopCondition stop_condition) {
  if (processing_) {
    QUICHE_BUG(MoqtDataParser_reentry)
        << "Calling ProcessData() when ProcessData() is already in progress.";
    return;
  }
  processing_ = true;
  auto on_return = absl::MakeCleanup([&] { processing_ = false; });

  State last_state = state();
  for (;;) {
    ParseNextItemFromStream();
    if (state() == last_state || no_more_data_ || stop_condition()) {
      break;
    }
    last_state = state();
  }
}

std::optional<uint64_t> MoqtDataParser::ReadVarInt62NoFin() {
  bool fin_read = false;
  std::optional<uint64_t> result = ReadVarInt62FromStream(stream_, fin_read);
  if (fin_read) {
    ParseError("Unexpected FIN received in the middle of a header");
    return std::nullopt;
  }
  return result;
}

std::optional<uint8_t> MoqtDataParser::ReadUint8NoFin() {
  char buffer[1];
  quiche::ReadStream::ReadResult read_result =
      stream_.Read(absl::MakeSpan(buffer));
  if (read_result.fin) {
    ParseError("Unexpected FIN received in the middle of a header");
    return std::nullopt;
  }
  if (read_result.bytes_read == 0) {
    return std::nullopt;
  }
  return absl::bit_cast<uint8_t>(buffer[0]);
}

void MoqtDataParser::AdvanceParserState() {
  QUICHE_DCHECK(type_ == MoqtDataStreamType::kStreamHeaderSubgroup ||
                type_ == MoqtDataStreamType::kStreamHeaderFetch);
  const bool is_fetch = type_ == MoqtDataStreamType::kStreamHeaderFetch;
  switch (next_input_) {
    // The state table is factored into a separate function (rather than
    // inlined) in order to separate the order of elements from the way they are
    // parsed.
    case kStreamType:
      next_input_ = kTrackAlias;
      break;
    case kTrackAlias:
      next_input_ = kGroupId;
      break;
    case kGroupId:
      next_input_ = kSubgroupId;
      break;
    case kSubgroupId:
      next_input_ = is_fetch ? kObjectId : kPublisherPriority;
      break;
    case kPublisherPriority:
      next_input_ = is_fetch ? kExtensionSize : kObjectId;
      break;
    case kObjectId:
      next_input_ = is_fetch ? kPublisherPriority : kExtensionSize;
      break;
    case kExtensionBody:
      next_input_ = kObjectPayloadLength;
      break;
    case kStatus:
    case kData:
      next_input_ = is_fetch ? kGroupId : kObjectId;
      break;

    case kExtensionSize:        // Either kExtensionBody or
                                // kObjectPayloadLength.
    case kObjectPayloadLength:  // Either kStatus or kData depending on length.
    case kPadding:              // Handled separately.
    case kFailed:               // Should cause parsing to cease.
      QUICHE_NOTREACHED();
      break;
  }
}

void MoqtDataParser::ParseNextItemFromStream() {
  if (CheckForFinWithoutData()) {
    return;
  }
  switch (next_input_) {
    case kStreamType: {
      std::optional<uint64_t> value_read = ReadVarInt62NoFin();
      if (value_read.has_value()) {
        if (!IsAllowedStreamType(*value_read)) {
          ParseError("Invalid stream type supplied");
          return;
        }
        type_ = static_cast<MoqtDataStreamType>(*value_read);
        switch (*type_) {
          case MoqtDataStreamType::kStreamHeaderSubgroup:
          case MoqtDataStreamType::kStreamHeaderFetch:
            AdvanceParserState();
            break;
          case MoqtDataStreamType::kPadding:
            next_input_ = kPadding;
            break;
        }
      }
      return;
    }

    case kTrackAlias: {
      std::optional<uint64_t> value_read = ReadVarInt62NoFin();
      if (value_read.has_value()) {
        metadata_.track_alias = *value_read;
        AdvanceParserState();
      }
      return;
    }

    case kGroupId: {
      std::optional<uint64_t> value_read = ReadVarInt62NoFin();
      if (value_read.has_value()) {
        metadata_.group_id = *value_read;
        AdvanceParserState();
      }
      return;
    }

    case kSubgroupId: {
      std::optional<uint64_t> value_read = ReadVarInt62NoFin();
      if (value_read.has_value()) {
        metadata_.subgroup_id = *value_read;
        AdvanceParserState();
      }
      return;
    }

    case kPublisherPriority: {
      std::optional<uint8_t> value_read = ReadUint8NoFin();
      if (value_read.has_value()) {
        metadata_.publisher_priority = *value_read;
        AdvanceParserState();
      }
      return;
    }

    case kObjectId: {
      std::optional<uint64_t> value_read = ReadVarInt62NoFin();
      if (value_read.has_value()) {
        metadata_.object_id = *value_read;
        AdvanceParserState();
      }
      return;
    }

    case kExtensionSize: {
      std::optional<uint64_t> value_read = ReadVarInt62NoFin();
      if (value_read.has_value()) {
        metadata_.extension_headers.clear();
        payload_length_remaining_ = *value_read;
        next_input_ = (value_read == 0) ? kObjectPayloadLength : kExtensionBody;
      }
      return;
    }

    case kObjectPayloadLength: {
      std::optional<uint64_t> value_read = ReadVarInt62NoFin();
      if (value_read.has_value()) {
        metadata_.payload_length = *value_read;
        payload_length_remaining_ = *value_read;
        if (metadata_.payload_length > 0) {
          metadata_.object_status = MoqtObjectStatus::kNormal;
          next_input_ = kData;
        } else {
          next_input_ = kStatus;
        }
      }
      return;
    }

    case kStatus: {
      bool fin_read = false;
      std::optional<uint64_t> value_read =
          ReadVarInt62FromStream(stream_, fin_read);
      if (value_read.has_value()) {
        metadata_.object_status = IntegerToObjectStatus(*value_read);
        if (metadata_.object_status == MoqtObjectStatus::kInvalidObjectStatus) {
          ParseError("Invalid object status provided");
          return;
        }

        ++num_objects_read_;
        visitor_.OnObjectMessage(metadata_, "", /*end_of_message=*/true);
        AdvanceParserState();
      }
      if (fin_read) {
        no_more_data_ = true;
        return;
      }
      return;
    }

    case kExtensionBody:
    case kData: {
      while (payload_length_remaining_ > 0) {
        quiche::ReadStream::PeekResult peek_result =
            stream_.PeekNextReadableRegion();
        if (!peek_result.has_data()) {
          return;
        }
        if (peek_result.fin_next && payload_length_remaining_ > 0) {
          ParseError("FIN received at an unexpected point in the stream");
          return;
        }

        size_t chunk_size =
            std::min(payload_length_remaining_, peek_result.peeked_data.size());
        payload_length_remaining_ -= chunk_size;
        bool done = payload_length_remaining_ == 0;
        if (next_input_ == kData) {
          visitor_.OnObjectMessage(
              metadata_, peek_result.peeked_data.substr(0, chunk_size), done);
          const bool fin = stream_.SkipBytes(chunk_size);
          if (done) {
            ++num_objects_read_;
            no_more_data_ |= fin;
            AdvanceParserState();
          }
        } else {
          absl::StrAppend(&metadata_.extension_headers,
                          peek_result.peeked_data.substr(0, chunk_size));
          if (stream_.SkipBytes(chunk_size)) {
            ParseError("FIN received at an unexpected point in the stream");
            return;
          }
          if (done) {
            AdvanceParserState();
          }
        }
      }
      return;
    }

    case kPadding:
      no_more_data_ |= stream_.SkipBytes(stream_.ReadableBytes());
      return;

    case kFailed:
      return;
  }
}

void MoqtDataParser::ReadAllData() {
  ReadDataUntil(+[]() { return false; });
}

void MoqtDataParser::ReadStreamType() {
  return ReadDataUntil([this]() { return type_.has_value(); });
}

void MoqtDataParser::ReadTrackAlias() {
  return ReadDataUntil(
      [this]() { return type_.has_value() && next_input_ != kTrackAlias; });
}

void MoqtDataParser::ReadAtMostOneObject() {
  const size_t num_objects_read_initial = num_objects_read_;
  return ReadDataUntil(
      [&]() { return num_objects_read_ != num_objects_read_initial; });
}

bool MoqtDataParser::CheckForFinWithoutData() {
  if (!stream_.PeekNextReadableRegion().fin_next) {
    return false;
  }
  const bool valid_state =
      (type_ == MoqtDataStreamType::kStreamHeaderSubgroup &&
       next_input_ == kObjectId) ||
      (type_ == MoqtDataStreamType::kStreamHeaderFetch &&
       next_input_ == kGroupId);
  if (!valid_state || num_objects_read_ == 0) {
    ParseError("FIN received at an unexpected point in the stream");
    return true;
  }
  return stream_.SkipBytes(0);
}

}  // namespace moqt
