Update MoQT FETCH* messages to draft-11. (except FETCH format change, already done) Add a field to FETCH_OK, rename other fields. PiperOrigin-RevId: 780281131
diff --git a/quiche/quic/moqt/moqt_failed_fetch.h b/quiche/quic/moqt/moqt_failed_fetch.h index 154c6fc..c971897 100644 --- a/quiche/quic/moqt/moqt_failed_fetch.h +++ b/quiche/quic/moqt/moqt_failed_fetch.h
@@ -26,9 +26,9 @@ ObjectsAvailableCallback /*callback*/) override {} void SetFetchResponseCallback(FetchResponseCallback callback) { MoqtFetchError error; - error.subscribe_id = 0; + error.request_id = 0; error.error_code = StatusToRequestErrorCode(status_); - error.reason_phrase = status_.message(); + error.error_reason = status_.message(); std::move(callback)(error); }
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc index 842a356..db73875 100644 --- a/quiche/quic/moqt/moqt_framer.cc +++ b/quiche/quic/moqt/moqt_framer.cc
@@ -699,7 +699,7 @@ const StandaloneFetch& standalone_fetch = std::get<StandaloneFetch>(message.fetch); return SerializeControlMessage( - MoqtMessageType::kFetch, WireVarInt62(message.fetch_id), + MoqtMessageType::kFetch, WireVarInt62(message.request_id), WireUint8(message.subscriber_priority), WireDeliveryOrder(message.group_order), WireVarInt62(FetchType::kStandalone), @@ -726,19 +726,13 @@ joining_start = joining_fetch.joining_start; } return SerializeControlMessage( - MoqtMessageType::kFetch, WireVarInt62(message.fetch_id), + MoqtMessageType::kFetch, WireVarInt62(message.request_id), WireUint8(message.subscriber_priority), WireDeliveryOrder(message.group_order), WireVarInt62(message.fetch.index() + 1), WireVarInt62(subscribe_id), WireVarInt62(joining_start), WireKeyValuePairList(parameters)); } -quiche::QuicheBuffer MoqtFramer::SerializeFetchCancel( - const MoqtFetchCancel& message) { - return SerializeControlMessage(MoqtMessageType::kFetchCancel, - WireVarInt62(message.subscribe_id)); -} - quiche::QuicheBuffer MoqtFramer::SerializeFetchOk(const MoqtFetchOk& message) { KeyValuePairList parameters; VersionSpecificParametersToKeyValuePairList(message.parameters, parameters); @@ -748,20 +742,26 @@ << "Serializing invalid MoQT parameters"; return quiche::QuicheBuffer(); } - return SerializeControlMessage(MoqtMessageType::kFetchOk, - WireVarInt62(message.subscribe_id), - WireDeliveryOrder(message.group_order), - WireVarInt62(message.largest_id.group), - WireVarInt62(message.largest_id.object), - WireKeyValuePairList(parameters)); + return SerializeControlMessage( + MoqtMessageType::kFetchOk, WireVarInt62(message.request_id), + WireDeliveryOrder(message.group_order), WireBoolean(message.end_of_track), + WireVarInt62(message.end_location.group), + WireVarInt62(message.end_location.object), + WireKeyValuePairList(parameters)); } quiche::QuicheBuffer MoqtFramer::SerializeFetchError( const MoqtFetchError& message) { return SerializeControlMessage( - MoqtMessageType::kFetchError, WireVarInt62(message.subscribe_id), + MoqtMessageType::kFetchError, WireVarInt62(message.request_id), WireVarInt62(message.error_code), - WireStringWithVarInt62Length(message.reason_phrase)); + WireStringWithVarInt62Length(message.error_reason)); +} + +quiche::QuicheBuffer MoqtFramer::SerializeFetchCancel( + const MoqtFetchCancel& message) { + return SerializeControlMessage(MoqtMessageType::kFetchCancel, + WireVarInt62(message.request_id)); } quiche::QuicheBuffer MoqtFramer::SerializeRequestsBlocked(
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h index 43d4ba6..d6d27fd 100644 --- a/quiche/quic/moqt/moqt_messages.h +++ b/quiche/quic/moqt/moqt_messages.h
@@ -766,7 +766,7 @@ }; struct QUICHE_EXPORT MoqtFetch { - uint64_t fetch_id; + uint64_t request_id; MoqtPriority subscriber_priority; std::optional<MoqtDeliveryOrder> group_order; std::variant<StandaloneFetch, JoiningFetchRelative, JoiningFetchAbsolute> @@ -774,21 +774,22 @@ VersionSpecificParameters parameters; }; -struct QUICHE_EXPORT MoqtFetchCancel { - uint64_t subscribe_id; -}; - struct QUICHE_EXPORT MoqtFetchOk { - uint64_t subscribe_id; + uint64_t request_id; MoqtDeliveryOrder group_order; - Location largest_id; + bool end_of_track; + Location end_location; VersionSpecificParameters parameters; }; struct QUICHE_EXPORT MoqtFetchError { - uint64_t subscribe_id; + uint64_t request_id; RequestErrorCode error_code; - std::string reason_phrase; + std::string error_reason; +}; + +struct QUICHE_EXPORT MoqtFetchCancel { + uint64_t request_id; }; struct QUICHE_EXPORT MoqtRequestsBlocked {
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.h b/quiche/quic/moqt/moqt_outgoing_queue.h index f9a1361..c61b220 100644 --- a/quiche/quic/moqt/moqt_outgoing_queue.h +++ b/quiche/quic/moqt/moqt_outgoing_queue.h
@@ -123,7 +123,7 @@ MoqtFetchError error(0, StatusToRequestErrorCode(status_), std::string(status_.message())); error.error_code = StatusToRequestErrorCode(status_); - error.reason_phrase = status_.message(); + error.error_reason = status_.message(); std::move(callback)(error); return; } @@ -135,11 +135,13 @@ } MoqtFetchOk ok; ok.group_order = MoqtDeliveryOrder::kAscending; - ok.largest_id = *(objects_.crbegin()); - if (objects_.size() > 1 && *(objects_.cbegin()) > ok.largest_id) { + ok.end_location = *(objects_.crbegin()); + if (objects_.size() > 1 && *(objects_.cbegin()) > ok.end_location) { ok.group_order = MoqtDeliveryOrder::kDescending; - ok.largest_id = *(objects_.cbegin()); + ok.end_location = *(objects_.cbegin()); } + ok.end_of_track = + queue_->closed_ && ok.end_location == queue_->GetLargestLocation(); std::move(callback)(ok); }
diff --git a/quiche/quic/moqt/moqt_outgoing_queue_test.cc b/quiche/quic/moqt/moqt_outgoing_queue_test.cc index 46e0d19..5f677b8 100644 --- a/quiche/quic/moqt/moqt_outgoing_queue_test.cc +++ b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
@@ -9,6 +9,7 @@ #include <optional> #include <string> #include <utility> +#include <variant> #include <vector> #include "absl/status/status.h" @@ -24,6 +25,7 @@ #include "quiche/common/platform/api/quiche_expect_bug.h" #include "quiche/common/platform/api/quiche_logging.h" #include "quiche/common/platform/api/quiche_test.h" +#include "quiche/common/quiche_mem_slice.h" #include "quiche/common/test_tools/quiche_test_utils.h" #include "quiche/web_transport/web_transport.h" @@ -35,6 +37,7 @@ using ::quiche::test::StatusIs; using ::testing::AnyOf; using ::testing::ElementsAre; +using ::testing::Field; using ::testing::IsEmpty; class TestMoqtOutgoingQueue : public MoqtOutgoingQueue, @@ -49,9 +52,12 @@ void OnNewObjectAvailable(Location sequence, uint64_t subgroup) override { std::optional<PublishedObject> object = GetCachedObject(sequence.group, subgroup, sequence.object); - QUICHE_CHECK(object.has_value()); - ASSERT_THAT(object->metadata.status, AnyOf(MoqtObjectStatus::kNormal, - MoqtObjectStatus::kEndOfGroup)); + ASSERT_THAT(object, + Optional(Field(&PublishedObject::metadata, + Field(&PublishedObjectMetadata::status, + AnyOf(MoqtObjectStatus::kNormal, + MoqtObjectStatus::kEndOfGroup, + MoqtObjectStatus::kEndOfTrack))))); if (object->metadata.status == MoqtObjectStatus::kNormal) { PublishObject(object->metadata.location.group, object->metadata.location.object, @@ -372,5 +378,50 @@ EXPECT_GE(object->metadata.arrival_time, test_start); } +TEST(MoqtOutgoingQueue, EndOfTrack) { + TestMoqtOutgoingQueue queue; + queue.AddObject(quiche::QuicheMemSlice::Copy("a"), true); // Create (0, 0) + queue.AddObject(quiche::QuicheMemSlice::Copy("b"), true); // Create (1, 0) + std::unique_ptr<MoqtFetchTask> fetch = queue.Fetch( + Location{0, 0}, 5, std::nullopt, MoqtDeliveryOrder::kAscending); + bool end_of_track = false; + Location end_location; + // end_of_track is false before Close() is called. + fetch->SetFetchResponseCallback( + [&end_of_track, + &end_location](std::variant<MoqtFetchOk, MoqtFetchError> arg) { + end_of_track = std::get<MoqtFetchOk>(arg).end_of_track; + end_location = std::get<MoqtFetchOk>(arg).end_location; + }); + EXPECT_FALSE(end_of_track); + EXPECT_EQ(end_location, Location(1, 0)); + + queue.Close(); // Create (2, 0) + EXPECT_EQ(queue.GetLargestLocation(), Location(2, 0)); + fetch = queue.Fetch(Location{0, 0}, 1, std::nullopt, + MoqtDeliveryOrder::kAscending); + // end_of_track is false if the fetch does not include the last object. + fetch->SetFetchResponseCallback( + [&end_of_track, + &end_location](std::variant<MoqtFetchOk, MoqtFetchError> arg) { + end_of_track = std::get<MoqtFetchOk>(arg).end_of_track; + end_location = std::get<MoqtFetchOk>(arg).end_location; + }); + EXPECT_FALSE(end_of_track); + EXPECT_EQ(end_location, Location(1, 1)); + + fetch = queue.Fetch(Location{0, 0}, 5, std::nullopt, + MoqtDeliveryOrder::kAscending); + // end_of_track is true if the fetch includes the last object. + fetch->SetFetchResponseCallback( + [&end_of_track, + &end_location](std::variant<MoqtFetchOk, MoqtFetchError> arg) { + end_of_track = std::get<MoqtFetchOk>(arg).end_of_track; + end_location = std::get<MoqtFetchOk>(arg).end_location; + }); + EXPECT_TRUE(end_of_track); + EXPECT_EQ(end_location, Location(2, 0)); +} + } // namespace } // namespace moqt
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc index d29b5a2..52de027 100644 --- a/quiche/quic/moqt/moqt_parser.cc +++ b/quiche/quic/moqt/moqt_parser.cc
@@ -788,7 +788,7 @@ MoqtFetch fetch; uint8_t group_order; uint64_t type; - if (!reader.ReadVarInt62(&fetch.fetch_id) || + if (!reader.ReadVarInt62(&fetch.request_id) || !reader.ReadUInt8(&fetch.subscriber_priority) || !reader.ReadUInt8(&group_order) || !reader.ReadVarInt62(&type)) { return 0; @@ -862,23 +862,14 @@ return reader.PreviouslyReadPayload().length(); } -size_t MoqtControlParser::ProcessFetchCancel(quic::QuicDataReader& reader) { - MoqtFetchCancel fetch_cancel; - if (!reader.ReadVarInt62(&fetch_cancel.subscribe_id)) { - return 0; - } - visitor_.OnFetchCancelMessage(fetch_cancel); - return reader.PreviouslyReadPayload().length(); -} - size_t MoqtControlParser::ProcessFetchOk(quic::QuicDataReader& reader) { MoqtFetchOk fetch_ok; - uint8_t group_order; + uint8_t group_order, end_of_track; KeyValuePairList parameters; - if (!reader.ReadVarInt62(&fetch_ok.subscribe_id) || - !reader.ReadUInt8(&group_order) || - !reader.ReadVarInt62(&fetch_ok.largest_id.group) || - !reader.ReadVarInt62(&fetch_ok.largest_id.object) || + if (!reader.ReadVarInt62(&fetch_ok.request_id) || + !reader.ReadUInt8(&group_order) || !reader.ReadUInt8(&end_of_track) || + !reader.ReadVarInt62(&fetch_ok.end_location.group) || + !reader.ReadVarInt62(&fetch_ok.end_location.object) || !ParseKeyValuePairList(reader, parameters)) { return 0; } @@ -886,12 +877,17 @@ ParseError("Invalid group order value in FETCH_OK"); return 0; } + if (end_of_track > 0x01) { + ParseError("Invalid end of track value in FETCH_OK"); + return 0; + } if (!ValidateVersionSpecificParameters(parameters, MoqtMessageType::kFetchOk)) { ParseError("FETCH_OK message contains invalid parameters"); return 0; } fetch_ok.group_order = static_cast<MoqtDeliveryOrder>(group_order); + fetch_ok.end_of_track = end_of_track == 1; if (!KeyValuePairListToVersionSpecificParameters(parameters, fetch_ok.parameters)) { return 0; @@ -903,9 +899,9 @@ size_t MoqtControlParser::ProcessFetchError(quic::QuicDataReader& reader) { MoqtFetchError fetch_error; uint64_t error_code; - if (!reader.ReadVarInt62(&fetch_error.subscribe_id) || + if (!reader.ReadVarInt62(&fetch_error.request_id) || !reader.ReadVarInt62(&error_code) || - !reader.ReadStringVarInt62(fetch_error.reason_phrase)) { + !reader.ReadStringVarInt62(fetch_error.error_reason)) { return 0; } fetch_error.error_code = static_cast<RequestErrorCode>(error_code); @@ -913,6 +909,15 @@ return reader.PreviouslyReadPayload().length(); } +size_t MoqtControlParser::ProcessFetchCancel(quic::QuicDataReader& reader) { + MoqtFetchCancel fetch_cancel; + if (!reader.ReadVarInt62(&fetch_cancel.request_id)) { + return 0; + } + visitor_.OnFetchCancelMessage(fetch_cancel); + return reader.PreviouslyReadPayload().length(); +} + size_t MoqtControlParser::ProcessRequestsBlocked(quic::QuicDataReader& reader) { MoqtRequestsBlocked requests_blocked; if (!reader.ReadVarInt62(&requests_blocked.max_request_id)) {
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc index ef1450b..55a27a3 100644 --- a/quiche/quic/moqt/moqt_session.cc +++ b/quiche/quic/moqt/moqt_session.cc
@@ -207,7 +207,7 @@ return; } QUICHE_DLOG(INFO) << ENDPOINT - << "Received OBJECT message in datagram for subscribe_id " + << "Received OBJECT message in datagram for request_id " << " for track alias " << message.track_alias << " with sequence " << message.group_id << ":" << message.object_id << " priority " @@ -485,7 +485,7 @@ } MoqtFetch message; message.fetch = StandaloneFetch(name, start, end_group, end_object); - message.fetch_id = next_request_id_; + message.request_id = next_request_id_; next_request_id_ += 2; message.subscriber_priority = priority; message.group_order = delivery_order; @@ -494,7 +494,7 @@ QUIC_DLOG(INFO) << ENDPOINT << "Sent FETCH message for " << name; auto fetch = std::make_unique<UpstreamFetch>( message, std::get<StandaloneFetch>(message.fetch), std::move(callback)); - upstream_by_id_.emplace(message.fetch_id, std::move(fetch)); + upstream_by_id_.emplace(message.request_id, std::move(fetch)); return true; } @@ -546,7 +546,7 @@ return false; } MoqtFetch fetch; - fetch.fetch_id = next_request_id_; + fetch.request_id = next_request_id_; next_request_id_ += 2; fetch.subscriber_priority = priority; fetch.group_order = delivery_order; @@ -556,7 +556,7 @@ QUIC_DLOG(INFO) << ENDPOINT << "Sent Joining FETCH message for " << name; auto upstream_fetch = std::make_unique<UpstreamFetch>(fetch, name, std::move(callback)); - upstream_by_id_.emplace(fetch.fetch_id, std::move(upstream_fetch)); + upstream_by_id_.emplace(fetch.request_id, std::move(upstream_fetch)); return true; } @@ -598,7 +598,7 @@ continue; } if (fetch->session_->WriteObjectToStream( - stream_, fetch->fetch_id_, object.metadata, + stream_, fetch->request_id(), object.metadata, std::move(object.payload), MoqtDataStreamType::kStreamHeaderFetch, !stream_header_written_, /*fin=*/false)) { @@ -798,8 +798,8 @@ return it->second; } -RemoteTrack* MoqtSession::RemoteTrackById(uint64_t subscribe_id) { - auto it = upstream_by_id_.find(subscribe_id); +RemoteTrack* MoqtSession::RemoteTrackById(uint64_t request_id) { + auto it = upstream_by_id_.find(request_id); if (it == upstream_by_id_.end()) { return nullptr; } @@ -853,19 +853,18 @@ } void MoqtSession::UpdateQueuedSendOrder( - uint64_t subscribe_id, - std::optional<webtransport::SendOrder> old_send_order, + uint64_t request_id, std::optional<webtransport::SendOrder> old_send_order, std::optional<webtransport::SendOrder> new_send_order) { if (old_send_order == new_send_order) { return; } if (old_send_order.has_value()) { subscribes_with_queued_outgoing_data_streams_.erase( - SubscriptionWithQueuedStream{*old_send_order, subscribe_id}); + SubscriptionWithQueuedStream{*old_send_order, request_id}); } if (new_send_order.has_value()) { subscribes_with_queued_outgoing_data_streams_.emplace(*new_send_order, - subscribe_id); + request_id); } } @@ -985,12 +984,12 @@ } void MoqtSession::ControlStream::SendFetchError( - uint64_t subscribe_id, RequestErrorCode error_code, - absl::string_view reason_phrase) { + uint64_t request_id, RequestErrorCode error_code, + absl::string_view error_reason) { MoqtFetchError fetch_error; - fetch_error.subscribe_id = subscribe_id; + fetch_error.request_id = request_id; fetch_error.error_code = error_code; - fetch_error.reason_phrase = reason_phrase; + fetch_error.error_reason = error_reason; SendOrBufferMessage(session_->framer_.SerializeFetchError(fetch_error)); } @@ -1051,7 +1050,7 @@ RemoteTrack* track = session_->RemoteTrackById(message.request_id); if (track == nullptr) { QUIC_DLOG(INFO) << ENDPOINT << "Received the SUBSCRIBE_OK for " - << "subscribe_id = " << message.request_id + << "request_id = " << message.request_id << " but no track exists"; // Subscription state might have been destroyed for internal reasons. return; @@ -1063,12 +1062,12 @@ } if (message.largest_location.has_value()) { QUIC_DLOG(INFO) << ENDPOINT << "Received the SUBSCRIBE_OK for " - << "subscribe_id = " << message.request_id << " " + << "request_id = " << message.request_id << " " << track->full_track_name() << " largest_id = " << *message.largest_location; } else { QUIC_DLOG(INFO) << ENDPOINT << "Received the SUBSCRIBE_OK for " - << "subscribe_id = " << message.request_id << " " + << "request_id = " << message.request_id << " " << track->full_track_name(); } SubscribeRemoteTrack* subscribe = static_cast<SubscribeRemoteTrack*>(track); @@ -1088,7 +1087,7 @@ RemoteTrack* track = session_->RemoteTrackById(message.request_id); if (track == nullptr) { QUIC_DLOG(INFO) << ENDPOINT << "Received the SUBSCRIBE_ERROR for " - << "subscribe_id = " << message.request_id + << "request_id = " << message.request_id << " but no track exists"; // Subscription state might have been destroyed for internal reasons. return; @@ -1104,7 +1103,7 @@ return; } QUIC_DLOG(INFO) << ENDPOINT << "Received the SUBSCRIBE_ERROR for " - << "subscribe_id = " << message.request_id << " (" + << "request_id = " << message.request_id << " (" << track->full_track_name() << ")" << ", error = " << static_cast<int>(message.error_code) << " (" << message.reason_phrase << ")"; @@ -1343,12 +1342,12 @@ } void MoqtSession::ControlStream::OnFetchMessage(const MoqtFetch& message) { - if (!session_->ValidateRequestId(message.fetch_id)) { + if (!session_->ValidateRequestId(message.request_id)) { return; } if (session_->sent_goaway_) { QUIC_DLOG(INFO) << ENDPOINT << "Received a FETCH after GOAWAY"; - SendFetchError(message.fetch_id, RequestErrorCode::kUnauthorized, + SendFetchError(message.request_id, RequestErrorCode::kUnauthorized, "FETCH after GOAWAY"); return; } @@ -1375,7 +1374,7 @@ QUIC_DLOG(INFO) << ENDPOINT << "Received a JOINING_FETCH for " << "subscribe_id " << joining_subscribe_id << " that does not exist"; - SendFetchError(message.fetch_id, RequestErrorCode::kTrackDoesNotExist, + SendFetchError(message.request_id, RequestErrorCode::kTrackDoesNotExist, "Joining Fetch for non-existent subscribe"); return; } @@ -1419,7 +1418,7 @@ QUIC_DLOG(INFO) << ENDPOINT << "FETCH for " << track_name << " rejected by the application: " << track_publisher.status(); - SendFetchError(message.fetch_id, RequestErrorCode::kTrackDoesNotExist, + SendFetchError(message.request_id, RequestErrorCode::kTrackDoesNotExist, track_publisher.status().message()); return; } @@ -1431,23 +1430,23 @@ if (!fetch->GetStatus().ok()) { QUIC_DLOG(INFO) << ENDPOINT << "FETCH for " << track_name << " could not initialize the task"; - SendFetchError(message.fetch_id, RequestErrorCode::kInvalidRange, + SendFetchError(message.request_id, RequestErrorCode::kInvalidRange, fetch->GetStatus().message()); return; } auto published_fetch = std::make_unique<PublishedFetch>( - message.fetch_id, session_, std::move(fetch)); - auto result = session_->incoming_fetches_.emplace(message.fetch_id, + message.request_id, session_, std::move(fetch)); + auto result = session_->incoming_fetches_.emplace(message.request_id, std::move(published_fetch)); if (!result.second) { // Emplace failed. QUIC_DLOG(INFO) << ENDPOINT << "FETCH for " << track_name << " could not be added to the session"; - SendFetchError(message.fetch_id, RequestErrorCode::kInternalError, + SendFetchError(message.request_id, RequestErrorCode::kInternalError, "Could not initialize FETCH state"); } MoqtFetchTask* fetch_task = result.first->second->fetch_task(); fetch_task->SetFetchResponseCallback( - [this, request_id = message.fetch_id, fetch_start = start_object, + [this, request_id = message.request_id, fetch_start = start_object, fetch_end = Location(end_group, end_object.value_or(UINT64_MAX))]( std::variant<MoqtFetchOk, MoqtFetchError> message) { if (!session_->incoming_fetches_.contains(request_id)) { @@ -1455,11 +1454,11 @@ } if (std::holds_alternative<MoqtFetchOk>(message)) { MoqtFetchOk& fetch_ok = std::get<MoqtFetchOk>(message); - fetch_ok.subscribe_id = request_id; - if (fetch_ok.largest_id < fetch_start || - fetch_ok.largest_id > fetch_end) { + fetch_ok.request_id = request_id; + if (fetch_ok.end_location < fetch_start || + fetch_ok.end_location > fetch_end) { // TODO(martinduke): Add end_of_track to fetch_ok and check it's - // larger than largest_id. + // larger than end_location. QUIC_BUG(quic_bug_fetch_ok_status_error) << "FETCH_OK end or end_of_track is invalid"; session_->Error(MoqtError::kInternalError, "FETCH_OK status error"); @@ -1469,14 +1468,14 @@ return; } MoqtFetchError& fetch_error = std::get<MoqtFetchError>(message); - fetch_error.subscribe_id = request_id; + fetch_error.request_id = request_id; SendOrBufferMessage(session_->framer_.SerializeFetchError(fetch_error)); }); // Set a temporary new-object callback that creates a data stream. When // created, the stream visitor will replace this callback. fetch_task->SetObjectAvailableCallback( [this, send_order = SendOrderForFetch(message.subscriber_priority), - request_id = message.fetch_id]() { + request_id = message.request_id]() { auto it = session_->incoming_fetches_.find(request_id); if (it == session_->incoming_fetches_.end()) { return; @@ -1495,10 +1494,10 @@ } void MoqtSession::ControlStream::OnFetchOkMessage(const MoqtFetchOk& message) { - RemoteTrack* track = session_->RemoteTrackById(message.subscribe_id); + RemoteTrack* track = session_->RemoteTrackById(message.request_id); if (track == nullptr) { QUIC_DLOG(INFO) << ENDPOINT << "Received the FETCH_OK for " - << "subscribe_id = " << message.subscribe_id + << "request_id = " << message.request_id << " but no track exists"; // Subscription state might have been destroyed for internal reasons. return; @@ -1508,21 +1507,20 @@ "Received FETCH_OK for a SUBSCRIBE"); return; } - QUIC_DLOG(INFO) << ENDPOINT << "Received the FETCH_OK for subscribe_id = " - << message.subscribe_id << " " << track->full_track_name(); + QUIC_DLOG(INFO) << ENDPOINT << "Received the FETCH_OK for request_id = " + << message.request_id << " " << track->full_track_name(); UpstreamFetch* fetch = static_cast<UpstreamFetch*>(track); - fetch->OnFetchResult(message.largest_id, absl::OkStatus(), - [=, session = session_]() { - session->CancelFetch(message.subscribe_id); - }); + fetch->OnFetchResult( + message.end_location, absl::OkStatus(), + [=, session = session_]() { session->CancelFetch(message.request_id); }); } void MoqtSession::ControlStream::OnFetchErrorMessage( const MoqtFetchError& message) { - RemoteTrack* track = session_->RemoteTrackById(message.subscribe_id); + RemoteTrack* track = session_->RemoteTrackById(message.request_id); if (track == nullptr) { QUIC_DLOG(INFO) << ENDPOINT << "Received the FETCH_ERROR for " - << "subscribe_id = " << message.subscribe_id + << "request_id = " << message.request_id << " but no track exists"; // Subscription state might have been destroyed for internal reasons. return; @@ -1538,15 +1536,15 @@ return; } QUIC_DLOG(INFO) << ENDPOINT << "Received the FETCH_ERROR for " - << "subscribe_id = " << message.subscribe_id << " (" + << "request_id = " << message.request_id << " (" << track->full_track_name() << ")" << ", error = " << static_cast<int>(message.error_code) - << " (" << message.reason_phrase << ")"; + << " (" << message.error_reason << ")"; UpstreamFetch* fetch = static_cast<UpstreamFetch*>(track); absl::Status status = - RequestErrorCodeToStatus(message.error_code, message.reason_phrase); + RequestErrorCodeToStatus(message.error_code, message.error_reason); fetch->OnFetchResult(Location(0, 0), status, nullptr); - session_->upstream_by_id_.erase(message.subscribe_id); + session_->upstream_by_id_.erase(message.request_id); } void MoqtSession::ControlStream::OnRequestsBlockedMessage( @@ -2317,19 +2315,19 @@ return true; } -void MoqtSession::CancelFetch(uint64_t subscribe_id) { +void MoqtSession::CancelFetch(uint64_t request_id) { if (is_closing_) { return; } // This is only called from the callback where UpstreamFetchTask has been // destroyed, so there is no need to notify the application. - upstream_by_id_.erase(subscribe_id); + upstream_by_id_.erase(request_id); ControlStream* stream = GetControlStream(); if (stream == nullptr) { return; } MoqtFetchCancel message; - message.subscribe_id = subscribe_id; + message.request_id = request_id; stream->SendOrBufferMessage(framer_.SerializeFetchCancel(message)); // The FETCH_CANCEL will cause a RESET_STREAM to return, which would be the // same as a STOP_SENDING. However, a FETCH_CANCEL works even if the stream
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h index 356a4eb..8ca159e 100644 --- a/quiche/quic/moqt/moqt_session.h +++ b/quiche/quic/moqt/moqt_session.h
@@ -296,8 +296,8 @@ private: friend class test::MoqtSessionPeer; - void SendFetchError(uint64_t subscribe_id, RequestErrorCode error_code, - absl::string_view reason_phrase); + void SendFetchError(uint64_t request_id, RequestErrorCode error_code, + absl::string_view error_reason); MoqtSession* session_; webtransport::Stream* stream_; @@ -562,9 +562,11 @@ class QUICHE_EXPORT PublishedFetch { public: - PublishedFetch(uint64_t fetch_id, MoqtSession* session, + PublishedFetch(uint64_t request_id, MoqtSession* session, std::unique_ptr<MoqtFetchTask> fetch) - : session_(session), fetch_(std::move(fetch)), fetch_id_(fetch_id) {} + : session_(session), + fetch_(std::move(fetch)), + request_id_(request_id) {} class FetchStreamVisitor : public webtransport::StreamVisitor { public: @@ -577,7 +579,7 @@ ~FetchStreamVisitor() { std::shared_ptr<PublishedFetch> fetch = fetch_.lock(); if (fetch != nullptr) { - fetch->session()->incoming_fetches_.erase(fetch->fetch_id_); + fetch->session()->incoming_fetches_.erase(fetch->request_id_); } } // webtransport::StreamVisitor implementation. @@ -597,13 +599,13 @@ MoqtFetchTask* fetch_task() { return fetch_.get(); } MoqtSession* session() { return session_; } - uint64_t fetch_id() const { return fetch_id_; } + uint64_t request_id() const { return request_id_; } void SetStreamId(webtransport::StreamId id) { stream_id_ = id; } private: MoqtSession* session_; std::unique_ptr<MoqtFetchTask> fetch_; - uint64_t fetch_id_; + uint64_t request_id_; // Store the stream ID in case a FETCH_CANCEL requires a reset. std::optional<webtransport::StreamId> stream_id_; };
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc index 80b3520..a89bf05 100644 --- a/quiche/quic/moqt/moqt_session_test.cc +++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -76,7 +76,7 @@ MoqtFetch DefaultFetch() { MoqtFetch fetch = { - /*fetch_id=*/1, + /*request_id=*/1, /*subscriber_priority=*/0x80, /*group_order=*/std::nullopt, /*fetch=*/ @@ -2264,9 +2264,10 @@ // Compose and send the FETCH_OK. MoqtFetchOk expected_ok; - expected_ok.subscribe_id = fetch.fetch_id; + expected_ok.request_id = fetch.request_id; expected_ok.group_order = MoqtDeliveryOrder::kAscending; - expected_ok.largest_id = Location(1, 4); + expected_ok.end_of_track = false; + expected_ok.end_location = Location(1, 4); EXPECT_CALL(mock_stream_, Writev(SerializedControlMessage(expected_ok), _)); fetch_task->CallFetchResponseCallback(expected_ok); // Data arrives. @@ -2288,9 +2289,10 @@ MockTrackPublisher* track = CreateTrackPublisher(); MoqtFetchOk expected_ok; - expected_ok.subscribe_id = fetch.fetch_id; + expected_ok.request_id = fetch.request_id; expected_ok.group_order = MoqtDeliveryOrder::kAscending; - expected_ok.largest_id = Location(1, 4); + expected_ok.end_of_track = false; + expected_ok.end_location = Location(1, 4); auto fetch_task_ptr = std::make_unique<MockFetchTask>(expected_ok, std::nullopt, true); MockFetchTask* fetch_task = fetch_task_ptr.get(); @@ -2329,9 +2331,10 @@ stream_input->OnFetchMessage(fetch); MoqtFetchOk expected_ok; - expected_ok.subscribe_id = fetch.fetch_id; + expected_ok.request_id = fetch.request_id; expected_ok.group_order = MoqtDeliveryOrder::kAscending; - expected_ok.largest_id = Location(1, 4); + expected_ok.end_of_track = false; + expected_ok.end_location = Location(1, 4); EXPECT_CALL(mock_stream_, Writev(SerializedControlMessage(expected_ok), _)); fetch_task->CallFetchResponseCallback(expected_ok); } @@ -2355,9 +2358,9 @@ stream_input->OnFetchMessage(fetch); MoqtFetchError expected_error; - expected_error.subscribe_id = fetch.fetch_id; + expected_error.request_id = fetch.request_id; expected_error.error_code = RequestErrorCode::kTrackDoesNotExist; - expected_error.reason_phrase = "foo"; + expected_error.error_reason = "foo"; EXPECT_CALL(mock_stream_, Writev(SerializedControlMessage(expected_error), _)); fetch_task->CallFetchResponseCallback(expected_error); @@ -2370,7 +2373,7 @@ std::unique_ptr<MoqtControlParserVisitor> stream_input = MoqtSessionPeer::CreateControlStream(&session_, &control_stream); MoqtFetch fetch = DefaultFetch(); - fetch.fetch_id = 1; // Too low. + fetch.request_id = 1; // Too low. EXPECT_CALL(mock_session_, CloseSession(static_cast<uint64_t>(MoqtError::kInvalidRequestId), "Request ID not monotonically increasing")) @@ -2445,7 +2448,7 @@ // Joining FETCH arrives. The resulting Fetch should begin at (2, 0). MoqtFetch fetch = DefaultFetch(); - fetch.fetch_id = 3; + fetch.request_id = 3; fetch.fetch = JoiningFetchRelative(1, 2); EXPECT_CALL(*track, Fetch(Location(2, 0), 4, std::optional<uint64_t>(10), _)) .WillOnce(Return(std::make_unique<MockFetchTask>())); @@ -2476,7 +2479,7 @@ ReceiveSubscribeSynchronousOk(track, subscribe, stream_input.get()); MoqtFetch fetch = DefaultFetch(); - fetch.fetch_id = 3; + fetch.request_id = 3; fetch.fetch = JoiningFetchRelative(1, 2); EXPECT_CALL(mock_session_, CloseSession(static_cast<uint64_t>(MoqtError::kProtocolViolation), @@ -2504,7 +2507,7 @@ VersionSpecificParameters(), }; MoqtFetch expected_fetch = { - /*fetch_id=*/2, + /*request_id=*/2, /*subscriber_priority=*/0x80, /*group_order=*/MoqtDeliveryOrder::kAscending, /*fetch=*/JoiningFetchRelative(0, 1), @@ -2538,11 +2541,11 @@ MoqtDeliveryOrder::kAscending, Location(2, 0), VersionSpecificParameters())); stream_input->OnFetchOkMessage(MoqtFetchOk(2, MoqtDeliveryOrder::kAscending, - Location(2, 0), + false, Location(2, 0), VersionSpecificParameters())); // Packet arrives on FETCH stream. MoqtObject object = { - /*fetch_id=*/2, + /*request_id=*/2, /*group_id, object_id=*/0, 0, /*publisher_priority=*/128, @@ -2627,7 +2630,8 @@ MoqtFetchOk ok = { /*request_id=*/0, /*group_order=*/MoqtDeliveryOrder::kAscending, - /*largest_id=*/Location(3, 25), + /*end_of_track=*/false, + /*end_location=*/Location(3, 25), VersionSpecificParameters(), }; stream_input->OnFetchOkMessage(ok); @@ -2726,7 +2730,8 @@ MoqtFetchOk ok = { /*request_id=*/0, /*group_order=*/MoqtDeliveryOrder::kAscending, - /*largest_id=*/Location(3, 25), + /*end_of_track=*/false, + /*end_location=*/Location(3, 25), VersionSpecificParameters(), }; stream_input->OnFetchOkMessage(ok); @@ -2796,7 +2801,8 @@ MoqtFetchOk ok = { /*request_id=*/0, /*group_order=*/MoqtDeliveryOrder::kAscending, - /*largest_id=*/Location(3, 25), + /*end_of_track=*/false, + /*end_location=*/Location(3, 25), VersionSpecificParameters(), }; stream_input->OnFetchOkMessage(ok); @@ -3194,7 +3200,7 @@ EXPECT_CALL(mock_stream_, Writev(ControlMessageOfType(MoqtMessageType::kFetchError), _)); MoqtFetch fetch = DefaultFetch(); - fetch.fetch_id = 3; + fetch.request_id = 3; stream_input->OnFetchMessage(fetch); EXPECT_CALL( mock_stream_,
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h index 6f5b96c..4c81f37 100644 --- a/quiche/quic/moqt/moqt_track.h +++ b/quiche/quic/moqt/moqt_track.h
@@ -236,7 +236,7 @@ UpstreamFetch(const MoqtFetch& fetch, const StandaloneFetch standalone, FetchResponseCallback callback) : RemoteTrack( - standalone.full_track_name, fetch.fetch_id, + standalone.full_track_name, fetch.request_id, SubscribeWindow(standalone.start_object, standalone.end_group, standalone.end_object), fetch.subscriber_priority), @@ -247,7 +247,7 @@ // Relative Joining Fetch constructor UpstreamFetch(const MoqtFetch& fetch, FullTrackName full_track_name, FetchResponseCallback callback) - : RemoteTrack(full_track_name, fetch.fetch_id, + : RemoteTrack(full_track_name, fetch.request_id, SubscribeWindow(Location(0, 0)), fetch.subscriber_priority), ok_callback_(std::move(callback)) { // Immediately set the data stream type. @@ -258,7 +258,7 @@ JoiningFetchAbsolute absolute_joining, FetchResponseCallback callback) : RemoteTrack( - full_track_name, fetch.fetch_id, + full_track_name, fetch.request_id, SubscribeWindow(Location(absolute_joining.joining_start, 0)), fetch.subscriber_priority), ok_callback_(std::move(callback)) {
diff --git a/quiche/quic/moqt/test_tools/moqt_session_peer.h b/quiche/quic/moqt/test_tools/moqt_session_peer.h index 467c2fb..a0dc737 100644 --- a/quiche/quic/moqt/test_tools/moqt_session_peer.h +++ b/quiche/quic/moqt/test_tools/moqt_session_peer.h
@@ -199,8 +199,8 @@ // Initialize the fetch task fetch->OnFetchResult( Location{4, 10}, absl::OkStatus(), - [=, session_ptr = session, fetch_id = fetch_message.fetch_id]() { - session_ptr->CancelFetch(fetch_id); + [=, session_ptr = session, request_id = fetch_message.request_id]() { + session_ptr->CancelFetch(request_id); }); ; auto mock_session =
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h index 1ad79b2..4497674 100644 --- a/quiche/quic/moqt/test_tools/moqt_test_message.h +++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -1309,8 +1309,8 @@ } bool EqualFieldValues(MessageStructuredData& values) const override { auto cast = std::get<MoqtFetch>(values); - if (cast.fetch_id != fetch_.fetch_id) { - QUIC_LOG(INFO) << "FETCH fetch_id mismatch"; + if (cast.request_id != fetch_.request_id) { + QUIC_LOG(INFO) << "FETCH request_id mismatch"; return false; } if (cast.subscriber_priority != fetch_.subscriber_priority) { @@ -1359,7 +1359,7 @@ private: uint8_t raw_packet_[28] = { 0x16, 0x00, 0x19, - 0x01, // fetch_id = 1 + 0x01, // request_id = 1 0x02, // priority = kHigh 0x01, // group_order = kAscending 0x01, // type = kStandalone @@ -1371,7 +1371,7 @@ }; MoqtFetch fetch_ = { - /*fetch_id =*/1, + /*request_id=*/1, /*subscriber_priority=*/2, /*group_order=*/MoqtDeliveryOrder::kAscending, /*fetch =*/ @@ -1394,8 +1394,8 @@ } bool EqualFieldValues(MessageStructuredData& values) const override { auto cast = std::get<MoqtFetch>(values); - if (cast.fetch_id != fetch_.fetch_id) { - QUIC_LOG(INFO) << "FETCH fetch_id mismatch"; + if (cast.request_id != fetch_.request_id) { + QUIC_LOG(INFO) << "FETCH request_id mismatch"; return false; } if (cast.subscriber_priority != fetch_.subscriber_priority) { @@ -1433,7 +1433,7 @@ private: uint8_t raw_packet_[17] = { 0x16, 0x00, 0x0e, - 0x01, // fetch_id = 1 + 0x01, // request_id = 1 0x02, // priority = kHigh 0x01, // group_order = kAscending 0x02, // type = kRelativeJoining @@ -1442,7 +1442,7 @@ }; MoqtFetch fetch_ = { - /*fetch_id =*/1, + /*request_id =*/1, /*subscriber_priority=*/2, /*group_order=*/MoqtDeliveryOrder::kAscending, /*fetch=*/JoiningFetchRelative{2, 2}, @@ -1459,8 +1459,8 @@ } bool EqualFieldValues(MessageStructuredData& values) const override { auto cast = std::get<MoqtFetch>(values); - if (cast.fetch_id != fetch_.fetch_id) { - QUIC_LOG(INFO) << "FETCH fetch_id mismatch"; + if (cast.request_id != fetch_.request_id) { + QUIC_LOG(INFO) << "FETCH request_id mismatch"; return false; } if (cast.subscriber_priority != fetch_.subscriber_priority) { @@ -1498,7 +1498,7 @@ private: uint8_t raw_packet_[17] = { 0x16, 0x00, 0x0e, - 0x01, // fetch_id = 1 + 0x01, // request_id = 1 0x02, // priority = kHigh 0x01, // group_order = kAscending 0x03, // type = kAbsoluteJoining @@ -1507,7 +1507,7 @@ }; MoqtFetch fetch_ = { - /*fetch_id =*/1, + /*request_id=*/1, /*subscriber_priority=*/2, /*group_order=*/MoqtDeliveryOrder::kAscending, /*fetch=*/JoiningFetchAbsolute{2, 2}, @@ -1515,6 +1515,105 @@ }; }; +class QUICHE_NO_EXPORT FetchOkMessage : public TestMessageBase { + public: + FetchOkMessage() : TestMessageBase() { + SetWireImage(raw_packet_, sizeof(raw_packet_)); + } + bool EqualFieldValues(MessageStructuredData& values) const override { + auto cast = std::get<MoqtFetchOk>(values); + if (cast.request_id != fetch_ok_.request_id) { + QUIC_LOG(INFO) << "FETCH_OK request_id mismatch"; + return false; + } + if (cast.group_order != fetch_ok_.group_order) { + QUIC_LOG(INFO) << "FETCH_OK group_order mismatch"; + return false; + } + if (cast.end_of_track != fetch_ok_.end_of_track) { + QUIC_LOG(INFO) << "FETCH_OK end_of_track mismatch"; + return false; + } + if (cast.end_location != fetch_ok_.end_location) { + QUIC_LOG(INFO) << "FETCH_OK end_location mismatch"; + return false; + } + if (cast.parameters != fetch_ok_.parameters) { + QUIC_LOG(INFO) << "FETCH_OK parameters mismatch"; + return false; + } + return true; + } + + void ExpandVarints() override { ExpandVarintsImpl("v--vvvvv---"); } + + MessageStructuredData structured_data() const override { + return TestMessageBase::MessageStructuredData(fetch_ok_); + } + + private: + uint8_t raw_packet_[12] = { + 0x18, 0x00, 0x09, + 0x01, // request_id = 1 + 0x01, // group_order = kAscending + 0x00, // end_of_track = false + 0x05, 0x04, // end_location = 5, 4 + 0x01, 0x04, 0x67, 0x10, // MaxCacheDuration = 10000 + }; + + MoqtFetchOk fetch_ok_ = { + /*request_id =*/1, + /*group_order=*/MoqtDeliveryOrder::kAscending, + /*end_of_track=*/false, + /*end_location=*/Location{5, 4}, + VersionSpecificParameters(quic::QuicTimeDelta::Infinite(), + quic::QuicTimeDelta::FromMilliseconds(10000)), + }; +}; + +class QUICHE_NO_EXPORT FetchErrorMessage : public TestMessageBase { + public: + FetchErrorMessage() : TestMessageBase() { + SetWireImage(raw_packet_, sizeof(raw_packet_)); + } + bool EqualFieldValues(MessageStructuredData& values) const override { + auto cast = std::get<MoqtFetchError>(values); + if (cast.request_id != fetch_error_.request_id) { + QUIC_LOG(INFO) << "FETCH_ERROR request_id mismatch"; + return false; + } + if (cast.error_code != fetch_error_.error_code) { + QUIC_LOG(INFO) << "FETCH_ERROR group_order mismatch"; + return false; + } + if (cast.error_reason != fetch_error_.error_reason) { + QUIC_LOG(INFO) << "FETCH_ERROR error_reason mismatch"; + return false; + } + return true; + } + + void ExpandVarints() override { ExpandVarintsImpl("vvv---"); } + + MessageStructuredData structured_data() const override { + return TestMessageBase::MessageStructuredData(fetch_error_); + } + + private: + uint8_t raw_packet_[9] = { + 0x19, 0x00, 0x06, + 0x01, // request_id = 1 + 0x01, // error_code = kUnauthorized + 0x03, 0x62, 0x61, 0x72, // error_reason = "bar" + }; + + MoqtFetchError fetch_error_ = { + /*request_id =*/1, + /*error_code=*/RequestErrorCode::kUnauthorized, + /*error_reason=*/"bar", + }; +}; + class QUICHE_NO_EXPORT FetchCancelMessage : public TestMessageBase { public: FetchCancelMessage() : TestMessageBase() { @@ -1522,7 +1621,7 @@ } bool EqualFieldValues(MessageStructuredData& values) const override { auto cast = std::get<MoqtFetchCancel>(values); - if (cast.subscribe_id != fetch_cancel_.subscribe_id) { + if (cast.request_id != fetch_cancel_.request_id) { QUIC_LOG(INFO) << "FETCH_CANCEL subscribe_id mismatch"; return false; } @@ -1538,104 +1637,11 @@ private: uint8_t raw_packet_[4] = { 0x17, 0x00, 0x01, - 0x01, // subscribe_id = 1 + 0x01, // request_id = 1 }; MoqtFetchCancel fetch_cancel_ = { - /*subscribe_id =*/1, - }; -}; - -class QUICHE_NO_EXPORT FetchOkMessage : public TestMessageBase { - public: - FetchOkMessage() : TestMessageBase() { - SetWireImage(raw_packet_, sizeof(raw_packet_)); - } - bool EqualFieldValues(MessageStructuredData& values) const override { - auto cast = std::get<MoqtFetchOk>(values); - if (cast.subscribe_id != fetch_ok_.subscribe_id) { - QUIC_LOG(INFO) << "FETCH_OK subscribe_id mismatch"; - return false; - } - if (cast.group_order != fetch_ok_.group_order) { - QUIC_LOG(INFO) << "FETCH_OK group_order mismatch"; - return false; - } - if (cast.largest_id != fetch_ok_.largest_id) { - QUIC_LOG(INFO) << "FETCH_OK start_object mismatch"; - return false; - } - if (cast.parameters != fetch_ok_.parameters) { - QUIC_LOG(INFO) << "FETCH_OK parameters mismatch"; - return false; - } - return true; - } - - void ExpandVarints() override { ExpandVarintsImpl("v-vvvvv---"); } - - MessageStructuredData structured_data() const override { - return TestMessageBase::MessageStructuredData(fetch_ok_); - } - - private: - uint8_t raw_packet_[11] = { - 0x18, 0x00, 0x08, - 0x01, // subscribe_id = 1 - 0x01, // group_order = kAscending - 0x05, 0x04, // largest_object = 5, 4 - 0x01, 0x04, 0x67, 0x10, // MaxCacheDuration = 10000 - }; - - MoqtFetchOk fetch_ok_ = { - /*subscribe_id =*/1, - /*group_order=*/MoqtDeliveryOrder::kAscending, - /*start_object=*/Location{5, 4}, - VersionSpecificParameters(quic::QuicTimeDelta::Infinite(), - quic::QuicTimeDelta::FromMilliseconds(10000)), - }; -}; - -class QUICHE_NO_EXPORT FetchErrorMessage : public TestMessageBase { - public: - FetchErrorMessage() : TestMessageBase() { - SetWireImage(raw_packet_, sizeof(raw_packet_)); - } - bool EqualFieldValues(MessageStructuredData& values) const override { - auto cast = std::get<MoqtFetchError>(values); - if (cast.subscribe_id != fetch_error_.subscribe_id) { - QUIC_LOG(INFO) << "FETCH_ERROR subscribe_id mismatch"; - return false; - } - if (cast.error_code != fetch_error_.error_code) { - QUIC_LOG(INFO) << "FETCH_ERROR group_order mismatch"; - return false; - } - if (cast.reason_phrase != fetch_error_.reason_phrase) { - QUIC_LOG(INFO) << "FETCH_ERROR reason_phrase mismatch"; - return false; - } - return true; - } - - void ExpandVarints() override { ExpandVarintsImpl("vvv---"); } - - MessageStructuredData structured_data() const override { - return TestMessageBase::MessageStructuredData(fetch_error_); - } - - private: - uint8_t raw_packet_[9] = { - 0x19, 0x00, 0x06, - 0x01, // subscribe_id = 1 - 0x01, // error_code = kUnauthorized - 0x03, 0x62, 0x61, 0x72, // reason_phrase = "bar" - }; - - MoqtFetchError fetch_error_ = { - /*subscribe_id =*/1, - /*error_code=*/RequestErrorCode::kUnauthorized, - /*reason_phrase=*/"bar", + /*request_id =*/1, }; };