Update MoQT FETCH* messages to draft-11.
(except FETCH format change, already done)
Add a field to FETCH_OK, rename other fields.
PiperOrigin-RevId: 780281131
diff --git a/quiche/quic/moqt/moqt_failed_fetch.h b/quiche/quic/moqt/moqt_failed_fetch.h
index 154c6fc..c971897 100644
--- a/quiche/quic/moqt/moqt_failed_fetch.h
+++ b/quiche/quic/moqt/moqt_failed_fetch.h
@@ -26,9 +26,9 @@
ObjectsAvailableCallback /*callback*/) override {}
void SetFetchResponseCallback(FetchResponseCallback callback) {
MoqtFetchError error;
- error.subscribe_id = 0;
+ error.request_id = 0;
error.error_code = StatusToRequestErrorCode(status_);
- error.reason_phrase = status_.message();
+ error.error_reason = status_.message();
std::move(callback)(error);
}
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc
index 842a356..db73875 100644
--- a/quiche/quic/moqt/moqt_framer.cc
+++ b/quiche/quic/moqt/moqt_framer.cc
@@ -699,7 +699,7 @@
const StandaloneFetch& standalone_fetch =
std::get<StandaloneFetch>(message.fetch);
return SerializeControlMessage(
- MoqtMessageType::kFetch, WireVarInt62(message.fetch_id),
+ MoqtMessageType::kFetch, WireVarInt62(message.request_id),
WireUint8(message.subscriber_priority),
WireDeliveryOrder(message.group_order),
WireVarInt62(FetchType::kStandalone),
@@ -726,19 +726,13 @@
joining_start = joining_fetch.joining_start;
}
return SerializeControlMessage(
- MoqtMessageType::kFetch, WireVarInt62(message.fetch_id),
+ MoqtMessageType::kFetch, WireVarInt62(message.request_id),
WireUint8(message.subscriber_priority),
WireDeliveryOrder(message.group_order),
WireVarInt62(message.fetch.index() + 1), WireVarInt62(subscribe_id),
WireVarInt62(joining_start), WireKeyValuePairList(parameters));
}
-quiche::QuicheBuffer MoqtFramer::SerializeFetchCancel(
- const MoqtFetchCancel& message) {
- return SerializeControlMessage(MoqtMessageType::kFetchCancel,
- WireVarInt62(message.subscribe_id));
-}
-
quiche::QuicheBuffer MoqtFramer::SerializeFetchOk(const MoqtFetchOk& message) {
KeyValuePairList parameters;
VersionSpecificParametersToKeyValuePairList(message.parameters, parameters);
@@ -748,20 +742,26 @@
<< "Serializing invalid MoQT parameters";
return quiche::QuicheBuffer();
}
- return SerializeControlMessage(MoqtMessageType::kFetchOk,
- WireVarInt62(message.subscribe_id),
- WireDeliveryOrder(message.group_order),
- WireVarInt62(message.largest_id.group),
- WireVarInt62(message.largest_id.object),
- WireKeyValuePairList(parameters));
+ return SerializeControlMessage(
+ MoqtMessageType::kFetchOk, WireVarInt62(message.request_id),
+ WireDeliveryOrder(message.group_order), WireBoolean(message.end_of_track),
+ WireVarInt62(message.end_location.group),
+ WireVarInt62(message.end_location.object),
+ WireKeyValuePairList(parameters));
}
quiche::QuicheBuffer MoqtFramer::SerializeFetchError(
const MoqtFetchError& message) {
return SerializeControlMessage(
- MoqtMessageType::kFetchError, WireVarInt62(message.subscribe_id),
+ MoqtMessageType::kFetchError, WireVarInt62(message.request_id),
WireVarInt62(message.error_code),
- WireStringWithVarInt62Length(message.reason_phrase));
+ WireStringWithVarInt62Length(message.error_reason));
+}
+
+quiche::QuicheBuffer MoqtFramer::SerializeFetchCancel(
+ const MoqtFetchCancel& message) {
+ return SerializeControlMessage(MoqtMessageType::kFetchCancel,
+ WireVarInt62(message.request_id));
}
quiche::QuicheBuffer MoqtFramer::SerializeRequestsBlocked(
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index 43d4ba6..d6d27fd 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -766,7 +766,7 @@
};
struct QUICHE_EXPORT MoqtFetch {
- uint64_t fetch_id;
+ uint64_t request_id;
MoqtPriority subscriber_priority;
std::optional<MoqtDeliveryOrder> group_order;
std::variant<StandaloneFetch, JoiningFetchRelative, JoiningFetchAbsolute>
@@ -774,21 +774,22 @@
VersionSpecificParameters parameters;
};
-struct QUICHE_EXPORT MoqtFetchCancel {
- uint64_t subscribe_id;
-};
-
struct QUICHE_EXPORT MoqtFetchOk {
- uint64_t subscribe_id;
+ uint64_t request_id;
MoqtDeliveryOrder group_order;
- Location largest_id;
+ bool end_of_track;
+ Location end_location;
VersionSpecificParameters parameters;
};
struct QUICHE_EXPORT MoqtFetchError {
- uint64_t subscribe_id;
+ uint64_t request_id;
RequestErrorCode error_code;
- std::string reason_phrase;
+ std::string error_reason;
+};
+
+struct QUICHE_EXPORT MoqtFetchCancel {
+ uint64_t request_id;
};
struct QUICHE_EXPORT MoqtRequestsBlocked {
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.h b/quiche/quic/moqt/moqt_outgoing_queue.h
index f9a1361..c61b220 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue.h
+++ b/quiche/quic/moqt/moqt_outgoing_queue.h
@@ -123,7 +123,7 @@
MoqtFetchError error(0, StatusToRequestErrorCode(status_),
std::string(status_.message()));
error.error_code = StatusToRequestErrorCode(status_);
- error.reason_phrase = status_.message();
+ error.error_reason = status_.message();
std::move(callback)(error);
return;
}
@@ -135,11 +135,13 @@
}
MoqtFetchOk ok;
ok.group_order = MoqtDeliveryOrder::kAscending;
- ok.largest_id = *(objects_.crbegin());
- if (objects_.size() > 1 && *(objects_.cbegin()) > ok.largest_id) {
+ ok.end_location = *(objects_.crbegin());
+ if (objects_.size() > 1 && *(objects_.cbegin()) > ok.end_location) {
ok.group_order = MoqtDeliveryOrder::kDescending;
- ok.largest_id = *(objects_.cbegin());
+ ok.end_location = *(objects_.cbegin());
}
+ ok.end_of_track =
+ queue_->closed_ && ok.end_location == queue_->GetLargestLocation();
std::move(callback)(ok);
}
diff --git a/quiche/quic/moqt/moqt_outgoing_queue_test.cc b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
index 46e0d19..5f677b8 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue_test.cc
+++ b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
@@ -9,6 +9,7 @@
#include <optional>
#include <string>
#include <utility>
+#include <variant>
#include <vector>
#include "absl/status/status.h"
@@ -24,6 +25,7 @@
#include "quiche/common/platform/api/quiche_expect_bug.h"
#include "quiche/common/platform/api/quiche_logging.h"
#include "quiche/common/platform/api/quiche_test.h"
+#include "quiche/common/quiche_mem_slice.h"
#include "quiche/common/test_tools/quiche_test_utils.h"
#include "quiche/web_transport/web_transport.h"
@@ -35,6 +37,7 @@
using ::quiche::test::StatusIs;
using ::testing::AnyOf;
using ::testing::ElementsAre;
+using ::testing::Field;
using ::testing::IsEmpty;
class TestMoqtOutgoingQueue : public MoqtOutgoingQueue,
@@ -49,9 +52,12 @@
void OnNewObjectAvailable(Location sequence, uint64_t subgroup) override {
std::optional<PublishedObject> object =
GetCachedObject(sequence.group, subgroup, sequence.object);
- QUICHE_CHECK(object.has_value());
- ASSERT_THAT(object->metadata.status, AnyOf(MoqtObjectStatus::kNormal,
- MoqtObjectStatus::kEndOfGroup));
+ ASSERT_THAT(object,
+ Optional(Field(&PublishedObject::metadata,
+ Field(&PublishedObjectMetadata::status,
+ AnyOf(MoqtObjectStatus::kNormal,
+ MoqtObjectStatus::kEndOfGroup,
+ MoqtObjectStatus::kEndOfTrack)))));
if (object->metadata.status == MoqtObjectStatus::kNormal) {
PublishObject(object->metadata.location.group,
object->metadata.location.object,
@@ -372,5 +378,50 @@
EXPECT_GE(object->metadata.arrival_time, test_start);
}
+TEST(MoqtOutgoingQueue, EndOfTrack) {
+ TestMoqtOutgoingQueue queue;
+ queue.AddObject(quiche::QuicheMemSlice::Copy("a"), true); // Create (0, 0)
+ queue.AddObject(quiche::QuicheMemSlice::Copy("b"), true); // Create (1, 0)
+ std::unique_ptr<MoqtFetchTask> fetch = queue.Fetch(
+ Location{0, 0}, 5, std::nullopt, MoqtDeliveryOrder::kAscending);
+ bool end_of_track = false;
+ Location end_location;
+ // end_of_track is false before Close() is called.
+ fetch->SetFetchResponseCallback(
+ [&end_of_track,
+ &end_location](std::variant<MoqtFetchOk, MoqtFetchError> arg) {
+ end_of_track = std::get<MoqtFetchOk>(arg).end_of_track;
+ end_location = std::get<MoqtFetchOk>(arg).end_location;
+ });
+ EXPECT_FALSE(end_of_track);
+ EXPECT_EQ(end_location, Location(1, 0));
+
+ queue.Close(); // Create (2, 0)
+ EXPECT_EQ(queue.GetLargestLocation(), Location(2, 0));
+ fetch = queue.Fetch(Location{0, 0}, 1, std::nullopt,
+ MoqtDeliveryOrder::kAscending);
+ // end_of_track is false if the fetch does not include the last object.
+ fetch->SetFetchResponseCallback(
+ [&end_of_track,
+ &end_location](std::variant<MoqtFetchOk, MoqtFetchError> arg) {
+ end_of_track = std::get<MoqtFetchOk>(arg).end_of_track;
+ end_location = std::get<MoqtFetchOk>(arg).end_location;
+ });
+ EXPECT_FALSE(end_of_track);
+ EXPECT_EQ(end_location, Location(1, 1));
+
+ fetch = queue.Fetch(Location{0, 0}, 5, std::nullopt,
+ MoqtDeliveryOrder::kAscending);
+ // end_of_track is true if the fetch includes the last object.
+ fetch->SetFetchResponseCallback(
+ [&end_of_track,
+ &end_location](std::variant<MoqtFetchOk, MoqtFetchError> arg) {
+ end_of_track = std::get<MoqtFetchOk>(arg).end_of_track;
+ end_location = std::get<MoqtFetchOk>(arg).end_location;
+ });
+ EXPECT_TRUE(end_of_track);
+ EXPECT_EQ(end_location, Location(2, 0));
+}
+
} // namespace
} // namespace moqt
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc
index d29b5a2..52de027 100644
--- a/quiche/quic/moqt/moqt_parser.cc
+++ b/quiche/quic/moqt/moqt_parser.cc
@@ -788,7 +788,7 @@
MoqtFetch fetch;
uint8_t group_order;
uint64_t type;
- if (!reader.ReadVarInt62(&fetch.fetch_id) ||
+ if (!reader.ReadVarInt62(&fetch.request_id) ||
!reader.ReadUInt8(&fetch.subscriber_priority) ||
!reader.ReadUInt8(&group_order) || !reader.ReadVarInt62(&type)) {
return 0;
@@ -862,23 +862,14 @@
return reader.PreviouslyReadPayload().length();
}
-size_t MoqtControlParser::ProcessFetchCancel(quic::QuicDataReader& reader) {
- MoqtFetchCancel fetch_cancel;
- if (!reader.ReadVarInt62(&fetch_cancel.subscribe_id)) {
- return 0;
- }
- visitor_.OnFetchCancelMessage(fetch_cancel);
- return reader.PreviouslyReadPayload().length();
-}
-
size_t MoqtControlParser::ProcessFetchOk(quic::QuicDataReader& reader) {
MoqtFetchOk fetch_ok;
- uint8_t group_order;
+ uint8_t group_order, end_of_track;
KeyValuePairList parameters;
- if (!reader.ReadVarInt62(&fetch_ok.subscribe_id) ||
- !reader.ReadUInt8(&group_order) ||
- !reader.ReadVarInt62(&fetch_ok.largest_id.group) ||
- !reader.ReadVarInt62(&fetch_ok.largest_id.object) ||
+ if (!reader.ReadVarInt62(&fetch_ok.request_id) ||
+ !reader.ReadUInt8(&group_order) || !reader.ReadUInt8(&end_of_track) ||
+ !reader.ReadVarInt62(&fetch_ok.end_location.group) ||
+ !reader.ReadVarInt62(&fetch_ok.end_location.object) ||
!ParseKeyValuePairList(reader, parameters)) {
return 0;
}
@@ -886,12 +877,17 @@
ParseError("Invalid group order value in FETCH_OK");
return 0;
}
+ if (end_of_track > 0x01) {
+ ParseError("Invalid end of track value in FETCH_OK");
+ return 0;
+ }
if (!ValidateVersionSpecificParameters(parameters,
MoqtMessageType::kFetchOk)) {
ParseError("FETCH_OK message contains invalid parameters");
return 0;
}
fetch_ok.group_order = static_cast<MoqtDeliveryOrder>(group_order);
+ fetch_ok.end_of_track = end_of_track == 1;
if (!KeyValuePairListToVersionSpecificParameters(parameters,
fetch_ok.parameters)) {
return 0;
@@ -903,9 +899,9 @@
size_t MoqtControlParser::ProcessFetchError(quic::QuicDataReader& reader) {
MoqtFetchError fetch_error;
uint64_t error_code;
- if (!reader.ReadVarInt62(&fetch_error.subscribe_id) ||
+ if (!reader.ReadVarInt62(&fetch_error.request_id) ||
!reader.ReadVarInt62(&error_code) ||
- !reader.ReadStringVarInt62(fetch_error.reason_phrase)) {
+ !reader.ReadStringVarInt62(fetch_error.error_reason)) {
return 0;
}
fetch_error.error_code = static_cast<RequestErrorCode>(error_code);
@@ -913,6 +909,15 @@
return reader.PreviouslyReadPayload().length();
}
+size_t MoqtControlParser::ProcessFetchCancel(quic::QuicDataReader& reader) {
+ MoqtFetchCancel fetch_cancel;
+ if (!reader.ReadVarInt62(&fetch_cancel.request_id)) {
+ return 0;
+ }
+ visitor_.OnFetchCancelMessage(fetch_cancel);
+ return reader.PreviouslyReadPayload().length();
+}
+
size_t MoqtControlParser::ProcessRequestsBlocked(quic::QuicDataReader& reader) {
MoqtRequestsBlocked requests_blocked;
if (!reader.ReadVarInt62(&requests_blocked.max_request_id)) {
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index ef1450b..55a27a3 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -207,7 +207,7 @@
return;
}
QUICHE_DLOG(INFO) << ENDPOINT
- << "Received OBJECT message in datagram for subscribe_id "
+ << "Received OBJECT message in datagram for request_id "
<< " for track alias " << message.track_alias
<< " with sequence " << message.group_id << ":"
<< message.object_id << " priority "
@@ -485,7 +485,7 @@
}
MoqtFetch message;
message.fetch = StandaloneFetch(name, start, end_group, end_object);
- message.fetch_id = next_request_id_;
+ message.request_id = next_request_id_;
next_request_id_ += 2;
message.subscriber_priority = priority;
message.group_order = delivery_order;
@@ -494,7 +494,7 @@
QUIC_DLOG(INFO) << ENDPOINT << "Sent FETCH message for " << name;
auto fetch = std::make_unique<UpstreamFetch>(
message, std::get<StandaloneFetch>(message.fetch), std::move(callback));
- upstream_by_id_.emplace(message.fetch_id, std::move(fetch));
+ upstream_by_id_.emplace(message.request_id, std::move(fetch));
return true;
}
@@ -546,7 +546,7 @@
return false;
}
MoqtFetch fetch;
- fetch.fetch_id = next_request_id_;
+ fetch.request_id = next_request_id_;
next_request_id_ += 2;
fetch.subscriber_priority = priority;
fetch.group_order = delivery_order;
@@ -556,7 +556,7 @@
QUIC_DLOG(INFO) << ENDPOINT << "Sent Joining FETCH message for " << name;
auto upstream_fetch =
std::make_unique<UpstreamFetch>(fetch, name, std::move(callback));
- upstream_by_id_.emplace(fetch.fetch_id, std::move(upstream_fetch));
+ upstream_by_id_.emplace(fetch.request_id, std::move(upstream_fetch));
return true;
}
@@ -598,7 +598,7 @@
continue;
}
if (fetch->session_->WriteObjectToStream(
- stream_, fetch->fetch_id_, object.metadata,
+ stream_, fetch->request_id(), object.metadata,
std::move(object.payload),
MoqtDataStreamType::kStreamHeaderFetch, !stream_header_written_,
/*fin=*/false)) {
@@ -798,8 +798,8 @@
return it->second;
}
-RemoteTrack* MoqtSession::RemoteTrackById(uint64_t subscribe_id) {
- auto it = upstream_by_id_.find(subscribe_id);
+RemoteTrack* MoqtSession::RemoteTrackById(uint64_t request_id) {
+ auto it = upstream_by_id_.find(request_id);
if (it == upstream_by_id_.end()) {
return nullptr;
}
@@ -853,19 +853,18 @@
}
void MoqtSession::UpdateQueuedSendOrder(
- uint64_t subscribe_id,
- std::optional<webtransport::SendOrder> old_send_order,
+ uint64_t request_id, std::optional<webtransport::SendOrder> old_send_order,
std::optional<webtransport::SendOrder> new_send_order) {
if (old_send_order == new_send_order) {
return;
}
if (old_send_order.has_value()) {
subscribes_with_queued_outgoing_data_streams_.erase(
- SubscriptionWithQueuedStream{*old_send_order, subscribe_id});
+ SubscriptionWithQueuedStream{*old_send_order, request_id});
}
if (new_send_order.has_value()) {
subscribes_with_queued_outgoing_data_streams_.emplace(*new_send_order,
- subscribe_id);
+ request_id);
}
}
@@ -985,12 +984,12 @@
}
void MoqtSession::ControlStream::SendFetchError(
- uint64_t subscribe_id, RequestErrorCode error_code,
- absl::string_view reason_phrase) {
+ uint64_t request_id, RequestErrorCode error_code,
+ absl::string_view error_reason) {
MoqtFetchError fetch_error;
- fetch_error.subscribe_id = subscribe_id;
+ fetch_error.request_id = request_id;
fetch_error.error_code = error_code;
- fetch_error.reason_phrase = reason_phrase;
+ fetch_error.error_reason = error_reason;
SendOrBufferMessage(session_->framer_.SerializeFetchError(fetch_error));
}
@@ -1051,7 +1050,7 @@
RemoteTrack* track = session_->RemoteTrackById(message.request_id);
if (track == nullptr) {
QUIC_DLOG(INFO) << ENDPOINT << "Received the SUBSCRIBE_OK for "
- << "subscribe_id = " << message.request_id
+ << "request_id = " << message.request_id
<< " but no track exists";
// Subscription state might have been destroyed for internal reasons.
return;
@@ -1063,12 +1062,12 @@
}
if (message.largest_location.has_value()) {
QUIC_DLOG(INFO) << ENDPOINT << "Received the SUBSCRIBE_OK for "
- << "subscribe_id = " << message.request_id << " "
+ << "request_id = " << message.request_id << " "
<< track->full_track_name()
<< " largest_id = " << *message.largest_location;
} else {
QUIC_DLOG(INFO) << ENDPOINT << "Received the SUBSCRIBE_OK for "
- << "subscribe_id = " << message.request_id << " "
+ << "request_id = " << message.request_id << " "
<< track->full_track_name();
}
SubscribeRemoteTrack* subscribe = static_cast<SubscribeRemoteTrack*>(track);
@@ -1088,7 +1087,7 @@
RemoteTrack* track = session_->RemoteTrackById(message.request_id);
if (track == nullptr) {
QUIC_DLOG(INFO) << ENDPOINT << "Received the SUBSCRIBE_ERROR for "
- << "subscribe_id = " << message.request_id
+ << "request_id = " << message.request_id
<< " but no track exists";
// Subscription state might have been destroyed for internal reasons.
return;
@@ -1104,7 +1103,7 @@
return;
}
QUIC_DLOG(INFO) << ENDPOINT << "Received the SUBSCRIBE_ERROR for "
- << "subscribe_id = " << message.request_id << " ("
+ << "request_id = " << message.request_id << " ("
<< track->full_track_name() << ")"
<< ", error = " << static_cast<int>(message.error_code)
<< " (" << message.reason_phrase << ")";
@@ -1343,12 +1342,12 @@
}
void MoqtSession::ControlStream::OnFetchMessage(const MoqtFetch& message) {
- if (!session_->ValidateRequestId(message.fetch_id)) {
+ if (!session_->ValidateRequestId(message.request_id)) {
return;
}
if (session_->sent_goaway_) {
QUIC_DLOG(INFO) << ENDPOINT << "Received a FETCH after GOAWAY";
- SendFetchError(message.fetch_id, RequestErrorCode::kUnauthorized,
+ SendFetchError(message.request_id, RequestErrorCode::kUnauthorized,
"FETCH after GOAWAY");
return;
}
@@ -1375,7 +1374,7 @@
QUIC_DLOG(INFO) << ENDPOINT << "Received a JOINING_FETCH for "
<< "subscribe_id " << joining_subscribe_id
<< " that does not exist";
- SendFetchError(message.fetch_id, RequestErrorCode::kTrackDoesNotExist,
+ SendFetchError(message.request_id, RequestErrorCode::kTrackDoesNotExist,
"Joining Fetch for non-existent subscribe");
return;
}
@@ -1419,7 +1418,7 @@
QUIC_DLOG(INFO) << ENDPOINT << "FETCH for " << track_name
<< " rejected by the application: "
<< track_publisher.status();
- SendFetchError(message.fetch_id, RequestErrorCode::kTrackDoesNotExist,
+ SendFetchError(message.request_id, RequestErrorCode::kTrackDoesNotExist,
track_publisher.status().message());
return;
}
@@ -1431,23 +1430,23 @@
if (!fetch->GetStatus().ok()) {
QUIC_DLOG(INFO) << ENDPOINT << "FETCH for " << track_name
<< " could not initialize the task";
- SendFetchError(message.fetch_id, RequestErrorCode::kInvalidRange,
+ SendFetchError(message.request_id, RequestErrorCode::kInvalidRange,
fetch->GetStatus().message());
return;
}
auto published_fetch = std::make_unique<PublishedFetch>(
- message.fetch_id, session_, std::move(fetch));
- auto result = session_->incoming_fetches_.emplace(message.fetch_id,
+ message.request_id, session_, std::move(fetch));
+ auto result = session_->incoming_fetches_.emplace(message.request_id,
std::move(published_fetch));
if (!result.second) { // Emplace failed.
QUIC_DLOG(INFO) << ENDPOINT << "FETCH for " << track_name
<< " could not be added to the session";
- SendFetchError(message.fetch_id, RequestErrorCode::kInternalError,
+ SendFetchError(message.request_id, RequestErrorCode::kInternalError,
"Could not initialize FETCH state");
}
MoqtFetchTask* fetch_task = result.first->second->fetch_task();
fetch_task->SetFetchResponseCallback(
- [this, request_id = message.fetch_id, fetch_start = start_object,
+ [this, request_id = message.request_id, fetch_start = start_object,
fetch_end = Location(end_group, end_object.value_or(UINT64_MAX))](
std::variant<MoqtFetchOk, MoqtFetchError> message) {
if (!session_->incoming_fetches_.contains(request_id)) {
@@ -1455,11 +1454,11 @@
}
if (std::holds_alternative<MoqtFetchOk>(message)) {
MoqtFetchOk& fetch_ok = std::get<MoqtFetchOk>(message);
- fetch_ok.subscribe_id = request_id;
- if (fetch_ok.largest_id < fetch_start ||
- fetch_ok.largest_id > fetch_end) {
+ fetch_ok.request_id = request_id;
+ if (fetch_ok.end_location < fetch_start ||
+ fetch_ok.end_location > fetch_end) {
// TODO(martinduke): Add end_of_track to fetch_ok and check it's
- // larger than largest_id.
+ // larger than end_location.
QUIC_BUG(quic_bug_fetch_ok_status_error)
<< "FETCH_OK end or end_of_track is invalid";
session_->Error(MoqtError::kInternalError, "FETCH_OK status error");
@@ -1469,14 +1468,14 @@
return;
}
MoqtFetchError& fetch_error = std::get<MoqtFetchError>(message);
- fetch_error.subscribe_id = request_id;
+ fetch_error.request_id = request_id;
SendOrBufferMessage(session_->framer_.SerializeFetchError(fetch_error));
});
// Set a temporary new-object callback that creates a data stream. When
// created, the stream visitor will replace this callback.
fetch_task->SetObjectAvailableCallback(
[this, send_order = SendOrderForFetch(message.subscriber_priority),
- request_id = message.fetch_id]() {
+ request_id = message.request_id]() {
auto it = session_->incoming_fetches_.find(request_id);
if (it == session_->incoming_fetches_.end()) {
return;
@@ -1495,10 +1494,10 @@
}
void MoqtSession::ControlStream::OnFetchOkMessage(const MoqtFetchOk& message) {
- RemoteTrack* track = session_->RemoteTrackById(message.subscribe_id);
+ RemoteTrack* track = session_->RemoteTrackById(message.request_id);
if (track == nullptr) {
QUIC_DLOG(INFO) << ENDPOINT << "Received the FETCH_OK for "
- << "subscribe_id = " << message.subscribe_id
+ << "request_id = " << message.request_id
<< " but no track exists";
// Subscription state might have been destroyed for internal reasons.
return;
@@ -1508,21 +1507,20 @@
"Received FETCH_OK for a SUBSCRIBE");
return;
}
- QUIC_DLOG(INFO) << ENDPOINT << "Received the FETCH_OK for subscribe_id = "
- << message.subscribe_id << " " << track->full_track_name();
+ QUIC_DLOG(INFO) << ENDPOINT << "Received the FETCH_OK for request_id = "
+ << message.request_id << " " << track->full_track_name();
UpstreamFetch* fetch = static_cast<UpstreamFetch*>(track);
- fetch->OnFetchResult(message.largest_id, absl::OkStatus(),
- [=, session = session_]() {
- session->CancelFetch(message.subscribe_id);
- });
+ fetch->OnFetchResult(
+ message.end_location, absl::OkStatus(),
+ [=, session = session_]() { session->CancelFetch(message.request_id); });
}
void MoqtSession::ControlStream::OnFetchErrorMessage(
const MoqtFetchError& message) {
- RemoteTrack* track = session_->RemoteTrackById(message.subscribe_id);
+ RemoteTrack* track = session_->RemoteTrackById(message.request_id);
if (track == nullptr) {
QUIC_DLOG(INFO) << ENDPOINT << "Received the FETCH_ERROR for "
- << "subscribe_id = " << message.subscribe_id
+ << "request_id = " << message.request_id
<< " but no track exists";
// Subscription state might have been destroyed for internal reasons.
return;
@@ -1538,15 +1536,15 @@
return;
}
QUIC_DLOG(INFO) << ENDPOINT << "Received the FETCH_ERROR for "
- << "subscribe_id = " << message.subscribe_id << " ("
+ << "request_id = " << message.request_id << " ("
<< track->full_track_name() << ")"
<< ", error = " << static_cast<int>(message.error_code)
- << " (" << message.reason_phrase << ")";
+ << " (" << message.error_reason << ")";
UpstreamFetch* fetch = static_cast<UpstreamFetch*>(track);
absl::Status status =
- RequestErrorCodeToStatus(message.error_code, message.reason_phrase);
+ RequestErrorCodeToStatus(message.error_code, message.error_reason);
fetch->OnFetchResult(Location(0, 0), status, nullptr);
- session_->upstream_by_id_.erase(message.subscribe_id);
+ session_->upstream_by_id_.erase(message.request_id);
}
void MoqtSession::ControlStream::OnRequestsBlockedMessage(
@@ -2317,19 +2315,19 @@
return true;
}
-void MoqtSession::CancelFetch(uint64_t subscribe_id) {
+void MoqtSession::CancelFetch(uint64_t request_id) {
if (is_closing_) {
return;
}
// This is only called from the callback where UpstreamFetchTask has been
// destroyed, so there is no need to notify the application.
- upstream_by_id_.erase(subscribe_id);
+ upstream_by_id_.erase(request_id);
ControlStream* stream = GetControlStream();
if (stream == nullptr) {
return;
}
MoqtFetchCancel message;
- message.subscribe_id = subscribe_id;
+ message.request_id = request_id;
stream->SendOrBufferMessage(framer_.SerializeFetchCancel(message));
// The FETCH_CANCEL will cause a RESET_STREAM to return, which would be the
// same as a STOP_SENDING. However, a FETCH_CANCEL works even if the stream
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index 356a4eb..8ca159e 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -296,8 +296,8 @@
private:
friend class test::MoqtSessionPeer;
- void SendFetchError(uint64_t subscribe_id, RequestErrorCode error_code,
- absl::string_view reason_phrase);
+ void SendFetchError(uint64_t request_id, RequestErrorCode error_code,
+ absl::string_view error_reason);
MoqtSession* session_;
webtransport::Stream* stream_;
@@ -562,9 +562,11 @@
class QUICHE_EXPORT PublishedFetch {
public:
- PublishedFetch(uint64_t fetch_id, MoqtSession* session,
+ PublishedFetch(uint64_t request_id, MoqtSession* session,
std::unique_ptr<MoqtFetchTask> fetch)
- : session_(session), fetch_(std::move(fetch)), fetch_id_(fetch_id) {}
+ : session_(session),
+ fetch_(std::move(fetch)),
+ request_id_(request_id) {}
class FetchStreamVisitor : public webtransport::StreamVisitor {
public:
@@ -577,7 +579,7 @@
~FetchStreamVisitor() {
std::shared_ptr<PublishedFetch> fetch = fetch_.lock();
if (fetch != nullptr) {
- fetch->session()->incoming_fetches_.erase(fetch->fetch_id_);
+ fetch->session()->incoming_fetches_.erase(fetch->request_id_);
}
}
// webtransport::StreamVisitor implementation.
@@ -597,13 +599,13 @@
MoqtFetchTask* fetch_task() { return fetch_.get(); }
MoqtSession* session() { return session_; }
- uint64_t fetch_id() const { return fetch_id_; }
+ uint64_t request_id() const { return request_id_; }
void SetStreamId(webtransport::StreamId id) { stream_id_ = id; }
private:
MoqtSession* session_;
std::unique_ptr<MoqtFetchTask> fetch_;
- uint64_t fetch_id_;
+ uint64_t request_id_;
// Store the stream ID in case a FETCH_CANCEL requires a reset.
std::optional<webtransport::StreamId> stream_id_;
};
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index 80b3520..a89bf05 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -76,7 +76,7 @@
MoqtFetch DefaultFetch() {
MoqtFetch fetch = {
- /*fetch_id=*/1,
+ /*request_id=*/1,
/*subscriber_priority=*/0x80,
/*group_order=*/std::nullopt,
/*fetch=*/
@@ -2264,9 +2264,10 @@
// Compose and send the FETCH_OK.
MoqtFetchOk expected_ok;
- expected_ok.subscribe_id = fetch.fetch_id;
+ expected_ok.request_id = fetch.request_id;
expected_ok.group_order = MoqtDeliveryOrder::kAscending;
- expected_ok.largest_id = Location(1, 4);
+ expected_ok.end_of_track = false;
+ expected_ok.end_location = Location(1, 4);
EXPECT_CALL(mock_stream_, Writev(SerializedControlMessage(expected_ok), _));
fetch_task->CallFetchResponseCallback(expected_ok);
// Data arrives.
@@ -2288,9 +2289,10 @@
MockTrackPublisher* track = CreateTrackPublisher();
MoqtFetchOk expected_ok;
- expected_ok.subscribe_id = fetch.fetch_id;
+ expected_ok.request_id = fetch.request_id;
expected_ok.group_order = MoqtDeliveryOrder::kAscending;
- expected_ok.largest_id = Location(1, 4);
+ expected_ok.end_of_track = false;
+ expected_ok.end_location = Location(1, 4);
auto fetch_task_ptr =
std::make_unique<MockFetchTask>(expected_ok, std::nullopt, true);
MockFetchTask* fetch_task = fetch_task_ptr.get();
@@ -2329,9 +2331,10 @@
stream_input->OnFetchMessage(fetch);
MoqtFetchOk expected_ok;
- expected_ok.subscribe_id = fetch.fetch_id;
+ expected_ok.request_id = fetch.request_id;
expected_ok.group_order = MoqtDeliveryOrder::kAscending;
- expected_ok.largest_id = Location(1, 4);
+ expected_ok.end_of_track = false;
+ expected_ok.end_location = Location(1, 4);
EXPECT_CALL(mock_stream_, Writev(SerializedControlMessage(expected_ok), _));
fetch_task->CallFetchResponseCallback(expected_ok);
}
@@ -2355,9 +2358,9 @@
stream_input->OnFetchMessage(fetch);
MoqtFetchError expected_error;
- expected_error.subscribe_id = fetch.fetch_id;
+ expected_error.request_id = fetch.request_id;
expected_error.error_code = RequestErrorCode::kTrackDoesNotExist;
- expected_error.reason_phrase = "foo";
+ expected_error.error_reason = "foo";
EXPECT_CALL(mock_stream_,
Writev(SerializedControlMessage(expected_error), _));
fetch_task->CallFetchResponseCallback(expected_error);
@@ -2370,7 +2373,7 @@
std::unique_ptr<MoqtControlParserVisitor> stream_input =
MoqtSessionPeer::CreateControlStream(&session_, &control_stream);
MoqtFetch fetch = DefaultFetch();
- fetch.fetch_id = 1; // Too low.
+ fetch.request_id = 1; // Too low.
EXPECT_CALL(mock_session_,
CloseSession(static_cast<uint64_t>(MoqtError::kInvalidRequestId),
"Request ID not monotonically increasing"))
@@ -2445,7 +2448,7 @@
// Joining FETCH arrives. The resulting Fetch should begin at (2, 0).
MoqtFetch fetch = DefaultFetch();
- fetch.fetch_id = 3;
+ fetch.request_id = 3;
fetch.fetch = JoiningFetchRelative(1, 2);
EXPECT_CALL(*track, Fetch(Location(2, 0), 4, std::optional<uint64_t>(10), _))
.WillOnce(Return(std::make_unique<MockFetchTask>()));
@@ -2476,7 +2479,7 @@
ReceiveSubscribeSynchronousOk(track, subscribe, stream_input.get());
MoqtFetch fetch = DefaultFetch();
- fetch.fetch_id = 3;
+ fetch.request_id = 3;
fetch.fetch = JoiningFetchRelative(1, 2);
EXPECT_CALL(mock_session_,
CloseSession(static_cast<uint64_t>(MoqtError::kProtocolViolation),
@@ -2504,7 +2507,7 @@
VersionSpecificParameters(),
};
MoqtFetch expected_fetch = {
- /*fetch_id=*/2,
+ /*request_id=*/2,
/*subscriber_priority=*/0x80,
/*group_order=*/MoqtDeliveryOrder::kAscending,
/*fetch=*/JoiningFetchRelative(0, 1),
@@ -2538,11 +2541,11 @@
MoqtDeliveryOrder::kAscending, Location(2, 0),
VersionSpecificParameters()));
stream_input->OnFetchOkMessage(MoqtFetchOk(2, MoqtDeliveryOrder::kAscending,
- Location(2, 0),
+ false, Location(2, 0),
VersionSpecificParameters()));
// Packet arrives on FETCH stream.
MoqtObject object = {
- /*fetch_id=*/2,
+ /*request_id=*/2,
/*group_id, object_id=*/0,
0,
/*publisher_priority=*/128,
@@ -2627,7 +2630,8 @@
MoqtFetchOk ok = {
/*request_id=*/0,
/*group_order=*/MoqtDeliveryOrder::kAscending,
- /*largest_id=*/Location(3, 25),
+ /*end_of_track=*/false,
+ /*end_location=*/Location(3, 25),
VersionSpecificParameters(),
};
stream_input->OnFetchOkMessage(ok);
@@ -2726,7 +2730,8 @@
MoqtFetchOk ok = {
/*request_id=*/0,
/*group_order=*/MoqtDeliveryOrder::kAscending,
- /*largest_id=*/Location(3, 25),
+ /*end_of_track=*/false,
+ /*end_location=*/Location(3, 25),
VersionSpecificParameters(),
};
stream_input->OnFetchOkMessage(ok);
@@ -2796,7 +2801,8 @@
MoqtFetchOk ok = {
/*request_id=*/0,
/*group_order=*/MoqtDeliveryOrder::kAscending,
- /*largest_id=*/Location(3, 25),
+ /*end_of_track=*/false,
+ /*end_location=*/Location(3, 25),
VersionSpecificParameters(),
};
stream_input->OnFetchOkMessage(ok);
@@ -3194,7 +3200,7 @@
EXPECT_CALL(mock_stream_,
Writev(ControlMessageOfType(MoqtMessageType::kFetchError), _));
MoqtFetch fetch = DefaultFetch();
- fetch.fetch_id = 3;
+ fetch.request_id = 3;
stream_input->OnFetchMessage(fetch);
EXPECT_CALL(
mock_stream_,
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h
index 6f5b96c..4c81f37 100644
--- a/quiche/quic/moqt/moqt_track.h
+++ b/quiche/quic/moqt/moqt_track.h
@@ -236,7 +236,7 @@
UpstreamFetch(const MoqtFetch& fetch, const StandaloneFetch standalone,
FetchResponseCallback callback)
: RemoteTrack(
- standalone.full_track_name, fetch.fetch_id,
+ standalone.full_track_name, fetch.request_id,
SubscribeWindow(standalone.start_object, standalone.end_group,
standalone.end_object),
fetch.subscriber_priority),
@@ -247,7 +247,7 @@
// Relative Joining Fetch constructor
UpstreamFetch(const MoqtFetch& fetch, FullTrackName full_track_name,
FetchResponseCallback callback)
- : RemoteTrack(full_track_name, fetch.fetch_id,
+ : RemoteTrack(full_track_name, fetch.request_id,
SubscribeWindow(Location(0, 0)), fetch.subscriber_priority),
ok_callback_(std::move(callback)) {
// Immediately set the data stream type.
@@ -258,7 +258,7 @@
JoiningFetchAbsolute absolute_joining,
FetchResponseCallback callback)
: RemoteTrack(
- full_track_name, fetch.fetch_id,
+ full_track_name, fetch.request_id,
SubscribeWindow(Location(absolute_joining.joining_start, 0)),
fetch.subscriber_priority),
ok_callback_(std::move(callback)) {
diff --git a/quiche/quic/moqt/test_tools/moqt_session_peer.h b/quiche/quic/moqt/test_tools/moqt_session_peer.h
index 467c2fb..a0dc737 100644
--- a/quiche/quic/moqt/test_tools/moqt_session_peer.h
+++ b/quiche/quic/moqt/test_tools/moqt_session_peer.h
@@ -199,8 +199,8 @@
// Initialize the fetch task
fetch->OnFetchResult(
Location{4, 10}, absl::OkStatus(),
- [=, session_ptr = session, fetch_id = fetch_message.fetch_id]() {
- session_ptr->CancelFetch(fetch_id);
+ [=, session_ptr = session, request_id = fetch_message.request_id]() {
+ session_ptr->CancelFetch(request_id);
});
;
auto mock_session =
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h
index 1ad79b2..4497674 100644
--- a/quiche/quic/moqt/test_tools/moqt_test_message.h
+++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -1309,8 +1309,8 @@
}
bool EqualFieldValues(MessageStructuredData& values) const override {
auto cast = std::get<MoqtFetch>(values);
- if (cast.fetch_id != fetch_.fetch_id) {
- QUIC_LOG(INFO) << "FETCH fetch_id mismatch";
+ if (cast.request_id != fetch_.request_id) {
+ QUIC_LOG(INFO) << "FETCH request_id mismatch";
return false;
}
if (cast.subscriber_priority != fetch_.subscriber_priority) {
@@ -1359,7 +1359,7 @@
private:
uint8_t raw_packet_[28] = {
0x16, 0x00, 0x19,
- 0x01, // fetch_id = 1
+ 0x01, // request_id = 1
0x02, // priority = kHigh
0x01, // group_order = kAscending
0x01, // type = kStandalone
@@ -1371,7 +1371,7 @@
};
MoqtFetch fetch_ = {
- /*fetch_id =*/1,
+ /*request_id=*/1,
/*subscriber_priority=*/2,
/*group_order=*/MoqtDeliveryOrder::kAscending,
/*fetch =*/
@@ -1394,8 +1394,8 @@
}
bool EqualFieldValues(MessageStructuredData& values) const override {
auto cast = std::get<MoqtFetch>(values);
- if (cast.fetch_id != fetch_.fetch_id) {
- QUIC_LOG(INFO) << "FETCH fetch_id mismatch";
+ if (cast.request_id != fetch_.request_id) {
+ QUIC_LOG(INFO) << "FETCH request_id mismatch";
return false;
}
if (cast.subscriber_priority != fetch_.subscriber_priority) {
@@ -1433,7 +1433,7 @@
private:
uint8_t raw_packet_[17] = {
0x16, 0x00, 0x0e,
- 0x01, // fetch_id = 1
+ 0x01, // request_id = 1
0x02, // priority = kHigh
0x01, // group_order = kAscending
0x02, // type = kRelativeJoining
@@ -1442,7 +1442,7 @@
};
MoqtFetch fetch_ = {
- /*fetch_id =*/1,
+ /*request_id =*/1,
/*subscriber_priority=*/2,
/*group_order=*/MoqtDeliveryOrder::kAscending,
/*fetch=*/JoiningFetchRelative{2, 2},
@@ -1459,8 +1459,8 @@
}
bool EqualFieldValues(MessageStructuredData& values) const override {
auto cast = std::get<MoqtFetch>(values);
- if (cast.fetch_id != fetch_.fetch_id) {
- QUIC_LOG(INFO) << "FETCH fetch_id mismatch";
+ if (cast.request_id != fetch_.request_id) {
+ QUIC_LOG(INFO) << "FETCH request_id mismatch";
return false;
}
if (cast.subscriber_priority != fetch_.subscriber_priority) {
@@ -1498,7 +1498,7 @@
private:
uint8_t raw_packet_[17] = {
0x16, 0x00, 0x0e,
- 0x01, // fetch_id = 1
+ 0x01, // request_id = 1
0x02, // priority = kHigh
0x01, // group_order = kAscending
0x03, // type = kAbsoluteJoining
@@ -1507,7 +1507,7 @@
};
MoqtFetch fetch_ = {
- /*fetch_id =*/1,
+ /*request_id=*/1,
/*subscriber_priority=*/2,
/*group_order=*/MoqtDeliveryOrder::kAscending,
/*fetch=*/JoiningFetchAbsolute{2, 2},
@@ -1515,6 +1515,105 @@
};
};
+class QUICHE_NO_EXPORT FetchOkMessage : public TestMessageBase {
+ public:
+ FetchOkMessage() : TestMessageBase() {
+ SetWireImage(raw_packet_, sizeof(raw_packet_));
+ }
+ bool EqualFieldValues(MessageStructuredData& values) const override {
+ auto cast = std::get<MoqtFetchOk>(values);
+ if (cast.request_id != fetch_ok_.request_id) {
+ QUIC_LOG(INFO) << "FETCH_OK request_id mismatch";
+ return false;
+ }
+ if (cast.group_order != fetch_ok_.group_order) {
+ QUIC_LOG(INFO) << "FETCH_OK group_order mismatch";
+ return false;
+ }
+ if (cast.end_of_track != fetch_ok_.end_of_track) {
+ QUIC_LOG(INFO) << "FETCH_OK end_of_track mismatch";
+ return false;
+ }
+ if (cast.end_location != fetch_ok_.end_location) {
+ QUIC_LOG(INFO) << "FETCH_OK end_location mismatch";
+ return false;
+ }
+ if (cast.parameters != fetch_ok_.parameters) {
+ QUIC_LOG(INFO) << "FETCH_OK parameters mismatch";
+ return false;
+ }
+ return true;
+ }
+
+ void ExpandVarints() override { ExpandVarintsImpl("v--vvvvv---"); }
+
+ MessageStructuredData structured_data() const override {
+ return TestMessageBase::MessageStructuredData(fetch_ok_);
+ }
+
+ private:
+ uint8_t raw_packet_[12] = {
+ 0x18, 0x00, 0x09,
+ 0x01, // request_id = 1
+ 0x01, // group_order = kAscending
+ 0x00, // end_of_track = false
+ 0x05, 0x04, // end_location = 5, 4
+ 0x01, 0x04, 0x67, 0x10, // MaxCacheDuration = 10000
+ };
+
+ MoqtFetchOk fetch_ok_ = {
+ /*request_id =*/1,
+ /*group_order=*/MoqtDeliveryOrder::kAscending,
+ /*end_of_track=*/false,
+ /*end_location=*/Location{5, 4},
+ VersionSpecificParameters(quic::QuicTimeDelta::Infinite(),
+ quic::QuicTimeDelta::FromMilliseconds(10000)),
+ };
+};
+
+class QUICHE_NO_EXPORT FetchErrorMessage : public TestMessageBase {
+ public:
+ FetchErrorMessage() : TestMessageBase() {
+ SetWireImage(raw_packet_, sizeof(raw_packet_));
+ }
+ bool EqualFieldValues(MessageStructuredData& values) const override {
+ auto cast = std::get<MoqtFetchError>(values);
+ if (cast.request_id != fetch_error_.request_id) {
+ QUIC_LOG(INFO) << "FETCH_ERROR request_id mismatch";
+ return false;
+ }
+ if (cast.error_code != fetch_error_.error_code) {
+ QUIC_LOG(INFO) << "FETCH_ERROR group_order mismatch";
+ return false;
+ }
+ if (cast.error_reason != fetch_error_.error_reason) {
+ QUIC_LOG(INFO) << "FETCH_ERROR error_reason mismatch";
+ return false;
+ }
+ return true;
+ }
+
+ void ExpandVarints() override { ExpandVarintsImpl("vvv---"); }
+
+ MessageStructuredData structured_data() const override {
+ return TestMessageBase::MessageStructuredData(fetch_error_);
+ }
+
+ private:
+ uint8_t raw_packet_[9] = {
+ 0x19, 0x00, 0x06,
+ 0x01, // request_id = 1
+ 0x01, // error_code = kUnauthorized
+ 0x03, 0x62, 0x61, 0x72, // error_reason = "bar"
+ };
+
+ MoqtFetchError fetch_error_ = {
+ /*request_id =*/1,
+ /*error_code=*/RequestErrorCode::kUnauthorized,
+ /*error_reason=*/"bar",
+ };
+};
+
class QUICHE_NO_EXPORT FetchCancelMessage : public TestMessageBase {
public:
FetchCancelMessage() : TestMessageBase() {
@@ -1522,7 +1621,7 @@
}
bool EqualFieldValues(MessageStructuredData& values) const override {
auto cast = std::get<MoqtFetchCancel>(values);
- if (cast.subscribe_id != fetch_cancel_.subscribe_id) {
+ if (cast.request_id != fetch_cancel_.request_id) {
QUIC_LOG(INFO) << "FETCH_CANCEL subscribe_id mismatch";
return false;
}
@@ -1538,104 +1637,11 @@
private:
uint8_t raw_packet_[4] = {
0x17, 0x00, 0x01,
- 0x01, // subscribe_id = 1
+ 0x01, // request_id = 1
};
MoqtFetchCancel fetch_cancel_ = {
- /*subscribe_id =*/1,
- };
-};
-
-class QUICHE_NO_EXPORT FetchOkMessage : public TestMessageBase {
- public:
- FetchOkMessage() : TestMessageBase() {
- SetWireImage(raw_packet_, sizeof(raw_packet_));
- }
- bool EqualFieldValues(MessageStructuredData& values) const override {
- auto cast = std::get<MoqtFetchOk>(values);
- if (cast.subscribe_id != fetch_ok_.subscribe_id) {
- QUIC_LOG(INFO) << "FETCH_OK subscribe_id mismatch";
- return false;
- }
- if (cast.group_order != fetch_ok_.group_order) {
- QUIC_LOG(INFO) << "FETCH_OK group_order mismatch";
- return false;
- }
- if (cast.largest_id != fetch_ok_.largest_id) {
- QUIC_LOG(INFO) << "FETCH_OK start_object mismatch";
- return false;
- }
- if (cast.parameters != fetch_ok_.parameters) {
- QUIC_LOG(INFO) << "FETCH_OK parameters mismatch";
- return false;
- }
- return true;
- }
-
- void ExpandVarints() override { ExpandVarintsImpl("v-vvvvv---"); }
-
- MessageStructuredData structured_data() const override {
- return TestMessageBase::MessageStructuredData(fetch_ok_);
- }
-
- private:
- uint8_t raw_packet_[11] = {
- 0x18, 0x00, 0x08,
- 0x01, // subscribe_id = 1
- 0x01, // group_order = kAscending
- 0x05, 0x04, // largest_object = 5, 4
- 0x01, 0x04, 0x67, 0x10, // MaxCacheDuration = 10000
- };
-
- MoqtFetchOk fetch_ok_ = {
- /*subscribe_id =*/1,
- /*group_order=*/MoqtDeliveryOrder::kAscending,
- /*start_object=*/Location{5, 4},
- VersionSpecificParameters(quic::QuicTimeDelta::Infinite(),
- quic::QuicTimeDelta::FromMilliseconds(10000)),
- };
-};
-
-class QUICHE_NO_EXPORT FetchErrorMessage : public TestMessageBase {
- public:
- FetchErrorMessage() : TestMessageBase() {
- SetWireImage(raw_packet_, sizeof(raw_packet_));
- }
- bool EqualFieldValues(MessageStructuredData& values) const override {
- auto cast = std::get<MoqtFetchError>(values);
- if (cast.subscribe_id != fetch_error_.subscribe_id) {
- QUIC_LOG(INFO) << "FETCH_ERROR subscribe_id mismatch";
- return false;
- }
- if (cast.error_code != fetch_error_.error_code) {
- QUIC_LOG(INFO) << "FETCH_ERROR group_order mismatch";
- return false;
- }
- if (cast.reason_phrase != fetch_error_.reason_phrase) {
- QUIC_LOG(INFO) << "FETCH_ERROR reason_phrase mismatch";
- return false;
- }
- return true;
- }
-
- void ExpandVarints() override { ExpandVarintsImpl("vvv---"); }
-
- MessageStructuredData structured_data() const override {
- return TestMessageBase::MessageStructuredData(fetch_error_);
- }
-
- private:
- uint8_t raw_packet_[9] = {
- 0x19, 0x00, 0x06,
- 0x01, // subscribe_id = 1
- 0x01, // error_code = kUnauthorized
- 0x03, 0x62, 0x61, 0x72, // reason_phrase = "bar"
- };
-
- MoqtFetchError fetch_error_ = {
- /*subscribe_id =*/1,
- /*error_code=*/RequestErrorCode::kUnauthorized,
- /*reason_phrase=*/"bar",
+ /*request_id =*/1,
};
};