Update MoQT Session/Application APIs to support relays.
Eliminate some circular dependencies that result by moving certain items to other header files.
PiperOrigin-RevId: 803457806
diff --git a/build/source_list.bzl b/build/source_list.bzl
index 0d3e7ad..84c347e 100644
--- a/build/source_list.bzl
+++ b/build/source_list.bzl
@@ -1549,17 +1549,18 @@
]
moqt_hdrs = [
"quic/moqt/moqt_bitrate_adjuster.h",
- "quic/moqt/moqt_cached_object.h",
- "quic/moqt/moqt_failed_fetch.h",
+ "quic/moqt/moqt_fetch_task.h",
"quic/moqt/moqt_framer.h",
"quic/moqt/moqt_known_track_publisher.h",
"quic/moqt/moqt_live_relay_queue.h",
"quic/moqt/moqt_messages.h",
+ "quic/moqt/moqt_object.h",
"quic/moqt/moqt_outgoing_queue.h",
"quic/moqt/moqt_parser.h",
"quic/moqt/moqt_priority.h",
"quic/moqt/moqt_probe_manager.h",
"quic/moqt/moqt_publisher.h",
+ "quic/moqt/moqt_relay_publisher.h",
"quic/moqt/moqt_relay_track_publisher.h",
"quic/moqt/moqt_session.h",
"quic/moqt/moqt_session_callbacks.h",
@@ -1582,7 +1583,6 @@
moqt_srcs = [
"quic/moqt/moqt_bitrate_adjuster.cc",
"quic/moqt/moqt_bitrate_adjuster_test.cc",
- "quic/moqt/moqt_cached_object.cc",
"quic/moqt/moqt_framer.cc",
"quic/moqt/moqt_framer_test.cc",
"quic/moqt/moqt_integration_test.cc",
@@ -1591,6 +1591,7 @@
"quic/moqt/moqt_live_relay_queue_test.cc",
"quic/moqt/moqt_messages.cc",
"quic/moqt/moqt_messages_test.cc",
+ "quic/moqt/moqt_object.cc",
"quic/moqt/moqt_outgoing_queue.cc",
"quic/moqt/moqt_outgoing_queue_test.cc",
"quic/moqt/moqt_parser.cc",
@@ -1600,6 +1601,7 @@
"quic/moqt/moqt_priority_test.cc",
"quic/moqt/moqt_probe_manager.cc",
"quic/moqt/moqt_probe_manager_test.cc",
+ "quic/moqt/moqt_relay_publisher.cc",
"quic/moqt/moqt_relay_track_publisher.cc",
"quic/moqt/moqt_relay_track_publisher_test.cc",
"quic/moqt/moqt_session.cc",
diff --git a/build/source_list.gni b/build/source_list.gni
index 0b489e9..09c6eca 100644
--- a/build/source_list.gni
+++ b/build/source_list.gni
@@ -1553,17 +1553,18 @@
]
moqt_hdrs = [
"src/quiche/quic/moqt/moqt_bitrate_adjuster.h",
- "src/quiche/quic/moqt/moqt_cached_object.h",
- "src/quiche/quic/moqt/moqt_failed_fetch.h",
+ "src/quiche/quic/moqt/moqt_fetch_task.h",
"src/quiche/quic/moqt/moqt_framer.h",
"src/quiche/quic/moqt/moqt_known_track_publisher.h",
"src/quiche/quic/moqt/moqt_live_relay_queue.h",
"src/quiche/quic/moqt/moqt_messages.h",
+ "src/quiche/quic/moqt/moqt_object.h",
"src/quiche/quic/moqt/moqt_outgoing_queue.h",
"src/quiche/quic/moqt/moqt_parser.h",
"src/quiche/quic/moqt/moqt_priority.h",
"src/quiche/quic/moqt/moqt_probe_manager.h",
"src/quiche/quic/moqt/moqt_publisher.h",
+ "src/quiche/quic/moqt/moqt_relay_publisher.h",
"src/quiche/quic/moqt/moqt_relay_track_publisher.h",
"src/quiche/quic/moqt/moqt_session.h",
"src/quiche/quic/moqt/moqt_session_callbacks.h",
@@ -1586,7 +1587,6 @@
moqt_srcs = [
"src/quiche/quic/moqt/moqt_bitrate_adjuster.cc",
"src/quiche/quic/moqt/moqt_bitrate_adjuster_test.cc",
- "src/quiche/quic/moqt/moqt_cached_object.cc",
"src/quiche/quic/moqt/moqt_framer.cc",
"src/quiche/quic/moqt/moqt_framer_test.cc",
"src/quiche/quic/moqt/moqt_integration_test.cc",
@@ -1595,6 +1595,7 @@
"src/quiche/quic/moqt/moqt_live_relay_queue_test.cc",
"src/quiche/quic/moqt/moqt_messages.cc",
"src/quiche/quic/moqt/moqt_messages_test.cc",
+ "src/quiche/quic/moqt/moqt_object.cc",
"src/quiche/quic/moqt/moqt_outgoing_queue.cc",
"src/quiche/quic/moqt/moqt_outgoing_queue_test.cc",
"src/quiche/quic/moqt/moqt_parser.cc",
@@ -1604,6 +1605,7 @@
"src/quiche/quic/moqt/moqt_priority_test.cc",
"src/quiche/quic/moqt/moqt_probe_manager.cc",
"src/quiche/quic/moqt/moqt_probe_manager_test.cc",
+ "src/quiche/quic/moqt/moqt_relay_publisher.cc",
"src/quiche/quic/moqt/moqt_relay_track_publisher.cc",
"src/quiche/quic/moqt/moqt_relay_track_publisher_test.cc",
"src/quiche/quic/moqt/moqt_session.cc",
diff --git a/build/source_list.json b/build/source_list.json
index 2bfce87..af36bae 100644
--- a/build/source_list.json
+++ b/build/source_list.json
@@ -1552,17 +1552,18 @@
],
"moqt_hdrs": [
"quiche/quic/moqt/moqt_bitrate_adjuster.h",
- "quiche/quic/moqt/moqt_cached_object.h",
- "quiche/quic/moqt/moqt_failed_fetch.h",
+ "quiche/quic/moqt/moqt_fetch_task.h",
"quiche/quic/moqt/moqt_framer.h",
"quiche/quic/moqt/moqt_known_track_publisher.h",
"quiche/quic/moqt/moqt_live_relay_queue.h",
"quiche/quic/moqt/moqt_messages.h",
+ "quiche/quic/moqt/moqt_object.h",
"quiche/quic/moqt/moqt_outgoing_queue.h",
"quiche/quic/moqt/moqt_parser.h",
"quiche/quic/moqt/moqt_priority.h",
"quiche/quic/moqt/moqt_probe_manager.h",
"quiche/quic/moqt/moqt_publisher.h",
+ "quiche/quic/moqt/moqt_relay_publisher.h",
"quiche/quic/moqt/moqt_relay_track_publisher.h",
"quiche/quic/moqt/moqt_session.h",
"quiche/quic/moqt/moqt_session_callbacks.h",
@@ -1585,7 +1586,6 @@
"moqt_srcs": [
"quiche/quic/moqt/moqt_bitrate_adjuster.cc",
"quiche/quic/moqt/moqt_bitrate_adjuster_test.cc",
- "quiche/quic/moqt/moqt_cached_object.cc",
"quiche/quic/moqt/moqt_framer.cc",
"quiche/quic/moqt/moqt_framer_test.cc",
"quiche/quic/moqt/moqt_integration_test.cc",
@@ -1594,6 +1594,7 @@
"quiche/quic/moqt/moqt_live_relay_queue_test.cc",
"quiche/quic/moqt/moqt_messages.cc",
"quiche/quic/moqt/moqt_messages_test.cc",
+ "quiche/quic/moqt/moqt_object.cc",
"quiche/quic/moqt/moqt_outgoing_queue.cc",
"quiche/quic/moqt/moqt_outgoing_queue_test.cc",
"quiche/quic/moqt/moqt_parser.cc",
@@ -1603,6 +1604,7 @@
"quiche/quic/moqt/moqt_priority_test.cc",
"quiche/quic/moqt/moqt_probe_manager.cc",
"quiche/quic/moqt/moqt_probe_manager_test.cc",
+ "quiche/quic/moqt/moqt_relay_publisher.cc",
"quiche/quic/moqt/moqt_relay_track_publisher.cc",
"quiche/quic/moqt/moqt_relay_track_publisher_test.cc",
"quiche/quic/moqt/moqt_session.cc",
diff --git a/quiche/quic/moqt/moqt_cached_object.h b/quiche/quic/moqt/moqt_cached_object.h
deleted file mode 100644
index ef5566a..0000000
--- a/quiche/quic/moqt/moqt_cached_object.h
+++ /dev/null
@@ -1,28 +0,0 @@
-// Copyright 2024 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef QUICHE_QUIC_MOQT_MOQT_CACHED_OBJECT_H_
-#define QUICHE_QUIC_MOQT_MOQT_CACHED_OBJECT_H_
-
-#include <memory>
-
-#include "quiche/quic/moqt/moqt_publisher.h"
-#include "quiche/common/quiche_mem_slice.h"
-
-namespace moqt {
-
-// CachedObject is a version of PublishedObject with a reference counted
-// payload.
-struct CachedObject {
- PublishedObjectMetadata metadata;
- std::shared_ptr<quiche::QuicheMemSlice> payload;
- bool fin_after_this; // This is the last object before FIN.
-};
-
-// Transforms a CachedObject into a PublishedObject.
-PublishedObject CachedObjectToPublishedObject(const CachedObject& object);
-
-} // namespace moqt
-
-#endif // QUICHE_QUIC_MOQT_MOQT_CACHED_OBJECT_H_
diff --git a/quiche/quic/moqt/moqt_failed_fetch.h b/quiche/quic/moqt/moqt_failed_fetch.h
deleted file mode 100644
index c971897..0000000
--- a/quiche/quic/moqt/moqt_failed_fetch.h
+++ /dev/null
@@ -1,41 +0,0 @@
-// Copyright 2024 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef QUICHE_QUIC_MOQT_MOQT_FAILED_FETCH_H_
-#define QUICHE_QUIC_MOQT_MOQT_FAILED_FETCH_H_
-
-#include <utility>
-
-#include "absl/status/status.h"
-#include "quiche/quic/moqt/moqt_messages.h"
-#include "quiche/quic/moqt/moqt_publisher.h"
-
-namespace moqt {
-
-// A fetch that starts out in the failed state.
-class MoqtFailedFetch : public MoqtFetchTask {
- public:
- explicit MoqtFailedFetch(absl::Status status) : status_(std::move(status)) {}
-
- GetNextObjectResult GetNextObject(PublishedObject&) override {
- return kError;
- }
- absl::Status GetStatus() override { return status_; }
- void SetObjectAvailableCallback(
- ObjectsAvailableCallback /*callback*/) override {}
- void SetFetchResponseCallback(FetchResponseCallback callback) {
- MoqtFetchError error;
- error.request_id = 0;
- error.error_code = StatusToRequestErrorCode(status_);
- error.error_reason = status_.message();
- std::move(callback)(error);
- }
-
- private:
- absl::Status status_;
-};
-
-} // namespace moqt
-
-#endif // QUICHE_QUIC_MOQT_MOQT_FAILED_FETCH_H_
diff --git a/quiche/quic/moqt/moqt_fetch_task.h b/quiche/quic/moqt/moqt_fetch_task.h
new file mode 100644
index 0000000..b770684
--- /dev/null
+++ b/quiche/quic/moqt/moqt_fetch_task.h
@@ -0,0 +1,90 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_MOQT_MOQT_FETCH_TASK_H_
+#define QUICHE_QUIC_MOQT_MOQT_FETCH_TASK_H_
+
+#include <utility>
+#include <variant>
+
+#include "absl/status/status.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_object.h"
+#include "quiche/common/quiche_callbacks.h"
+
+namespace moqt {
+
+// A handle representing a fetch in progress. The fetch in question can be
+// cancelled by deleting the object.
+class MoqtFetchTask {
+ public:
+ using ObjectsAvailableCallback = quiche::MultiUseCallback<void()>;
+ // The request_id field will be ignored.
+ using FetchResponseCallback = quiche::SingleUseCallback<void(
+ std::variant<MoqtFetchOk, MoqtFetchError>)>;
+
+ virtual ~MoqtFetchTask() = default;
+
+ // Potential results of a GetNextObject() call.
+ enum GetNextObjectResult {
+ // The next object is available, and is placed into the reference specified
+ // by the caller.
+ kSuccess,
+ // The next object is not yet available (equivalent of EAGAIN).
+ kPending,
+ // The end of fetch has been reached.
+ kEof,
+ // The fetch has failed; the error is available via GetStatus().
+ kError,
+ };
+
+ // Returns the next object received via the fetch, if available. MUST NOT
+ // return an object with status kObjectDoesNotExist.
+ virtual GetNextObjectResult GetNextObject(PublishedObject& output) = 0;
+
+ // Sets the callback that is called when GetNextObject() has previously
+ // returned kPending, but now a new object (or potentially an error or an
+ // end-of-fetch) is available. The application is responsible for calling
+ // GetNextObject() until it gets kPending; no further callback will occur
+ // until then.
+ // If an object is available immediately, the callback will be called
+ // immediately.
+ virtual void SetObjectAvailableCallback(
+ ObjectsAvailableCallback callback) = 0;
+ // One of these callbacks is called as soon as the data publisher has enough
+ // information for either FETCH_OK or FETCH_ERROR.
+ // If the appropriate response is already available, the callback will be
+ // called immediately.
+ virtual void SetFetchResponseCallback(FetchResponseCallback callback) = 0;
+
+ // Returns the error if fetch has completely failed, and OK otherwise.
+ virtual absl::Status GetStatus() = 0;
+};
+
+// A fetch that starts out in the failed state.
+class MoqtFailedFetch : public MoqtFetchTask {
+ public:
+ explicit MoqtFailedFetch(absl::Status status) : status_(std::move(status)) {}
+
+ GetNextObjectResult GetNextObject(PublishedObject&) override {
+ return kError;
+ }
+ absl::Status GetStatus() override { return status_; }
+ void SetObjectAvailableCallback(
+ ObjectsAvailableCallback /*callback*/) override {}
+ void SetFetchResponseCallback(FetchResponseCallback callback) {
+ MoqtFetchError error;
+ error.request_id = 0;
+ error.error_code = StatusToRequestErrorCode(status_);
+ error.error_reason = status_.message();
+ std::move(callback)(error);
+ }
+
+ private:
+ absl::Status status_;
+};
+
+} // namespace moqt
+
+#endif // QUICHE_QUIC_MOQT_MOQT_FETCH_TASK_H_
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index 8c6b55c..05bda43 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -591,7 +591,7 @@
EstablishSession();
FullTrackName full_track_name("foo", "bar");
MockSubscribeRemoteTrackVisitor client_visitor;
- std::optional<absl::string_view> expected_reason = "No tracks published";
+ std::optional<absl::string_view> expected_reason = "not found";
bool received_ok = false;
EXPECT_CALL(client_visitor, OnReply(full_track_name, _, expected_reason))
.WillOnce([&]() { received_ok = true; });
diff --git a/quiche/quic/moqt/moqt_known_track_publisher.cc b/quiche/quic/moqt/moqt_known_track_publisher.cc
index 60150cf..309d65b 100644
--- a/quiche/quic/moqt/moqt_known_track_publisher.cc
+++ b/quiche/quic/moqt/moqt_known_track_publisher.cc
@@ -6,19 +6,18 @@
#include <memory>
-#include "absl/status/status.h"
-#include "absl/status/statusor.h"
+#include "absl/base/nullability.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/common/platform/api/quiche_bug_tracker.h"
namespace moqt {
-absl::StatusOr<std::shared_ptr<MoqtTrackPublisher>>
+absl_nullable std::shared_ptr<MoqtTrackPublisher>
MoqtKnownTrackPublisher::GetTrack(const FullTrackName& track_name) {
auto it = tracks_.find(track_name);
if (it == tracks_.end()) {
- return absl::NotFoundError("Requested track not found");
+ return nullptr;
}
return it->second;
}
diff --git a/quiche/quic/moqt/moqt_known_track_publisher.h b/quiche/quic/moqt/moqt_known_track_publisher.h
index 59a22a0..24a6f35 100644
--- a/quiche/quic/moqt/moqt_known_track_publisher.h
+++ b/quiche/quic/moqt/moqt_known_track_publisher.h
@@ -7,8 +7,8 @@
#include <memory>
+#include "absl/base/nullability.h"
#include "absl/container/flat_hash_map.h"
-#include "absl/status/statusor.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_publisher.h"
@@ -24,8 +24,13 @@
MoqtKnownTrackPublisher& operator=(const MoqtKnownTrackPublisher&) = delete;
MoqtKnownTrackPublisher& operator=(MoqtKnownTrackPublisher&&) = delete;
- absl::StatusOr<std::shared_ptr<MoqtTrackPublisher>> GetTrack(
+ // MoqtPublisher implementation.
+ absl_nullable std::shared_ptr<MoqtTrackPublisher> GetTrack(
const FullTrackName& track_name) override;
+ // TODO(martinduke): Implement namespace support.
+ void AddNamespaceListener(NamespaceListener* /*listener*/) override {}
+ void RemoveNamespaceListener(NamespaceListener* /*listener*/) override {}
+
void Add(std::shared_ptr<MoqtTrackPublisher> track_publisher);
void Delete(const FullTrackName& track_name);
diff --git a/quiche/quic/moqt/moqt_live_relay_queue.cc b/quiche/quic/moqt/moqt_live_relay_queue.cc
index 050a740..0e145cd 100644
--- a/quiche/quic/moqt/moqt_live_relay_queue.cc
+++ b/quiche/quic/moqt/moqt_live_relay_queue.cc
@@ -10,8 +10,8 @@
#include "absl/base/attributes.h"
#include "absl/strings/string_view.h"
-#include "quiche/quic/moqt/moqt_cached_object.h"
#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_object.h"
#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/common/platform/api/quiche_logging.h"
#include "quiche/common/quiche_buffer_allocator.h"
diff --git a/quiche/quic/moqt/moqt_live_relay_queue.h b/quiche/quic/moqt/moqt_live_relay_queue.h
index 2417835..1baf993 100644
--- a/quiche/quic/moqt/moqt_live_relay_queue.h
+++ b/quiche/quic/moqt/moqt_live_relay_queue.h
@@ -18,9 +18,9 @@
#include "quiche/quic/core/quic_clock.h"
#include "quiche/quic/core/quic_default_clock.h"
#include "quiche/quic/core/quic_time.h"
-#include "quiche/quic/moqt/moqt_cached_object.h"
-#include "quiche/quic/moqt/moqt_failed_fetch.h"
+#include "quiche/quic/moqt/moqt_fetch_task.h"
#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_object.h"
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/common/quiche_callbacks.h"
diff --git a/quiche/quic/moqt/moqt_live_relay_queue_test.cc b/quiche/quic/moqt/moqt_live_relay_queue_test.cc
index aaee1ac..6980709 100644
--- a/quiche/quic/moqt/moqt_live_relay_queue_test.cc
+++ b/quiche/quic/moqt/moqt_live_relay_queue_test.cc
@@ -9,8 +9,8 @@
#include "absl/strings/string_view.h"
#include "quiche/quic/core/quic_time.h"
-#include "quiche/quic/moqt/moqt_cached_object.h"
#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_object.h"
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/quic/moqt/moqt_subscribe_windows.h"
diff --git a/quiche/quic/moqt/moqt_cached_object.cc b/quiche/quic/moqt/moqt_object.cc
similarity index 88%
rename from quiche/quic/moqt/moqt_cached_object.cc
rename to quiche/quic/moqt/moqt_object.cc
index df71f1c..3ee1a72 100644
--- a/quiche/quic/moqt/moqt_cached_object.cc
+++ b/quiche/quic/moqt/moqt_object.cc
@@ -2,10 +2,9 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-#include "quiche/quic/moqt/moqt_cached_object.h"
+#include "quiche/quic/moqt/moqt_object.h"
#include "absl/strings/string_view.h"
-#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/common/quiche_mem_slice.h"
namespace moqt {
diff --git a/quiche/quic/moqt/moqt_object.h b/quiche/quic/moqt/moqt_object.h
new file mode 100644
index 0000000..30abe6c
--- /dev/null
+++ b/quiche/quic/moqt/moqt_object.h
@@ -0,0 +1,47 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_MOQT_MOQT_OBJECT_H_
+#define QUICHE_QUIC_MOQT_MOQT_OBJECT_H_
+
+#include <cstdint>
+#include <memory>
+
+#include "quiche/quic/core/quic_time.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_priority.h"
+#include "quiche/common/quiche_mem_slice.h"
+
+namespace moqt {
+
+struct PublishedObjectMetadata {
+ Location location;
+ uint64_t subgroup; // Equal to object_id for datagrams.
+ MoqtObjectStatus status;
+ MoqtPriority publisher_priority;
+ quic::QuicTime arrival_time = quic::QuicTime::Zero();
+};
+
+// PublishedObject is a description of an object that is sufficient to publish
+// it on a given track.
+struct PublishedObject {
+ PublishedObjectMetadata metadata;
+ quiche::QuicheMemSlice payload;
+ bool fin_after_this = false;
+};
+
+// CachedObject is a version of PublishedObject with a reference counted
+// payload.
+struct CachedObject {
+ PublishedObjectMetadata metadata;
+ std::shared_ptr<quiche::QuicheMemSlice> payload;
+ bool fin_after_this; // This is the last object before FIN.
+};
+
+// Transforms a CachedObject into a PublishedObject.
+PublishedObject CachedObjectToPublishedObject(const CachedObject& object);
+
+} // namespace moqt
+
+#endif // QUICHE_QUIC_MOQT_MOQT_OBJECT_H_
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.cc b/quiche/quic/moqt/moqt_outgoing_queue.cc
index 8e434e3..7960e61 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue.cc
+++ b/quiche/quic/moqt/moqt_outgoing_queue.cc
@@ -14,9 +14,9 @@
#include "absl/algorithm/container.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
-#include "quiche/quic/moqt/moqt_cached_object.h"
-#include "quiche/quic/moqt/moqt_failed_fetch.h"
+#include "quiche/quic/moqt/moqt_fetch_task.h"
#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_object.h"
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/quic/moqt/moqt_subscribe_windows.h"
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.h b/quiche/quic/moqt/moqt_outgoing_queue.h
index cadd11c..5aa0b80 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue.h
+++ b/quiche/quic/moqt/moqt_outgoing_queue.h
@@ -18,8 +18,8 @@
#include "quiche/quic/core/quic_clock.h"
#include "quiche/quic/core/quic_default_clock.h"
#include "quiche/quic/core/quic_time.h"
-#include "quiche/quic/moqt/moqt_cached_object.h"
#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_object.h"
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/common/quiche_circular_deque.h"
diff --git a/quiche/quic/moqt/moqt_outgoing_queue_test.cc b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
index d08897b..eafbf78 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue_test.cc
+++ b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
@@ -17,12 +17,13 @@
#include "absl/strings/string_view.h"
#include "quiche/quic/core/quic_default_clock.h"
#include "quiche/quic/core/quic_time.h"
+#include "quiche/quic/moqt/moqt_fetch_task.h"
#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_object.h"
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/quic/moqt/moqt_subscribe_windows.h"
#include "quiche/common/platform/api/quiche_expect_bug.h"
-#include "quiche/common/platform/api/quiche_logging.h"
#include "quiche/common/platform/api/quiche_test.h"
#include "quiche/common/quiche_mem_slice.h"
#include "quiche/common/test_tools/quiche_test_utils.h"
diff --git a/quiche/quic/moqt/moqt_publisher.h b/quiche/quic/moqt/moqt_publisher.h
index acd3587..4f723a2 100644
--- a/quiche/quic/moqt/moqt_publisher.h
+++ b/quiche/quic/moqt/moqt_publisher.h
@@ -8,36 +8,19 @@
#include <cstdint>
#include <memory>
#include <optional>
-#include <string>
-#include <variant>
-#include "absl/status/status.h"
-#include "absl/status/statusor.h"
+#include "absl/base/nullability.h"
#include "quiche/quic/core/quic_time.h"
+#include "quiche/quic/moqt/moqt_fetch_task.h"
#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_object.h"
#include "quiche/quic/moqt/moqt_priority.h"
-#include "quiche/common/quiche_callbacks.h"
-#include "quiche/common/quiche_mem_slice.h"
+#include "quiche/quic/moqt/moqt_session_interface.h"
+#include "quiche/quic/moqt/moqt_track.h"
#include "quiche/web_transport/web_transport.h"
namespace moqt {
-struct PublishedObjectMetadata {
- Location location;
- uint64_t subgroup; // Equal to object_id for datagrams.
- MoqtObjectStatus status;
- MoqtPriority publisher_priority;
- quic::QuicTime arrival_time = quic::QuicTime::Zero();
-};
-
-// PublishedObject is a description of an object that is sufficient to publish
-// it on a given track.
-struct PublishedObject {
- PublishedObjectMetadata metadata;
- quiche::QuicheMemSlice payload;
- bool fin_after_this = false;
-};
-
// MoqtObjectListener is an interface for any entity that is listening for
// incoming objects for a given track.
class MoqtObjectListener {
@@ -77,53 +60,6 @@
virtual void OnTrackPublisherGone() = 0;
};
-// A handle representing a fetch in progress. The fetch in question can be
-// cancelled by deleting the object.
-class MoqtFetchTask {
- public:
- using ObjectsAvailableCallback = quiche::MultiUseCallback<void()>;
- // The request_id field will be ignored.
- using FetchResponseCallback = quiche::SingleUseCallback<void(
- std::variant<MoqtFetchOk, MoqtFetchError>)>;
-
- virtual ~MoqtFetchTask() = default;
-
- // Potential results of a GetNextObject() call.
- enum GetNextObjectResult {
- // The next object is available, and is placed into the reference specified
- // by the caller.
- kSuccess,
- // The next object is not yet available (equivalent of EAGAIN).
- kPending,
- // The end of fetch has been reached.
- kEof,
- // The fetch has failed; the error is available via GetStatus().
- kError,
- };
-
- // Returns the next object received via the fetch, if available. MUST NOT
- // return an object with status kObjectDoesNotExist.
- virtual GetNextObjectResult GetNextObject(PublishedObject& output) = 0;
-
- // Sets the callback that is called when GetNextObject() has previously
- // returned kPending, but now a new object (or potentially an error or an
- // end-of-fetch) is available. The application is responsible for calling
- // GetNextObject() until it gets kPending; no further callback will occur
- // until then.
- // If an object is available immediately, the callback will be called
- // immediately.
- virtual void SetObjectAvailableCallback(
- ObjectsAvailableCallback callback) = 0;
- // One of these callbacks is called as soon as the data publisher has enough
- // information for either FETCH_OK or FETCH_ERROR.
- // If the appropriate response is already available, the callback will be
- // called immediately.
- virtual void SetFetchResponseCallback(FetchResponseCallback callback) = 0;
-
- // Returns the error if fetch has completely failed, and OK otherwise.
- virtual absl::Status GetStatus() = 0;
-};
-
// MoqtTrackPublisher is an application-side API for an MoQT publisher
// of a single individual track.
class MoqtTrackPublisher {
@@ -178,14 +114,37 @@
uint64_t group, std::optional<MoqtDeliveryOrder> order) = 0;
};
+// MoqtSession delivers a NamespaceListener to a TrackPublisher to receive
+// notifications that sub-namespaces are available, and receive tracks that
+// are published within that namespace. This generally occurs when the session
+// receives a SUBSCRIBE_NAMESPACE message.
+class NamespaceListener {
+ public:
+ virtual ~NamespaceListener() = default;
+ // Called when a namespace is published. Will generally result in the
+ // receiving session sending a PUBLISH_NAMESPACE message.
+ virtual void OnNamespacePublished(const TrackNamespace& track_namespace) = 0;
+ // Called when a namespace is no longer available. Will generally result in
+ // the receiving session sending a PUBLISH_NAMESPACE_DONE message.
+ virtual void OnNamespaceDone(const TrackNamespace& track_namespace) = 0;
+ // Called when a track is published within the namespace. Will generally
+ // result in the receiving session sending a PUBLISH message. Track APIs are
+ // available in |publisher|.
+ virtual void OnTrackPublished(
+ std::shared_ptr<MoqtTrackPublisher> publisher) = 0;
+};
+
// MoqtPublisher is an interface to a publisher that allows it to publish
// multiple tracks.
class MoqtPublisher {
public:
virtual ~MoqtPublisher() = default;
- virtual absl::StatusOr<std::shared_ptr<MoqtTrackPublisher>> GetTrack(
+ // These are all called by MoqtSession based on messages arriving on the wire.
+ virtual absl_nullable std::shared_ptr<MoqtTrackPublisher> GetTrack(
const FullTrackName& track_name) = 0;
+ virtual void AddNamespaceListener(NamespaceListener* listener) = 0;
+ virtual void RemoveNamespaceListener(NamespaceListener* listener) = 0;
};
} // namespace moqt
diff --git a/quiche/quic/moqt/moqt_relay_publisher.cc b/quiche/quic/moqt/moqt_relay_publisher.cc
new file mode 100644
index 0000000..432413f
--- /dev/null
+++ b/quiche/quic/moqt/moqt_relay_publisher.cc
@@ -0,0 +1,38 @@
+// Copyright 2025 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/moqt/moqt_relay_publisher.h"
+
+#include <memory>
+
+#include "absl/base/nullability.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
+#include "quiche/quic/moqt/moqt_relay_track_publisher.h"
+#include "quiche/common/platform/api/quiche_bug_tracker.h"
+
+namespace moqt {
+
+absl_nullable std::shared_ptr<MoqtTrackPublisher> MoqtRelayPublisher::GetTrack(
+ const FullTrackName& track_name) {
+ auto it = tracks_.find(track_name);
+ if (it == tracks_.end()) {
+ return nullptr;
+ }
+ return it->second;
+}
+
+void MoqtRelayPublisher::Add(
+ std::shared_ptr<MoqtRelayTrackPublisher> track_publisher) {
+ const FullTrackName& track_name = track_publisher->GetTrackName();
+ auto [it, success] = tracks_.emplace(track_name, track_publisher);
+ QUICHE_BUG_IF(MoqtRelayPublisher_duplicate, !success)
+ << "Trying to add a duplicate track into a RelayPublisher";
+}
+
+void MoqtRelayPublisher::Delete(const FullTrackName& track_name) {
+ tracks_.erase(track_name);
+}
+
+} // namespace moqt
diff --git a/quiche/quic/moqt/moqt_relay_publisher.h b/quiche/quic/moqt/moqt_relay_publisher.h
new file mode 100644
index 0000000..8d8e0f2
--- /dev/null
+++ b/quiche/quic/moqt/moqt_relay_publisher.h
@@ -0,0 +1,47 @@
+// Copyright 2025 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_MOQT_MOQT_RELAY_PUBLISHER_H_
+#define QUICHE_QUIC_MOQT_MOQT_RELAY_PUBLISHER_H_
+
+#include <memory>
+
+#include "absl/base/nullability.h"
+#include "absl/container/flat_hash_map.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
+#include "quiche/quic/moqt/moqt_relay_track_publisher.h"
+
+namespace moqt {
+
+// MoqtRelayPublisher is a publisher that connects sessions that request objects
+// and namespaces with upstream sessions that can deliver those things.
+class MoqtRelayPublisher : public MoqtPublisher {
+ public:
+ MoqtRelayPublisher() = default;
+ MoqtRelayPublisher(const MoqtRelayPublisher&) = delete;
+ MoqtRelayPublisher(MoqtRelayPublisher&&) = delete;
+ MoqtRelayPublisher& operator=(const MoqtRelayPublisher&) = delete;
+ MoqtRelayPublisher& operator=(MoqtRelayPublisher&&) = delete;
+
+ // MoqtPublisher implementation.
+ absl_nullable std::shared_ptr<MoqtTrackPublisher> GetTrack(
+ const FullTrackName& track_name) override;
+ // TODO(martinduke): Implement namespace support.
+ void AddNamespaceListener(NamespaceListener* /*listener*/) override {}
+ void RemoveNamespaceListener(NamespaceListener* /*listener*/) override {}
+
+ void Add(std::shared_ptr<MoqtRelayTrackPublisher> track_publisher);
+ void Delete(const FullTrackName& track_name);
+
+ private:
+ absl::flat_hash_map<FullTrackName, std::shared_ptr<MoqtRelayTrackPublisher>>
+ tracks_;
+ // TODO(martinduke): Add a map of Namespaces to source sessions and
+ // namespace listeners.
+};
+
+} // namespace moqt
+
+#endif // QUICHE_QUIC_MOQT_MOQT_RELAY_PUBLISHER_H_
diff --git a/quiche/quic/moqt/moqt_relay_track_publisher.cc b/quiche/quic/moqt/moqt_relay_track_publisher.cc
index 275caf1..b091731 100644
--- a/quiche/quic/moqt/moqt_relay_track_publisher.cc
+++ b/quiche/quic/moqt/moqt_relay_track_publisher.cc
@@ -10,8 +10,8 @@
#include "absl/base/attributes.h"
#include "absl/strings/string_view.h"
-#include "quiche/quic/moqt/moqt_cached_object.h"
#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_object.h"
#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/common/platform/api/quiche_logging.h"
#include "quiche/common/quiche_buffer_allocator.h"
diff --git a/quiche/quic/moqt/moqt_relay_track_publisher.h b/quiche/quic/moqt/moqt_relay_track_publisher.h
index 22cba24..2a1d153 100644
--- a/quiche/quic/moqt/moqt_relay_track_publisher.h
+++ b/quiche/quic/moqt/moqt_relay_track_publisher.h
@@ -18,11 +18,12 @@
#include "quiche/quic/core/quic_clock.h"
#include "quiche/quic/core/quic_default_clock.h"
#include "quiche/quic/core/quic_time.h"
-#include "quiche/quic/moqt/moqt_cached_object.h"
-#include "quiche/quic/moqt/moqt_failed_fetch.h"
+#include "quiche/quic/moqt/moqt_fetch_task.h"
#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_object.h"
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/moqt_publisher.h"
+#include "quiche/quic/moqt/moqt_track.h"
#include "quiche/common/quiche_callbacks.h"
#include "quiche/web_transport/web_transport.h"
@@ -38,7 +39,8 @@
//
// This class is primarily meant to be used by live relays to buffer the
// frames that arrive for a short time.
-class MoqtRelayTrackPublisher : public MoqtTrackPublisher {
+class MoqtRelayTrackPublisher : public MoqtTrackPublisher,
+ public SubscribeRemoteTrack::Visitor {
public:
MoqtRelayTrackPublisher(
FullTrackName track,
@@ -58,6 +60,20 @@
MoqtRelayTrackPublisher& operator=(const MoqtRelayTrackPublisher&) = delete;
MoqtRelayTrackPublisher& operator=(MoqtRelayTrackPublisher&&) = default;
+ // SubscribeRemoteTrack::Visitor implementation.
+ // TODO(martinduke): Implement these.
+ void OnReply(
+ const FullTrackName& /*full_track_name*/,
+ std::optional<Location> /*largest_location*/,
+ std::optional<absl::string_view> /*error_reason_phrase*/) override {}
+ void OnCanAckObjects(MoqtObjectAckFunction /*ack_function*/) override {}
+ void OnObjectFragment(const FullTrackName& /*full_track_name*/,
+ const PublishedObjectMetadata& /*metadata*/,
+ absl::string_view /*object*/,
+ bool /*end_of_message*/) override {}
+ void OnSubscribeDone(FullTrackName /*full_track_name*/) override {}
+ void OnMalformedTrack(const FullTrackName& /*full_track_name*/) override {}
+
// Publish a received object. Returns false if the object is invalid, given
// other non-normal objects indicate that the sequence number should not
// occur. A false return value might result in a session error on the
diff --git a/quiche/quic/moqt/moqt_relay_track_publisher_test.cc b/quiche/quic/moqt/moqt_relay_track_publisher_test.cc
index 6b24c91..c76a725 100644
--- a/quiche/quic/moqt/moqt_relay_track_publisher_test.cc
+++ b/quiche/quic/moqt/moqt_relay_track_publisher_test.cc
@@ -9,8 +9,8 @@
#include "absl/strings/string_view.h"
#include "quiche/quic/core/quic_time.h"
-#include "quiche/quic/moqt/moqt_cached_object.h"
#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_object.h"
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/quic/moqt/moqt_subscribe_windows.h"
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index ffa88a1..9ccb1db 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -10,13 +10,13 @@
#include <memory>
#include <optional>
#include <string>
-#include <tuple>
#include <utility>
#include <variant>
#include <vector>
#include "absl/algorithm/container.h"
+#include "absl/base/nullability.h"
#include "absl/container/btree_map.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
@@ -24,15 +24,16 @@
#include "absl/functional/bind_front.h"
#include "absl/memory/memory.h"
#include "absl/status/status.h"
-#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/types/span.h"
#include "quiche/quic/core/quic_alarm_factory.h"
#include "quiche/quic/core/quic_time.h"
#include "quiche/quic/core/quic_types.h"
+#include "quiche/quic/moqt/moqt_fetch_task.h"
#include "quiche/quic/moqt/moqt_framer.h"
#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_object.h"
#include "quiche/quic/moqt/moqt_parser.h"
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/moqt_publisher.h"
@@ -83,11 +84,14 @@
return instance;
}
- absl::StatusOr<std::shared_ptr<MoqtTrackPublisher>> GetTrack(
+ // MoqtPublisher implementation.
+ absl_nullable std::shared_ptr<MoqtTrackPublisher> GetTrack(
const FullTrackName& track_name) override {
QUICHE_DCHECK(track_name.IsValid());
- return absl::NotFoundError("No tracks published");
+ return nullptr;
}
+ void AddNamespaceListener(NamespaceListener*) override {}
+ void RemoveNamespaceListener(NamespaceListener*) override {}
};
} // namespace
@@ -1036,14 +1040,13 @@
return;
}
const FullTrackName& track_name = message.full_track_name;
- absl::StatusOr<std::shared_ptr<MoqtTrackPublisher>> track_publisher =
+ std::shared_ptr<MoqtTrackPublisher> track_publisher =
session_->publisher_->GetTrack(track_name);
- if (!track_publisher.ok()) {
+ if (track_publisher == nullptr) {
QUIC_DLOG(INFO) << ENDPOINT << "SUBSCRIBE for " << track_name
- << " rejected by the application: "
- << track_publisher.status();
+ << " rejected by the application: does not exist";
SendSubscribeError(message.request_id, RequestErrorCode::kTrackDoesNotExist,
- track_publisher.status().message());
+ "not found");
return;
}
@@ -1056,9 +1059,9 @@
session_->monitoring_interfaces_for_published_tracks_.erase(monitoring_it);
}
- MoqtTrackPublisher* track_publisher_ptr = track_publisher->get();
+ MoqtTrackPublisher* track_publisher_ptr = track_publisher.get();
auto subscription = std::make_unique<MoqtSession::PublishedSubscription>(
- session_, *std::move(track_publisher), message, monitoring);
+ session_, track_publisher, message, monitoring);
subscription->set_delivery_timeout(message.parameters.delivery_timeout);
MoqtSession::PublishedSubscription* subscription_ptr = subscription.get();
auto [it, success] = session_->published_subscriptions_.emplace(
@@ -1298,9 +1301,9 @@
return;
}
// TODO(martinduke): Handle authentication.
- absl::StatusOr<std::shared_ptr<MoqtTrackPublisher>> track =
+ std::shared_ptr<MoqtTrackPublisher> track =
session_->publisher_->GetTrack(message.full_track_name);
- if (!track.ok()) {
+ if (track == nullptr) {
MoqtTrackStatusError error;
error.request_id = message.request_id;
error.error_code = RequestErrorCode::kTrackDoesNotExist;
@@ -1310,8 +1313,8 @@
}
auto [it, inserted] = session_->incoming_track_status_.emplace(
message.request_id, std::make_unique<DownstreamTrackStatus>(
- message.request_id, session_, track->get()));
- track->get()->AddObjectListener(it->second.get());
+ message.request_id, session_, track.get()));
+ track->AddObjectListener(it->second.get());
}
void MoqtSession::ControlStream::OnGoAwayMessage(const MoqtGoAway& message) {
@@ -1450,23 +1453,21 @@
const StandaloneFetch& standalone_fetch =
std::get<StandaloneFetch>(message.fetch);
track_name = standalone_fetch.full_track_name;
- absl::StatusOr<std::shared_ptr<MoqtTrackPublisher>> track_publisher =
+ std::shared_ptr<MoqtTrackPublisher> track_publisher =
session_->publisher_->GetTrack(track_name);
- if (!track_publisher.ok()) {
+ if (track_publisher == nullptr) {
QUIC_DLOG(INFO) << ENDPOINT << "FETCH for " << track_name
- << " rejected by the application: "
- << track_publisher.status();
+ << " rejected by the application: not found";
SendFetchError(message.request_id, RequestErrorCode::kTrackDoesNotExist,
- track_publisher.status().message());
+ "not found");
}
QUIC_DLOG(INFO) << ENDPOINT << "Received a StandaloneFETCH for "
<< track_name;
// The check for end_object < start_object is done in
// MoqtTrackPublisher::Fetch().
- fetch = (*track_publisher)
- ->StandaloneFetch(standalone_fetch.start_location,
- standalone_fetch.end_location,
- message.group_order);
+ fetch = track_publisher->StandaloneFetch(standalone_fetch.start_location,
+ standalone_fetch.end_location,
+ message.group_order);
} else {
// Joining Fetch processing.
uint64_t joining_request_id =
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.h b/quiche/quic/moqt/moqt_subscribe_windows.h
index bbdcbee..69e425f 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows.h
+++ b/quiche/quic/moqt/moqt_subscribe_windows.h
@@ -12,7 +12,7 @@
#include "absl/container/btree_map.h"
#include "quiche/quic/core/quic_interval.h"
#include "quiche/quic/moqt/moqt_messages.h"
-#include "quiche/quic/moqt/moqt_publisher.h"
+#include "quiche/quic/moqt/moqt_object.h"
#include "quiche/common/platform/api/quiche_export.h"
#include "quiche/web_transport/web_transport.h"
diff --git a/quiche/quic/moqt/moqt_track.cc b/quiche/quic/moqt/moqt_track.cc
index 5deee1c..dcb26f0 100644
--- a/quiche/quic/moqt/moqt_track.cc
+++ b/quiche/quic/moqt/moqt_track.cc
@@ -16,10 +16,10 @@
#include "quiche/quic/core/quic_alarm.h"
#include "quiche/quic/core/quic_clock.h"
#include "quiche/quic/core/quic_time.h"
-#include "quiche/quic/moqt/moqt_failed_fetch.h"
+#include "quiche/quic/moqt/moqt_fetch_task.h"
#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_object.h"
#include "quiche/quic/moqt/moqt_priority.h"
-#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/common/platform/api/quiche_bug_tracker.h"
#include "quiche/common/quiche_buffer_allocator.h"
#include "quiche/common/quiche_mem_slice.h"
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h
index 4ff4ab8..eeb3412 100644
--- a/quiche/quic/moqt/moqt_track.h
+++ b/quiche/quic/moqt/moqt_track.h
@@ -14,9 +14,10 @@
#include "absl/strings/string_view.h"
#include "quiche/quic/core/quic_alarm.h"
#include "quiche/quic/core/quic_time.h"
+#include "quiche/quic/moqt/moqt_fetch_task.h"
#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_object.h"
#include "quiche/quic/moqt/moqt_priority.h"
-#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/quic/moqt/moqt_subscribe_windows.h"
#include "quiche/common/quiche_buffer_allocator.h"
#include "quiche/common/quiche_callbacks.h"
diff --git a/quiche/quic/moqt/moqt_track_test.cc b/quiche/quic/moqt/moqt_track_test.cc
index 58ca466..8ed99f9 100644
--- a/quiche/quic/moqt/moqt_track_test.cc
+++ b/quiche/quic/moqt/moqt_track_test.cc
@@ -10,9 +10,10 @@
#include "absl/status/status.h"
#include "quiche/quic/core/quic_alarm.h"
+#include "quiche/quic/moqt/moqt_fetch_task.h"
#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_object.h"
#include "quiche/quic/moqt/moqt_priority.h"
-#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/quic/moqt/tools/moqt_mock_visitor.h"
#include "quiche/quic/platform/api/quic_test.h"
#include "quiche/common/quiche_mem_slice.h"