Update MoQT Session/Application APIs to support relays. Eliminate some circular dependencies that result by moving certain items to other header files. PiperOrigin-RevId: 803457806
diff --git a/build/source_list.bzl b/build/source_list.bzl index 0d3e7ad..84c347e 100644 --- a/build/source_list.bzl +++ b/build/source_list.bzl
@@ -1549,17 +1549,18 @@ ] moqt_hdrs = [ "quic/moqt/moqt_bitrate_adjuster.h", - "quic/moqt/moqt_cached_object.h", - "quic/moqt/moqt_failed_fetch.h", + "quic/moqt/moqt_fetch_task.h", "quic/moqt/moqt_framer.h", "quic/moqt/moqt_known_track_publisher.h", "quic/moqt/moqt_live_relay_queue.h", "quic/moqt/moqt_messages.h", + "quic/moqt/moqt_object.h", "quic/moqt/moqt_outgoing_queue.h", "quic/moqt/moqt_parser.h", "quic/moqt/moqt_priority.h", "quic/moqt/moqt_probe_manager.h", "quic/moqt/moqt_publisher.h", + "quic/moqt/moqt_relay_publisher.h", "quic/moqt/moqt_relay_track_publisher.h", "quic/moqt/moqt_session.h", "quic/moqt/moqt_session_callbacks.h", @@ -1582,7 +1583,6 @@ moqt_srcs = [ "quic/moqt/moqt_bitrate_adjuster.cc", "quic/moqt/moqt_bitrate_adjuster_test.cc", - "quic/moqt/moqt_cached_object.cc", "quic/moqt/moqt_framer.cc", "quic/moqt/moqt_framer_test.cc", "quic/moqt/moqt_integration_test.cc", @@ -1591,6 +1591,7 @@ "quic/moqt/moqt_live_relay_queue_test.cc", "quic/moqt/moqt_messages.cc", "quic/moqt/moqt_messages_test.cc", + "quic/moqt/moqt_object.cc", "quic/moqt/moqt_outgoing_queue.cc", "quic/moqt/moqt_outgoing_queue_test.cc", "quic/moqt/moqt_parser.cc", @@ -1600,6 +1601,7 @@ "quic/moqt/moqt_priority_test.cc", "quic/moqt/moqt_probe_manager.cc", "quic/moqt/moqt_probe_manager_test.cc", + "quic/moqt/moqt_relay_publisher.cc", "quic/moqt/moqt_relay_track_publisher.cc", "quic/moqt/moqt_relay_track_publisher_test.cc", "quic/moqt/moqt_session.cc",
diff --git a/build/source_list.gni b/build/source_list.gni index 0b489e9..09c6eca 100644 --- a/build/source_list.gni +++ b/build/source_list.gni
@@ -1553,17 +1553,18 @@ ] moqt_hdrs = [ "src/quiche/quic/moqt/moqt_bitrate_adjuster.h", - "src/quiche/quic/moqt/moqt_cached_object.h", - "src/quiche/quic/moqt/moqt_failed_fetch.h", + "src/quiche/quic/moqt/moqt_fetch_task.h", "src/quiche/quic/moqt/moqt_framer.h", "src/quiche/quic/moqt/moqt_known_track_publisher.h", "src/quiche/quic/moqt/moqt_live_relay_queue.h", "src/quiche/quic/moqt/moqt_messages.h", + "src/quiche/quic/moqt/moqt_object.h", "src/quiche/quic/moqt/moqt_outgoing_queue.h", "src/quiche/quic/moqt/moqt_parser.h", "src/quiche/quic/moqt/moqt_priority.h", "src/quiche/quic/moqt/moqt_probe_manager.h", "src/quiche/quic/moqt/moqt_publisher.h", + "src/quiche/quic/moqt/moqt_relay_publisher.h", "src/quiche/quic/moqt/moqt_relay_track_publisher.h", "src/quiche/quic/moqt/moqt_session.h", "src/quiche/quic/moqt/moqt_session_callbacks.h", @@ -1586,7 +1587,6 @@ moqt_srcs = [ "src/quiche/quic/moqt/moqt_bitrate_adjuster.cc", "src/quiche/quic/moqt/moqt_bitrate_adjuster_test.cc", - "src/quiche/quic/moqt/moqt_cached_object.cc", "src/quiche/quic/moqt/moqt_framer.cc", "src/quiche/quic/moqt/moqt_framer_test.cc", "src/quiche/quic/moqt/moqt_integration_test.cc", @@ -1595,6 +1595,7 @@ "src/quiche/quic/moqt/moqt_live_relay_queue_test.cc", "src/quiche/quic/moqt/moqt_messages.cc", "src/quiche/quic/moqt/moqt_messages_test.cc", + "src/quiche/quic/moqt/moqt_object.cc", "src/quiche/quic/moqt/moqt_outgoing_queue.cc", "src/quiche/quic/moqt/moqt_outgoing_queue_test.cc", "src/quiche/quic/moqt/moqt_parser.cc", @@ -1604,6 +1605,7 @@ "src/quiche/quic/moqt/moqt_priority_test.cc", "src/quiche/quic/moqt/moqt_probe_manager.cc", "src/quiche/quic/moqt/moqt_probe_manager_test.cc", + "src/quiche/quic/moqt/moqt_relay_publisher.cc", "src/quiche/quic/moqt/moqt_relay_track_publisher.cc", "src/quiche/quic/moqt/moqt_relay_track_publisher_test.cc", "src/quiche/quic/moqt/moqt_session.cc",
diff --git a/build/source_list.json b/build/source_list.json index 2bfce87..af36bae 100644 --- a/build/source_list.json +++ b/build/source_list.json
@@ -1552,17 +1552,18 @@ ], "moqt_hdrs": [ "quiche/quic/moqt/moqt_bitrate_adjuster.h", - "quiche/quic/moqt/moqt_cached_object.h", - "quiche/quic/moqt/moqt_failed_fetch.h", + "quiche/quic/moqt/moqt_fetch_task.h", "quiche/quic/moqt/moqt_framer.h", "quiche/quic/moqt/moqt_known_track_publisher.h", "quiche/quic/moqt/moqt_live_relay_queue.h", "quiche/quic/moqt/moqt_messages.h", + "quiche/quic/moqt/moqt_object.h", "quiche/quic/moqt/moqt_outgoing_queue.h", "quiche/quic/moqt/moqt_parser.h", "quiche/quic/moqt/moqt_priority.h", "quiche/quic/moqt/moqt_probe_manager.h", "quiche/quic/moqt/moqt_publisher.h", + "quiche/quic/moqt/moqt_relay_publisher.h", "quiche/quic/moqt/moqt_relay_track_publisher.h", "quiche/quic/moqt/moqt_session.h", "quiche/quic/moqt/moqt_session_callbacks.h", @@ -1585,7 +1586,6 @@ "moqt_srcs": [ "quiche/quic/moqt/moqt_bitrate_adjuster.cc", "quiche/quic/moqt/moqt_bitrate_adjuster_test.cc", - "quiche/quic/moqt/moqt_cached_object.cc", "quiche/quic/moqt/moqt_framer.cc", "quiche/quic/moqt/moqt_framer_test.cc", "quiche/quic/moqt/moqt_integration_test.cc", @@ -1594,6 +1594,7 @@ "quiche/quic/moqt/moqt_live_relay_queue_test.cc", "quiche/quic/moqt/moqt_messages.cc", "quiche/quic/moqt/moqt_messages_test.cc", + "quiche/quic/moqt/moqt_object.cc", "quiche/quic/moqt/moqt_outgoing_queue.cc", "quiche/quic/moqt/moqt_outgoing_queue_test.cc", "quiche/quic/moqt/moqt_parser.cc", @@ -1603,6 +1604,7 @@ "quiche/quic/moqt/moqt_priority_test.cc", "quiche/quic/moqt/moqt_probe_manager.cc", "quiche/quic/moqt/moqt_probe_manager_test.cc", + "quiche/quic/moqt/moqt_relay_publisher.cc", "quiche/quic/moqt/moqt_relay_track_publisher.cc", "quiche/quic/moqt/moqt_relay_track_publisher_test.cc", "quiche/quic/moqt/moqt_session.cc",
diff --git a/quiche/quic/moqt/moqt_cached_object.h b/quiche/quic/moqt/moqt_cached_object.h deleted file mode 100644 index ef5566a..0000000 --- a/quiche/quic/moqt/moqt_cached_object.h +++ /dev/null
@@ -1,28 +0,0 @@ -// Copyright 2024 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#ifndef QUICHE_QUIC_MOQT_MOQT_CACHED_OBJECT_H_ -#define QUICHE_QUIC_MOQT_MOQT_CACHED_OBJECT_H_ - -#include <memory> - -#include "quiche/quic/moqt/moqt_publisher.h" -#include "quiche/common/quiche_mem_slice.h" - -namespace moqt { - -// CachedObject is a version of PublishedObject with a reference counted -// payload. -struct CachedObject { - PublishedObjectMetadata metadata; - std::shared_ptr<quiche::QuicheMemSlice> payload; - bool fin_after_this; // This is the last object before FIN. -}; - -// Transforms a CachedObject into a PublishedObject. -PublishedObject CachedObjectToPublishedObject(const CachedObject& object); - -} // namespace moqt - -#endif // QUICHE_QUIC_MOQT_MOQT_CACHED_OBJECT_H_
diff --git a/quiche/quic/moqt/moqt_failed_fetch.h b/quiche/quic/moqt/moqt_failed_fetch.h deleted file mode 100644 index c971897..0000000 --- a/quiche/quic/moqt/moqt_failed_fetch.h +++ /dev/null
@@ -1,41 +0,0 @@ -// Copyright 2024 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#ifndef QUICHE_QUIC_MOQT_MOQT_FAILED_FETCH_H_ -#define QUICHE_QUIC_MOQT_MOQT_FAILED_FETCH_H_ - -#include <utility> - -#include "absl/status/status.h" -#include "quiche/quic/moqt/moqt_messages.h" -#include "quiche/quic/moqt/moqt_publisher.h" - -namespace moqt { - -// A fetch that starts out in the failed state. -class MoqtFailedFetch : public MoqtFetchTask { - public: - explicit MoqtFailedFetch(absl::Status status) : status_(std::move(status)) {} - - GetNextObjectResult GetNextObject(PublishedObject&) override { - return kError; - } - absl::Status GetStatus() override { return status_; } - void SetObjectAvailableCallback( - ObjectsAvailableCallback /*callback*/) override {} - void SetFetchResponseCallback(FetchResponseCallback callback) { - MoqtFetchError error; - error.request_id = 0; - error.error_code = StatusToRequestErrorCode(status_); - error.error_reason = status_.message(); - std::move(callback)(error); - } - - private: - absl::Status status_; -}; - -} // namespace moqt - -#endif // QUICHE_QUIC_MOQT_MOQT_FAILED_FETCH_H_
diff --git a/quiche/quic/moqt/moqt_fetch_task.h b/quiche/quic/moqt/moqt_fetch_task.h new file mode 100644 index 0000000..b770684 --- /dev/null +++ b/quiche/quic/moqt/moqt_fetch_task.h
@@ -0,0 +1,90 @@ +// Copyright 2024 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef QUICHE_QUIC_MOQT_MOQT_FETCH_TASK_H_ +#define QUICHE_QUIC_MOQT_MOQT_FETCH_TASK_H_ + +#include <utility> +#include <variant> + +#include "absl/status/status.h" +#include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_object.h" +#include "quiche/common/quiche_callbacks.h" + +namespace moqt { + +// A handle representing a fetch in progress. The fetch in question can be +// cancelled by deleting the object. +class MoqtFetchTask { + public: + using ObjectsAvailableCallback = quiche::MultiUseCallback<void()>; + // The request_id field will be ignored. + using FetchResponseCallback = quiche::SingleUseCallback<void( + std::variant<MoqtFetchOk, MoqtFetchError>)>; + + virtual ~MoqtFetchTask() = default; + + // Potential results of a GetNextObject() call. + enum GetNextObjectResult { + // The next object is available, and is placed into the reference specified + // by the caller. + kSuccess, + // The next object is not yet available (equivalent of EAGAIN). + kPending, + // The end of fetch has been reached. + kEof, + // The fetch has failed; the error is available via GetStatus(). + kError, + }; + + // Returns the next object received via the fetch, if available. MUST NOT + // return an object with status kObjectDoesNotExist. + virtual GetNextObjectResult GetNextObject(PublishedObject& output) = 0; + + // Sets the callback that is called when GetNextObject() has previously + // returned kPending, but now a new object (or potentially an error or an + // end-of-fetch) is available. The application is responsible for calling + // GetNextObject() until it gets kPending; no further callback will occur + // until then. + // If an object is available immediately, the callback will be called + // immediately. + virtual void SetObjectAvailableCallback( + ObjectsAvailableCallback callback) = 0; + // One of these callbacks is called as soon as the data publisher has enough + // information for either FETCH_OK or FETCH_ERROR. + // If the appropriate response is already available, the callback will be + // called immediately. + virtual void SetFetchResponseCallback(FetchResponseCallback callback) = 0; + + // Returns the error if fetch has completely failed, and OK otherwise. + virtual absl::Status GetStatus() = 0; +}; + +// A fetch that starts out in the failed state. +class MoqtFailedFetch : public MoqtFetchTask { + public: + explicit MoqtFailedFetch(absl::Status status) : status_(std::move(status)) {} + + GetNextObjectResult GetNextObject(PublishedObject&) override { + return kError; + } + absl::Status GetStatus() override { return status_; } + void SetObjectAvailableCallback( + ObjectsAvailableCallback /*callback*/) override {} + void SetFetchResponseCallback(FetchResponseCallback callback) { + MoqtFetchError error; + error.request_id = 0; + error.error_code = StatusToRequestErrorCode(status_); + error.error_reason = status_.message(); + std::move(callback)(error); + } + + private: + absl::Status status_; +}; + +} // namespace moqt + +#endif // QUICHE_QUIC_MOQT_MOQT_FETCH_TASK_H_
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc index 8c6b55c..05bda43 100644 --- a/quiche/quic/moqt/moqt_integration_test.cc +++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -591,7 +591,7 @@ EstablishSession(); FullTrackName full_track_name("foo", "bar"); MockSubscribeRemoteTrackVisitor client_visitor; - std::optional<absl::string_view> expected_reason = "No tracks published"; + std::optional<absl::string_view> expected_reason = "not found"; bool received_ok = false; EXPECT_CALL(client_visitor, OnReply(full_track_name, _, expected_reason)) .WillOnce([&]() { received_ok = true; });
diff --git a/quiche/quic/moqt/moqt_known_track_publisher.cc b/quiche/quic/moqt/moqt_known_track_publisher.cc index 60150cf..309d65b 100644 --- a/quiche/quic/moqt/moqt_known_track_publisher.cc +++ b/quiche/quic/moqt/moqt_known_track_publisher.cc
@@ -6,19 +6,18 @@ #include <memory> -#include "absl/status/status.h" -#include "absl/status/statusor.h" +#include "absl/base/nullability.h" #include "quiche/quic/moqt/moqt_messages.h" #include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/common/platform/api/quiche_bug_tracker.h" namespace moqt { -absl::StatusOr<std::shared_ptr<MoqtTrackPublisher>> +absl_nullable std::shared_ptr<MoqtTrackPublisher> MoqtKnownTrackPublisher::GetTrack(const FullTrackName& track_name) { auto it = tracks_.find(track_name); if (it == tracks_.end()) { - return absl::NotFoundError("Requested track not found"); + return nullptr; } return it->second; }
diff --git a/quiche/quic/moqt/moqt_known_track_publisher.h b/quiche/quic/moqt/moqt_known_track_publisher.h index 59a22a0..24a6f35 100644 --- a/quiche/quic/moqt/moqt_known_track_publisher.h +++ b/quiche/quic/moqt/moqt_known_track_publisher.h
@@ -7,8 +7,8 @@ #include <memory> +#include "absl/base/nullability.h" #include "absl/container/flat_hash_map.h" -#include "absl/status/statusor.h" #include "quiche/quic/moqt/moqt_messages.h" #include "quiche/quic/moqt/moqt_publisher.h" @@ -24,8 +24,13 @@ MoqtKnownTrackPublisher& operator=(const MoqtKnownTrackPublisher&) = delete; MoqtKnownTrackPublisher& operator=(MoqtKnownTrackPublisher&&) = delete; - absl::StatusOr<std::shared_ptr<MoqtTrackPublisher>> GetTrack( + // MoqtPublisher implementation. + absl_nullable std::shared_ptr<MoqtTrackPublisher> GetTrack( const FullTrackName& track_name) override; + // TODO(martinduke): Implement namespace support. + void AddNamespaceListener(NamespaceListener* /*listener*/) override {} + void RemoveNamespaceListener(NamespaceListener* /*listener*/) override {} + void Add(std::shared_ptr<MoqtTrackPublisher> track_publisher); void Delete(const FullTrackName& track_name);
diff --git a/quiche/quic/moqt/moqt_live_relay_queue.cc b/quiche/quic/moqt/moqt_live_relay_queue.cc index 050a740..0e145cd 100644 --- a/quiche/quic/moqt/moqt_live_relay_queue.cc +++ b/quiche/quic/moqt/moqt_live_relay_queue.cc
@@ -10,8 +10,8 @@ #include "absl/base/attributes.h" #include "absl/strings/string_view.h" -#include "quiche/quic/moqt/moqt_cached_object.h" #include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/common/platform/api/quiche_logging.h" #include "quiche/common/quiche_buffer_allocator.h"
diff --git a/quiche/quic/moqt/moqt_live_relay_queue.h b/quiche/quic/moqt/moqt_live_relay_queue.h index 2417835..1baf993 100644 --- a/quiche/quic/moqt/moqt_live_relay_queue.h +++ b/quiche/quic/moqt/moqt_live_relay_queue.h
@@ -18,9 +18,9 @@ #include "quiche/quic/core/quic_clock.h" #include "quiche/quic/core/quic_default_clock.h" #include "quiche/quic/core/quic_time.h" -#include "quiche/quic/moqt/moqt_cached_object.h" -#include "quiche/quic/moqt/moqt_failed_fetch.h" +#include "quiche/quic/moqt/moqt_fetch_task.h" #include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/common/quiche_callbacks.h"
diff --git a/quiche/quic/moqt/moqt_live_relay_queue_test.cc b/quiche/quic/moqt/moqt_live_relay_queue_test.cc index aaee1ac..6980709 100644 --- a/quiche/quic/moqt/moqt_live_relay_queue_test.cc +++ b/quiche/quic/moqt/moqt_live_relay_queue_test.cc
@@ -9,8 +9,8 @@ #include "absl/strings/string_view.h" #include "quiche/quic/core/quic_time.h" -#include "quiche/quic/moqt/moqt_cached_object.h" #include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/quic/moqt/moqt_subscribe_windows.h"
diff --git a/quiche/quic/moqt/moqt_cached_object.cc b/quiche/quic/moqt/moqt_object.cc similarity index 88% rename from quiche/quic/moqt/moqt_cached_object.cc rename to quiche/quic/moqt/moqt_object.cc index df71f1c..3ee1a72 100644 --- a/quiche/quic/moqt/moqt_cached_object.cc +++ b/quiche/quic/moqt/moqt_object.cc
@@ -2,10 +2,9 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -#include "quiche/quic/moqt/moqt_cached_object.h" +#include "quiche/quic/moqt/moqt_object.h" #include "absl/strings/string_view.h" -#include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/common/quiche_mem_slice.h" namespace moqt {
diff --git a/quiche/quic/moqt/moqt_object.h b/quiche/quic/moqt/moqt_object.h new file mode 100644 index 0000000..30abe6c --- /dev/null +++ b/quiche/quic/moqt/moqt_object.h
@@ -0,0 +1,47 @@ +// Copyright 2024 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef QUICHE_QUIC_MOQT_MOQT_OBJECT_H_ +#define QUICHE_QUIC_MOQT_MOQT_OBJECT_H_ + +#include <cstdint> +#include <memory> + +#include "quiche/quic/core/quic_time.h" +#include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_priority.h" +#include "quiche/common/quiche_mem_slice.h" + +namespace moqt { + +struct PublishedObjectMetadata { + Location location; + uint64_t subgroup; // Equal to object_id for datagrams. + MoqtObjectStatus status; + MoqtPriority publisher_priority; + quic::QuicTime arrival_time = quic::QuicTime::Zero(); +}; + +// PublishedObject is a description of an object that is sufficient to publish +// it on a given track. +struct PublishedObject { + PublishedObjectMetadata metadata; + quiche::QuicheMemSlice payload; + bool fin_after_this = false; +}; + +// CachedObject is a version of PublishedObject with a reference counted +// payload. +struct CachedObject { + PublishedObjectMetadata metadata; + std::shared_ptr<quiche::QuicheMemSlice> payload; + bool fin_after_this; // This is the last object before FIN. +}; + +// Transforms a CachedObject into a PublishedObject. +PublishedObject CachedObjectToPublishedObject(const CachedObject& object); + +} // namespace moqt + +#endif // QUICHE_QUIC_MOQT_MOQT_OBJECT_H_
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.cc b/quiche/quic/moqt/moqt_outgoing_queue.cc index 8e434e3..7960e61 100644 --- a/quiche/quic/moqt/moqt_outgoing_queue.cc +++ b/quiche/quic/moqt/moqt_outgoing_queue.cc
@@ -14,9 +14,9 @@ #include "absl/algorithm/container.h" #include "absl/status/status.h" #include "absl/status/statusor.h" -#include "quiche/quic/moqt/moqt_cached_object.h" -#include "quiche/quic/moqt/moqt_failed_fetch.h" +#include "quiche/quic/moqt/moqt_fetch_task.h" #include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/quic/moqt/moqt_subscribe_windows.h"
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.h b/quiche/quic/moqt/moqt_outgoing_queue.h index cadd11c..5aa0b80 100644 --- a/quiche/quic/moqt/moqt_outgoing_queue.h +++ b/quiche/quic/moqt/moqt_outgoing_queue.h
@@ -18,8 +18,8 @@ #include "quiche/quic/core/quic_clock.h" #include "quiche/quic/core/quic_default_clock.h" #include "quiche/quic/core/quic_time.h" -#include "quiche/quic/moqt/moqt_cached_object.h" #include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/common/quiche_circular_deque.h"
diff --git a/quiche/quic/moqt/moqt_outgoing_queue_test.cc b/quiche/quic/moqt/moqt_outgoing_queue_test.cc index d08897b..eafbf78 100644 --- a/quiche/quic/moqt/moqt_outgoing_queue_test.cc +++ b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
@@ -17,12 +17,13 @@ #include "absl/strings/string_view.h" #include "quiche/quic/core/quic_default_clock.h" #include "quiche/quic/core/quic_time.h" +#include "quiche/quic/moqt/moqt_fetch_task.h" #include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/quic/moqt/moqt_subscribe_windows.h" #include "quiche/common/platform/api/quiche_expect_bug.h" -#include "quiche/common/platform/api/quiche_logging.h" #include "quiche/common/platform/api/quiche_test.h" #include "quiche/common/quiche_mem_slice.h" #include "quiche/common/test_tools/quiche_test_utils.h"
diff --git a/quiche/quic/moqt/moqt_publisher.h b/quiche/quic/moqt/moqt_publisher.h index acd3587..4f723a2 100644 --- a/quiche/quic/moqt/moqt_publisher.h +++ b/quiche/quic/moqt/moqt_publisher.h
@@ -8,36 +8,19 @@ #include <cstdint> #include <memory> #include <optional> -#include <string> -#include <variant> -#include "absl/status/status.h" -#include "absl/status/statusor.h" +#include "absl/base/nullability.h" #include "quiche/quic/core/quic_time.h" +#include "quiche/quic/moqt/moqt_fetch_task.h" #include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_priority.h" -#include "quiche/common/quiche_callbacks.h" -#include "quiche/common/quiche_mem_slice.h" +#include "quiche/quic/moqt/moqt_session_interface.h" +#include "quiche/quic/moqt/moqt_track.h" #include "quiche/web_transport/web_transport.h" namespace moqt { -struct PublishedObjectMetadata { - Location location; - uint64_t subgroup; // Equal to object_id for datagrams. - MoqtObjectStatus status; - MoqtPriority publisher_priority; - quic::QuicTime arrival_time = quic::QuicTime::Zero(); -}; - -// PublishedObject is a description of an object that is sufficient to publish -// it on a given track. -struct PublishedObject { - PublishedObjectMetadata metadata; - quiche::QuicheMemSlice payload; - bool fin_after_this = false; -}; - // MoqtObjectListener is an interface for any entity that is listening for // incoming objects for a given track. class MoqtObjectListener { @@ -77,53 +60,6 @@ virtual void OnTrackPublisherGone() = 0; }; -// A handle representing a fetch in progress. The fetch in question can be -// cancelled by deleting the object. -class MoqtFetchTask { - public: - using ObjectsAvailableCallback = quiche::MultiUseCallback<void()>; - // The request_id field will be ignored. - using FetchResponseCallback = quiche::SingleUseCallback<void( - std::variant<MoqtFetchOk, MoqtFetchError>)>; - - virtual ~MoqtFetchTask() = default; - - // Potential results of a GetNextObject() call. - enum GetNextObjectResult { - // The next object is available, and is placed into the reference specified - // by the caller. - kSuccess, - // The next object is not yet available (equivalent of EAGAIN). - kPending, - // The end of fetch has been reached. - kEof, - // The fetch has failed; the error is available via GetStatus(). - kError, - }; - - // Returns the next object received via the fetch, if available. MUST NOT - // return an object with status kObjectDoesNotExist. - virtual GetNextObjectResult GetNextObject(PublishedObject& output) = 0; - - // Sets the callback that is called when GetNextObject() has previously - // returned kPending, but now a new object (or potentially an error or an - // end-of-fetch) is available. The application is responsible for calling - // GetNextObject() until it gets kPending; no further callback will occur - // until then. - // If an object is available immediately, the callback will be called - // immediately. - virtual void SetObjectAvailableCallback( - ObjectsAvailableCallback callback) = 0; - // One of these callbacks is called as soon as the data publisher has enough - // information for either FETCH_OK or FETCH_ERROR. - // If the appropriate response is already available, the callback will be - // called immediately. - virtual void SetFetchResponseCallback(FetchResponseCallback callback) = 0; - - // Returns the error if fetch has completely failed, and OK otherwise. - virtual absl::Status GetStatus() = 0; -}; - // MoqtTrackPublisher is an application-side API for an MoQT publisher // of a single individual track. class MoqtTrackPublisher { @@ -178,14 +114,37 @@ uint64_t group, std::optional<MoqtDeliveryOrder> order) = 0; }; +// MoqtSession delivers a NamespaceListener to a TrackPublisher to receive +// notifications that sub-namespaces are available, and receive tracks that +// are published within that namespace. This generally occurs when the session +// receives a SUBSCRIBE_NAMESPACE message. +class NamespaceListener { + public: + virtual ~NamespaceListener() = default; + // Called when a namespace is published. Will generally result in the + // receiving session sending a PUBLISH_NAMESPACE message. + virtual void OnNamespacePublished(const TrackNamespace& track_namespace) = 0; + // Called when a namespace is no longer available. Will generally result in + // the receiving session sending a PUBLISH_NAMESPACE_DONE message. + virtual void OnNamespaceDone(const TrackNamespace& track_namespace) = 0; + // Called when a track is published within the namespace. Will generally + // result in the receiving session sending a PUBLISH message. Track APIs are + // available in |publisher|. + virtual void OnTrackPublished( + std::shared_ptr<MoqtTrackPublisher> publisher) = 0; +}; + // MoqtPublisher is an interface to a publisher that allows it to publish // multiple tracks. class MoqtPublisher { public: virtual ~MoqtPublisher() = default; - virtual absl::StatusOr<std::shared_ptr<MoqtTrackPublisher>> GetTrack( + // These are all called by MoqtSession based on messages arriving on the wire. + virtual absl_nullable std::shared_ptr<MoqtTrackPublisher> GetTrack( const FullTrackName& track_name) = 0; + virtual void AddNamespaceListener(NamespaceListener* listener) = 0; + virtual void RemoveNamespaceListener(NamespaceListener* listener) = 0; }; } // namespace moqt
diff --git a/quiche/quic/moqt/moqt_relay_publisher.cc b/quiche/quic/moqt/moqt_relay_publisher.cc new file mode 100644 index 0000000..432413f --- /dev/null +++ b/quiche/quic/moqt/moqt_relay_publisher.cc
@@ -0,0 +1,38 @@ +// Copyright 2025 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "quiche/quic/moqt/moqt_relay_publisher.h" + +#include <memory> + +#include "absl/base/nullability.h" +#include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_publisher.h" +#include "quiche/quic/moqt/moqt_relay_track_publisher.h" +#include "quiche/common/platform/api/quiche_bug_tracker.h" + +namespace moqt { + +absl_nullable std::shared_ptr<MoqtTrackPublisher> MoqtRelayPublisher::GetTrack( + const FullTrackName& track_name) { + auto it = tracks_.find(track_name); + if (it == tracks_.end()) { + return nullptr; + } + return it->second; +} + +void MoqtRelayPublisher::Add( + std::shared_ptr<MoqtRelayTrackPublisher> track_publisher) { + const FullTrackName& track_name = track_publisher->GetTrackName(); + auto [it, success] = tracks_.emplace(track_name, track_publisher); + QUICHE_BUG_IF(MoqtRelayPublisher_duplicate, !success) + << "Trying to add a duplicate track into a RelayPublisher"; +} + +void MoqtRelayPublisher::Delete(const FullTrackName& track_name) { + tracks_.erase(track_name); +} + +} // namespace moqt
diff --git a/quiche/quic/moqt/moqt_relay_publisher.h b/quiche/quic/moqt/moqt_relay_publisher.h new file mode 100644 index 0000000..8d8e0f2 --- /dev/null +++ b/quiche/quic/moqt/moqt_relay_publisher.h
@@ -0,0 +1,47 @@ +// Copyright 2025 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef QUICHE_QUIC_MOQT_MOQT_RELAY_PUBLISHER_H_ +#define QUICHE_QUIC_MOQT_MOQT_RELAY_PUBLISHER_H_ + +#include <memory> + +#include "absl/base/nullability.h" +#include "absl/container/flat_hash_map.h" +#include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_publisher.h" +#include "quiche/quic/moqt/moqt_relay_track_publisher.h" + +namespace moqt { + +// MoqtRelayPublisher is a publisher that connects sessions that request objects +// and namespaces with upstream sessions that can deliver those things. +class MoqtRelayPublisher : public MoqtPublisher { + public: + MoqtRelayPublisher() = default; + MoqtRelayPublisher(const MoqtRelayPublisher&) = delete; + MoqtRelayPublisher(MoqtRelayPublisher&&) = delete; + MoqtRelayPublisher& operator=(const MoqtRelayPublisher&) = delete; + MoqtRelayPublisher& operator=(MoqtRelayPublisher&&) = delete; + + // MoqtPublisher implementation. + absl_nullable std::shared_ptr<MoqtTrackPublisher> GetTrack( + const FullTrackName& track_name) override; + // TODO(martinduke): Implement namespace support. + void AddNamespaceListener(NamespaceListener* /*listener*/) override {} + void RemoveNamespaceListener(NamespaceListener* /*listener*/) override {} + + void Add(std::shared_ptr<MoqtRelayTrackPublisher> track_publisher); + void Delete(const FullTrackName& track_name); + + private: + absl::flat_hash_map<FullTrackName, std::shared_ptr<MoqtRelayTrackPublisher>> + tracks_; + // TODO(martinduke): Add a map of Namespaces to source sessions and + // namespace listeners. +}; + +} // namespace moqt + +#endif // QUICHE_QUIC_MOQT_MOQT_RELAY_PUBLISHER_H_
diff --git a/quiche/quic/moqt/moqt_relay_track_publisher.cc b/quiche/quic/moqt/moqt_relay_track_publisher.cc index 275caf1..b091731 100644 --- a/quiche/quic/moqt/moqt_relay_track_publisher.cc +++ b/quiche/quic/moqt/moqt_relay_track_publisher.cc
@@ -10,8 +10,8 @@ #include "absl/base/attributes.h" #include "absl/strings/string_view.h" -#include "quiche/quic/moqt/moqt_cached_object.h" #include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/common/platform/api/quiche_logging.h" #include "quiche/common/quiche_buffer_allocator.h"
diff --git a/quiche/quic/moqt/moqt_relay_track_publisher.h b/quiche/quic/moqt/moqt_relay_track_publisher.h index 22cba24..2a1d153 100644 --- a/quiche/quic/moqt/moqt_relay_track_publisher.h +++ b/quiche/quic/moqt/moqt_relay_track_publisher.h
@@ -18,11 +18,12 @@ #include "quiche/quic/core/quic_clock.h" #include "quiche/quic/core/quic_default_clock.h" #include "quiche/quic/core/quic_time.h" -#include "quiche/quic/moqt/moqt_cached_object.h" -#include "quiche/quic/moqt/moqt_failed_fetch.h" +#include "quiche/quic/moqt/moqt_fetch_task.h" #include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/moqt/moqt_publisher.h" +#include "quiche/quic/moqt/moqt_track.h" #include "quiche/common/quiche_callbacks.h" #include "quiche/web_transport/web_transport.h" @@ -38,7 +39,8 @@ // // This class is primarily meant to be used by live relays to buffer the // frames that arrive for a short time. -class MoqtRelayTrackPublisher : public MoqtTrackPublisher { +class MoqtRelayTrackPublisher : public MoqtTrackPublisher, + public SubscribeRemoteTrack::Visitor { public: MoqtRelayTrackPublisher( FullTrackName track, @@ -58,6 +60,20 @@ MoqtRelayTrackPublisher& operator=(const MoqtRelayTrackPublisher&) = delete; MoqtRelayTrackPublisher& operator=(MoqtRelayTrackPublisher&&) = default; + // SubscribeRemoteTrack::Visitor implementation. + // TODO(martinduke): Implement these. + void OnReply( + const FullTrackName& /*full_track_name*/, + std::optional<Location> /*largest_location*/, + std::optional<absl::string_view> /*error_reason_phrase*/) override {} + void OnCanAckObjects(MoqtObjectAckFunction /*ack_function*/) override {} + void OnObjectFragment(const FullTrackName& /*full_track_name*/, + const PublishedObjectMetadata& /*metadata*/, + absl::string_view /*object*/, + bool /*end_of_message*/) override {} + void OnSubscribeDone(FullTrackName /*full_track_name*/) override {} + void OnMalformedTrack(const FullTrackName& /*full_track_name*/) override {} + // Publish a received object. Returns false if the object is invalid, given // other non-normal objects indicate that the sequence number should not // occur. A false return value might result in a session error on the
diff --git a/quiche/quic/moqt/moqt_relay_track_publisher_test.cc b/quiche/quic/moqt/moqt_relay_track_publisher_test.cc index 6b24c91..c76a725 100644 --- a/quiche/quic/moqt/moqt_relay_track_publisher_test.cc +++ b/quiche/quic/moqt/moqt_relay_track_publisher_test.cc
@@ -9,8 +9,8 @@ #include "absl/strings/string_view.h" #include "quiche/quic/core/quic_time.h" -#include "quiche/quic/moqt/moqt_cached_object.h" #include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/quic/moqt/moqt_subscribe_windows.h"
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc index ffa88a1..9ccb1db 100644 --- a/quiche/quic/moqt/moqt_session.cc +++ b/quiche/quic/moqt/moqt_session.cc
@@ -10,13 +10,13 @@ #include <memory> #include <optional> #include <string> -#include <tuple> #include <utility> #include <variant> #include <vector> #include "absl/algorithm/container.h" +#include "absl/base/nullability.h" #include "absl/container/btree_map.h" #include "absl/container/flat_hash_map.h" #include "absl/container/flat_hash_set.h" @@ -24,15 +24,16 @@ #include "absl/functional/bind_front.h" #include "absl/memory/memory.h" #include "absl/status/status.h" -#include "absl/status/statusor.h" #include "absl/strings/str_cat.h" #include "absl/strings/string_view.h" #include "absl/types/span.h" #include "quiche/quic/core/quic_alarm_factory.h" #include "quiche/quic/core/quic_time.h" #include "quiche/quic/core/quic_types.h" +#include "quiche/quic/moqt/moqt_fetch_task.h" #include "quiche/quic/moqt/moqt_framer.h" #include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_parser.h" #include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/moqt/moqt_publisher.h" @@ -83,11 +84,14 @@ return instance; } - absl::StatusOr<std::shared_ptr<MoqtTrackPublisher>> GetTrack( + // MoqtPublisher implementation. + absl_nullable std::shared_ptr<MoqtTrackPublisher> GetTrack( const FullTrackName& track_name) override { QUICHE_DCHECK(track_name.IsValid()); - return absl::NotFoundError("No tracks published"); + return nullptr; } + void AddNamespaceListener(NamespaceListener*) override {} + void RemoveNamespaceListener(NamespaceListener*) override {} }; } // namespace @@ -1036,14 +1040,13 @@ return; } const FullTrackName& track_name = message.full_track_name; - absl::StatusOr<std::shared_ptr<MoqtTrackPublisher>> track_publisher = + std::shared_ptr<MoqtTrackPublisher> track_publisher = session_->publisher_->GetTrack(track_name); - if (!track_publisher.ok()) { + if (track_publisher == nullptr) { QUIC_DLOG(INFO) << ENDPOINT << "SUBSCRIBE for " << track_name - << " rejected by the application: " - << track_publisher.status(); + << " rejected by the application: does not exist"; SendSubscribeError(message.request_id, RequestErrorCode::kTrackDoesNotExist, - track_publisher.status().message()); + "not found"); return; } @@ -1056,9 +1059,9 @@ session_->monitoring_interfaces_for_published_tracks_.erase(monitoring_it); } - MoqtTrackPublisher* track_publisher_ptr = track_publisher->get(); + MoqtTrackPublisher* track_publisher_ptr = track_publisher.get(); auto subscription = std::make_unique<MoqtSession::PublishedSubscription>( - session_, *std::move(track_publisher), message, monitoring); + session_, track_publisher, message, monitoring); subscription->set_delivery_timeout(message.parameters.delivery_timeout); MoqtSession::PublishedSubscription* subscription_ptr = subscription.get(); auto [it, success] = session_->published_subscriptions_.emplace( @@ -1298,9 +1301,9 @@ return; } // TODO(martinduke): Handle authentication. - absl::StatusOr<std::shared_ptr<MoqtTrackPublisher>> track = + std::shared_ptr<MoqtTrackPublisher> track = session_->publisher_->GetTrack(message.full_track_name); - if (!track.ok()) { + if (track == nullptr) { MoqtTrackStatusError error; error.request_id = message.request_id; error.error_code = RequestErrorCode::kTrackDoesNotExist; @@ -1310,8 +1313,8 @@ } auto [it, inserted] = session_->incoming_track_status_.emplace( message.request_id, std::make_unique<DownstreamTrackStatus>( - message.request_id, session_, track->get())); - track->get()->AddObjectListener(it->second.get()); + message.request_id, session_, track.get())); + track->AddObjectListener(it->second.get()); } void MoqtSession::ControlStream::OnGoAwayMessage(const MoqtGoAway& message) { @@ -1450,23 +1453,21 @@ const StandaloneFetch& standalone_fetch = std::get<StandaloneFetch>(message.fetch); track_name = standalone_fetch.full_track_name; - absl::StatusOr<std::shared_ptr<MoqtTrackPublisher>> track_publisher = + std::shared_ptr<MoqtTrackPublisher> track_publisher = session_->publisher_->GetTrack(track_name); - if (!track_publisher.ok()) { + if (track_publisher == nullptr) { QUIC_DLOG(INFO) << ENDPOINT << "FETCH for " << track_name - << " rejected by the application: " - << track_publisher.status(); + << " rejected by the application: not found"; SendFetchError(message.request_id, RequestErrorCode::kTrackDoesNotExist, - track_publisher.status().message()); + "not found"); } QUIC_DLOG(INFO) << ENDPOINT << "Received a StandaloneFETCH for " << track_name; // The check for end_object < start_object is done in // MoqtTrackPublisher::Fetch(). - fetch = (*track_publisher) - ->StandaloneFetch(standalone_fetch.start_location, - standalone_fetch.end_location, - message.group_order); + fetch = track_publisher->StandaloneFetch(standalone_fetch.start_location, + standalone_fetch.end_location, + message.group_order); } else { // Joining Fetch processing. uint64_t joining_request_id =
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.h b/quiche/quic/moqt/moqt_subscribe_windows.h index bbdcbee..69e425f 100644 --- a/quiche/quic/moqt/moqt_subscribe_windows.h +++ b/quiche/quic/moqt/moqt_subscribe_windows.h
@@ -12,7 +12,7 @@ #include "absl/container/btree_map.h" #include "quiche/quic/core/quic_interval.h" #include "quiche/quic/moqt/moqt_messages.h" -#include "quiche/quic/moqt/moqt_publisher.h" +#include "quiche/quic/moqt/moqt_object.h" #include "quiche/common/platform/api/quiche_export.h" #include "quiche/web_transport/web_transport.h"
diff --git a/quiche/quic/moqt/moqt_track.cc b/quiche/quic/moqt/moqt_track.cc index 5deee1c..dcb26f0 100644 --- a/quiche/quic/moqt/moqt_track.cc +++ b/quiche/quic/moqt/moqt_track.cc
@@ -16,10 +16,10 @@ #include "quiche/quic/core/quic_alarm.h" #include "quiche/quic/core/quic_clock.h" #include "quiche/quic/core/quic_time.h" -#include "quiche/quic/moqt/moqt_failed_fetch.h" +#include "quiche/quic/moqt/moqt_fetch_task.h" #include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_priority.h" -#include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/common/platform/api/quiche_bug_tracker.h" #include "quiche/common/quiche_buffer_allocator.h" #include "quiche/common/quiche_mem_slice.h"
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h index 4ff4ab8..eeb3412 100644 --- a/quiche/quic/moqt/moqt_track.h +++ b/quiche/quic/moqt/moqt_track.h
@@ -14,9 +14,10 @@ #include "absl/strings/string_view.h" #include "quiche/quic/core/quic_alarm.h" #include "quiche/quic/core/quic_time.h" +#include "quiche/quic/moqt/moqt_fetch_task.h" #include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_priority.h" -#include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/quic/moqt/moqt_subscribe_windows.h" #include "quiche/common/quiche_buffer_allocator.h" #include "quiche/common/quiche_callbacks.h"
diff --git a/quiche/quic/moqt/moqt_track_test.cc b/quiche/quic/moqt/moqt_track_test.cc index 58ca466..8ed99f9 100644 --- a/quiche/quic/moqt/moqt_track_test.cc +++ b/quiche/quic/moqt/moqt_track_test.cc
@@ -10,9 +10,10 @@ #include "absl/status/status.h" #include "quiche/quic/core/quic_alarm.h" +#include "quiche/quic/moqt/moqt_fetch_task.h" #include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_priority.h" -#include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/quic/moqt/tools/moqt_mock_visitor.h" #include "quiche/quic/platform/api/quic_test.h" #include "quiche/common/quiche_mem_slice.h"