Update MoQT TRACK_STATUS{_REQUEST} to draft-11.
These messages will change substantially in draft-13, so this is mostly useful for any interop at IETF 123, which will be draft-12.
PiperOrigin-RevId: 781190930
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc
index db73875..1d5225c 100644
--- a/quiche/quic/moqt/moqt_framer.cc
+++ b/quiche/quic/moqt/moqt_framer.cc
@@ -602,6 +602,7 @@
return quiche::QuicheBuffer();
}
return SerializeControlMessage(MoqtMessageType::kTrackStatusRequest,
+ WireVarInt62(message.request_id),
WireFullTrackName(message.full_track_name),
WireKeyValuePairList(parameters));
}
@@ -622,10 +623,12 @@
<< "Serializing invalid MoQT parameters";
return quiche::QuicheBuffer();
}
- return SerializeControlMessage(
- MoqtMessageType::kTrackStatus, WireFullTrackName(message.full_track_name),
- WireVarInt62(message.status_code), WireVarInt62(message.last_group),
- WireVarInt62(message.last_object), WireKeyValuePairList(parameters));
+ return SerializeControlMessage(MoqtMessageType::kTrackStatus,
+ WireVarInt62(message.request_id),
+ WireVarInt62(message.status_code),
+ WireVarInt62(message.largest_location.group),
+ WireVarInt62(message.largest_location.object),
+ WireKeyValuePairList(parameters));
}
quiche::QuicheBuffer MoqtFramer::SerializeGoAway(const MoqtGoAway& message) {
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index d6d27fd..cd008cb 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -661,11 +661,16 @@
return false;
}
-struct QUICHE_EXPORT MoqtTrackStatus {
+struct QUICHE_EXPORT MoqtTrackStatusRequest {
+ uint64_t request_id;
FullTrackName full_track_name;
+ VersionSpecificParameters parameters;
+};
+
+struct QUICHE_EXPORT MoqtTrackStatus {
+ uint64_t request_id;
MoqtTrackStatusCode status_code;
- uint64_t last_group;
- uint64_t last_object;
+ Location largest_location;
VersionSpecificParameters parameters;
};
@@ -675,11 +680,6 @@
std::string reason_phrase;
};
-struct QUICHE_EXPORT MoqtTrackStatusRequest {
- FullTrackName full_track_name;
- VersionSpecificParameters parameters;
-};
-
struct QUICHE_EXPORT MoqtGoAway {
std::string new_session_uri;
};
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc
index 52de027..9e1a0b0 100644
--- a/quiche/quic/moqt/moqt_parser.cc
+++ b/quiche/quic/moqt/moqt_parser.cc
@@ -650,6 +650,9 @@
size_t MoqtControlParser::ProcessTrackStatusRequest(
quic::QuicDataReader& reader) {
MoqtTrackStatusRequest track_status_request;
+ if (!reader.ReadVarInt62(&track_status_request.request_id)) {
+ return 0;
+ }
if (!ReadFullTrackName(reader, track_status_request.full_track_name)) {
return 0;
}
@@ -681,16 +684,14 @@
size_t MoqtControlParser::ProcessTrackStatus(quic::QuicDataReader& reader) {
MoqtTrackStatus track_status;
- if (!ReadFullTrackName(reader, track_status.full_track_name)) {
+ uint64_t status_code;
+ if (!reader.ReadVarInt62(&track_status.request_id) ||
+ !reader.ReadVarInt62(&status_code) ||
+ !reader.ReadVarInt62(&track_status.largest_location.group) ||
+ !reader.ReadVarInt62(&track_status.largest_location.object)) {
return 0;
}
- uint64_t value;
- if (!reader.ReadVarInt62(&value) ||
- !reader.ReadVarInt62(&track_status.last_group) ||
- !reader.ReadVarInt62(&track_status.last_object)) {
- return 0;
- }
- track_status.status_code = static_cast<MoqtTrackStatusCode>(value);
+ track_status.status_code = static_cast<MoqtTrackStatusCode>(status_code);
KeyValuePairList parameters;
if (!ParseKeyValuePairList(reader, parameters)) {
return 0;
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 55a27a3..6a31483 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -1231,6 +1231,34 @@
session_->outgoing_announces_.erase(it);
}
+void MoqtSession::ControlStream::OnTrackStatusRequestMessage(
+ const MoqtTrackStatusRequest& message) {
+ if (!session_->ValidateRequestId(message.request_id)) {
+ return;
+ }
+ if (session_->sent_goaway_) {
+ QUIC_DLOG(INFO) << ENDPOINT
+ << "Received a TRACK_STATUS_REQUEST after GOAWAY";
+ SendOrBufferMessage(session_->framer_.SerializeTrackStatus(
+ MoqtTrackStatus(message.request_id, MoqtTrackStatusCode::kDoesNotExist,
+ Location(0, 0))));
+ return;
+ }
+ // TODO(martinduke): Handle authentication.
+ absl::StatusOr<std::shared_ptr<MoqtTrackPublisher>> track =
+ session_->publisher_->GetTrack(message.full_track_name);
+ if (!track.ok()) {
+ SendOrBufferMessage(session_->framer_.SerializeTrackStatus(
+ MoqtTrackStatus(message.request_id, MoqtTrackStatusCode::kDoesNotExist,
+ Location(0, 0))));
+ return;
+ }
+ session_->incoming_track_status_.emplace(
+ std::pair<uint64_t, DownstreamTrackStatus>(
+ message.request_id,
+ DownstreamTrackStatus(message.request_id, session_, track->get())));
+}
+
void MoqtSession::ControlStream::OnUnannounceMessage(
const MoqtUnannounce& message) {
session_->callbacks_.incoming_announce_callback(message.track_namespace,
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index 8ca159e..775ed5e 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -13,15 +13,15 @@
#include <utility>
#include <vector>
+#include "absl/base/nullability.h"
#include "absl/container/btree_map.h"
#include "absl/container/btree_set.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
+#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "quiche/quic/core/quic_alarm.h"
#include "quiche/quic/core/quic_alarm_factory.h"
-#include "quiche/quic/core/quic_clock.h"
-#include "quiche/quic/core/quic_default_clock.h"
#include "quiche/quic/core/quic_time.h"
#include "quiche/quic/core/quic_types.h"
#include "quiche/quic/moqt/moqt_framer.h"
@@ -35,7 +35,6 @@
#include "quiche/quic/moqt/moqt_track.h"
#include "quiche/common/platform/api/quiche_export.h"
#include "quiche/common/quiche_buffer_allocator.h"
-#include "quiche/common/quiche_callbacks.h"
#include "quiche/common/quiche_mem_slice.h"
#include "quiche/common/quiche_weak_ptr.h"
#include "quiche/web_transport/web_transport.h"
@@ -251,7 +250,7 @@
void OnAnnounceErrorMessage(const MoqtAnnounceError& message) override;
void OnAnnounceCancelMessage(const MoqtAnnounceCancel& message) override;
void OnTrackStatusRequestMessage(
- const MoqtTrackStatusRequest& message) override {};
+ const MoqtTrackStatusRequest& message) override;
void OnUnannounceMessage(const MoqtUnannounce& /*message*/) override;
void OnTrackStatusMessage(const MoqtTrackStatus& message) override {}
void OnGoAwayMessage(const MoqtGoAway& /*message*/) override;
@@ -610,6 +609,70 @@
std::optional<webtransport::StreamId> stream_id_;
};
+ class QUICHE_EXPORT DownstreamTrackStatus : public MoqtObjectListener {
+ public:
+ DownstreamTrackStatus(uint64_t request_id,
+ MoqtSession* absl_nonnull session,
+ MoqtTrackPublisher* absl_nonnull publisher)
+ : request_id_(request_id), session_(session), publisher_(publisher) {
+ publisher_->AddObjectListener(this);
+ }
+ ~DownstreamTrackStatus() {
+ if (publisher_ != nullptr) {
+ publisher_->RemoveObjectListener(this);
+ }
+ }
+
+ void OnSubscribeAccepted() override {
+ MoqtTrackStatus track_status;
+ track_status.request_id = request_id_;
+ QUICHE_CHECK(publisher_ != nullptr);
+ absl::StatusOr<MoqtTrackStatusCode> status = publisher_->GetTrackStatus();
+ if (!status.ok()) {
+ session_->Error(MoqtError::kInternalError,
+ "Failed to get track status");
+ return;
+ }
+ track_status.status_code = *status;
+ if (*status != MoqtTrackStatusCode::kDoesNotExist &&
+ *status != MoqtTrackStatusCode::kNotYetBegun) {
+ track_status.largest_location = publisher_->GetLargestLocation();
+ } // Else, leave it at (0,0).
+ session_->SendControlMessage(
+ session_->framer_.SerializeTrackStatus(track_status));
+ session_->incoming_track_status_.erase(request_id_);
+ // No class access below this line!
+ }
+
+ // TODO(martinduke): In draft-13, this will trigger TRACK_STATUS_ERROR.
+ void OnSubscribeRejected(MoqtSubscribeErrorReason /*error_code*/,
+ std::optional<uint64_t> /*track_alias*/) override {
+ OnSubscribeAccepted();
+ }
+
+ void OnNewObjectAvailable(Location sequence, uint64_t subgroup) override {}
+ void OnNewFinAvailable(Location location, uint64_t subgroup) override {}
+ void OnSubgroupAbandoned(
+ uint64_t group, uint64_t subgroup,
+ webtransport::StreamErrorCode error_code) override {}
+ void OnGroupAbandoned(uint64_t group_id) override {}
+ void OnTrackPublisherGone() override {
+ publisher_ = nullptr;
+ MoqtTrackStatus track_status;
+ track_status.request_id = request_id_;
+ track_status.status_code = MoqtTrackStatusCode::kDoesNotExist;
+ track_status.largest_location = Location(0, 0);
+ session_->SendControlMessage(
+ session_->framer_.SerializeTrackStatus(track_status));
+ session_->incoming_track_status_.erase(request_id_);
+ }
+
+ private:
+ uint64_t request_id_;
+ MoqtSession* session_;
+ MoqtTrackPublisher* publisher_;
+ };
+
class GoAwayTimeoutDelegate : public quic::QuicAlarm::DelegateWithoutContext {
public:
explicit GoAwayTimeoutDelegate(MoqtSession* session) : session_(session) {}
@@ -750,6 +813,8 @@
absl::flat_hash_map<uint64_t, std::shared_ptr<PublishedFetch>>
incoming_fetches_;
+ absl::flat_hash_map<uint64_t, DownstreamTrackStatus> incoming_track_status_;
+
// Monitoring interfaces for expected incoming subscriptions.
absl::flat_hash_map<FullTrackName, MoqtPublishingMonitorInterface*>
monitoring_interfaces_for_published_tracks_;
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h
index 4497674..fcfbce5 100644
--- a/quiche/quic/moqt/test_tools/moqt_test_message.h
+++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -972,6 +972,10 @@
bool EqualFieldValues(MessageStructuredData& values) const override {
auto cast = std::get<MoqtTrackStatusRequest>(values);
+ if (cast.request_id != track_status_request_.request_id) {
+ QUIC_LOG(INFO) << "TRACK STATUS REQUEST request ID mismatch";
+ return false;
+ }
if (cast.full_track_name != track_status_request_.full_track_name) {
QUIC_LOG(INFO) << "TRACK STATUS REQUEST track name mismatch";
return false;
@@ -983,22 +987,23 @@
return true;
}
- void ExpandVarints() override { ExpandVarintsImpl("vv---v----vvv-----"); }
+ void ExpandVarints() override { ExpandVarintsImpl("vvv---v----vvv-----"); }
MessageStructuredData structured_data() const override {
return TestMessageBase::MessageStructuredData(track_status_request_);
}
private:
- uint8_t raw_packet_[21] = {
- 0x0d, 0x00, 0x12, 0x01, 0x03, 0x66, 0x6f,
- 0x6f, // track_namespace = "foo"
+ uint8_t raw_packet_[22] = {
+ 0x0d, 0x00, 0x13, 0x02, // request_id = 2
+ 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
0x01, // 1 parameter
0x01, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72, // authorization_tag = "bar"
};
MoqtTrackStatusRequest track_status_request_ = {
+ /*request_id=*/2,
FullTrackName("foo", "abcd"),
VersionSpecificParameters(AuthTokenType::kOutOfBand, "bar"),
};
@@ -1043,20 +1048,16 @@
bool EqualFieldValues(MessageStructuredData& values) const override {
auto cast = std::get<MoqtTrackStatus>(values);
- if (cast.full_track_name != track_status_.full_track_name) {
- QUIC_LOG(INFO) << "TRACK STATUS track name mismatch";
+ if (cast.request_id != track_status_.request_id) {
+ QUIC_LOG(INFO) << "TRACK STATUS request ID mismatch";
return false;
}
if (cast.status_code != track_status_.status_code) {
QUIC_LOG(INFO) << "TRACK STATUS code mismatch";
return false;
}
- if (cast.last_group != track_status_.last_group) {
- QUIC_LOG(INFO) << "TRACK STATUS last group mismatch";
- return false;
- }
- if (cast.last_object != track_status_.last_object) {
- QUIC_LOG(INFO) << "TRACK STATUS last object mismatch";
+ if (cast.largest_location != track_status_.largest_location) {
+ QUIC_LOG(INFO) << "TRACK STATUS largest location mismatch";
return false;
}
if (cast.parameters != track_status_.parameters) {
@@ -1066,28 +1067,25 @@
return true;
}
- void ExpandVarints() override { ExpandVarintsImpl("vv---v----vvvvv--v--"); }
+ void ExpandVarints() override { ExpandVarintsImpl("v-vvvv--v--"); }
MessageStructuredData structured_data() const override {
return TestMessageBase::MessageStructuredData(track_status_);
}
private:
- uint8_t raw_packet_[23] = {
- 0x0e, 0x00, 0x14, 0x01, 0x03,
- 0x66, 0x6f, 0x6f, // track_namespace = "foo"
- 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
- 0x00, 0x0c, 0x14, // status, last_group, last_object
- 0x02, // 2 parameters
- 0x02, 0x67, 0x10, // Delivery Timeout = 10000
- 0x04, 0x67, 0x10, // Max Cache Duration = 10000
+ uint8_t raw_packet_[14] = {
+ 0x0e, 0x00, 0x0b, 0x02, // request_id = 2
+ 0x00, 0x0c, 0x14, // status, last_group, last_object
+ 0x02, // 2 parameters
+ 0x02, 0x67, 0x10, // Delivery Timeout = 10000
+ 0x04, 0x67, 0x10, // Max Cache Duration = 10000
};
MoqtTrackStatus track_status_ = {
- FullTrackName("foo", "abcd"),
+ /*request_id=*/2,
/*status_code=*/MoqtTrackStatusCode::kInProgress,
- /*last_group=*/12,
- /*last_object=*/20,
+ /*largest_location=*/Location(12, 20),
VersionSpecificParameters(quic::QuicTimeDelta::FromMilliseconds(10000),
quic::QuicTimeDelta::FromMilliseconds(10000)),
};