Replace MoqtLiveRelayQueue with TestTrackPublisher.
With ChatServer gone and MoqtRelayTrackPublisher available as a superior relay implementation, MoqtLiveRelayQueue is only used in certain tests to generate a specific sequence of Locations and subgroup IDs, since MoqtOutgoingQueue is rigid in its object creation.
TestTrackPublisher will provide that much more compactly.
PiperOrigin-RevId: 824557346
diff --git a/build/source_list.bzl b/build/source_list.bzl
index 4829b61..cb84ae0 100644
--- a/build/source_list.bzl
+++ b/build/source_list.bzl
@@ -1558,7 +1558,6 @@
"quic/moqt/moqt_fetch_task.h",
"quic/moqt/moqt_framer.h",
"quic/moqt/moqt_known_track_publisher.h",
- "quic/moqt/moqt_live_relay_queue.h",
"quic/moqt/moqt_messages.h",
"quic/moqt/moqt_object.h",
"quic/moqt/moqt_outgoing_queue.h",
@@ -1586,7 +1585,6 @@
"quic/moqt/moqt_bitrate_adjuster.cc",
"quic/moqt/moqt_framer.cc",
"quic/moqt/moqt_known_track_publisher.cc",
- "quic/moqt/moqt_live_relay_queue.cc",
"quic/moqt/moqt_messages.cc",
"quic/moqt/moqt_object.cc",
"quic/moqt/moqt_outgoing_queue.cc",
@@ -1611,7 +1609,6 @@
"quic/moqt/moqt_bitrate_adjuster_test.cc",
"quic/moqt/moqt_framer_test.cc",
"quic/moqt/moqt_integration_test.cc",
- "quic/moqt/moqt_live_relay_queue_test.cc",
"quic/moqt/moqt_messages_test.cc",
"quic/moqt/moqt_outgoing_queue_test.cc",
"quic/moqt/moqt_parser_fuzz_test.cc",
diff --git a/build/source_list.gni b/build/source_list.gni
index 041eea7..5f30475 100644
--- a/build/source_list.gni
+++ b/build/source_list.gni
@@ -1562,7 +1562,6 @@
"src/quiche/quic/moqt/moqt_fetch_task.h",
"src/quiche/quic/moqt/moqt_framer.h",
"src/quiche/quic/moqt/moqt_known_track_publisher.h",
- "src/quiche/quic/moqt/moqt_live_relay_queue.h",
"src/quiche/quic/moqt/moqt_messages.h",
"src/quiche/quic/moqt/moqt_object.h",
"src/quiche/quic/moqt/moqt_outgoing_queue.h",
@@ -1590,7 +1589,6 @@
"src/quiche/quic/moqt/moqt_bitrate_adjuster.cc",
"src/quiche/quic/moqt/moqt_framer.cc",
"src/quiche/quic/moqt/moqt_known_track_publisher.cc",
- "src/quiche/quic/moqt/moqt_live_relay_queue.cc",
"src/quiche/quic/moqt/moqt_messages.cc",
"src/quiche/quic/moqt/moqt_object.cc",
"src/quiche/quic/moqt/moqt_outgoing_queue.cc",
@@ -1616,7 +1614,6 @@
"src/quiche/quic/moqt/moqt_bitrate_adjuster_test.cc",
"src/quiche/quic/moqt/moqt_framer_test.cc",
"src/quiche/quic/moqt/moqt_integration_test.cc",
- "src/quiche/quic/moqt/moqt_live_relay_queue_test.cc",
"src/quiche/quic/moqt/moqt_messages_test.cc",
"src/quiche/quic/moqt/moqt_outgoing_queue_test.cc",
"src/quiche/quic/moqt/moqt_parser_fuzz_test.cc",
diff --git a/build/source_list.json b/build/source_list.json
index f514dc6..3d3f419 100644
--- a/build/source_list.json
+++ b/build/source_list.json
@@ -1561,7 +1561,6 @@
"quiche/quic/moqt/moqt_fetch_task.h",
"quiche/quic/moqt/moqt_framer.h",
"quiche/quic/moqt/moqt_known_track_publisher.h",
- "quiche/quic/moqt/moqt_live_relay_queue.h",
"quiche/quic/moqt/moqt_messages.h",
"quiche/quic/moqt/moqt_object.h",
"quiche/quic/moqt/moqt_outgoing_queue.h",
@@ -1589,7 +1588,6 @@
"quiche/quic/moqt/moqt_bitrate_adjuster.cc",
"quiche/quic/moqt/moqt_framer.cc",
"quiche/quic/moqt/moqt_known_track_publisher.cc",
- "quiche/quic/moqt/moqt_live_relay_queue.cc",
"quiche/quic/moqt/moqt_messages.cc",
"quiche/quic/moqt/moqt_object.cc",
"quiche/quic/moqt/moqt_outgoing_queue.cc",
@@ -1615,7 +1613,6 @@
"quiche/quic/moqt/moqt_bitrate_adjuster_test.cc",
"quiche/quic/moqt/moqt_framer_test.cc",
"quiche/quic/moqt/moqt_integration_test.cc",
- "quiche/quic/moqt/moqt_live_relay_queue_test.cc",
"quiche/quic/moqt/moqt_messages_test.cc",
"quiche/quic/moqt/moqt_outgoing_queue_test.cc",
"quiche/quic/moqt/moqt_parser_fuzz_test.cc",
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index 66f0fd1..1935580 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -17,7 +17,6 @@
#include "quiche/quic/core/quic_time.h"
#include "quiche/quic/core/quic_types.h"
#include "quiche/quic/moqt/moqt_known_track_publisher.h"
-#include "quiche/quic/moqt/moqt_live_relay_queue.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_object.h"
#include "quiche/quic/moqt/moqt_outgoing_queue.h"
@@ -660,9 +659,7 @@
MoqtKnownTrackPublisher publisher;
server_->session()->set_publisher(&publisher);
- auto queue = std::make_shared<MoqtLiveRelayQueue>(
- full_track_name, MoqtForwardingPreference::kSubgroup,
- MoqtDeliveryOrder::kAscending, quic::QuicTime::Infinite());
+ auto queue = std::make_shared<TestTrackPublisher>(full_track_name);
publisher.Add(queue);
SubscribeLatestObject(full_track_name, &subscribe_visitor_);
@@ -764,10 +761,7 @@
MoqtKnownTrackPublisher publisher;
server_->session()->set_publisher(&publisher);
- auto queue = std::make_shared<MoqtLiveRelayQueue>(
- full_track_name, MoqtForwardingPreference::kSubgroup,
- MoqtDeliveryOrder::kAscending, quic::QuicTime::Infinite(),
- test_harness_.simulator().GetClock());
+ auto queue = std::make_shared<TestTrackPublisher>(full_track_name);
auto track_publisher = std::make_shared<MockTrackPublisher>(full_track_name);
publisher.Add(queue);
@@ -815,10 +809,7 @@
MoqtKnownTrackPublisher publisher;
server_->session()->set_publisher(&publisher);
server_->session()->UseAlternateDeliveryTimeout();
- auto queue = std::make_shared<MoqtLiveRelayQueue>(
- full_track_name, MoqtForwardingPreference::kSubgroup,
- MoqtDeliveryOrder::kAscending, quic::QuicTime::Infinite(),
- test_harness_.simulator().GetClock());
+ auto queue = std::make_shared<TestTrackPublisher>(full_track_name);
auto track_publisher = std::make_shared<MockTrackPublisher>(full_track_name);
publisher.Add(queue);
diff --git a/quiche/quic/moqt/moqt_live_relay_queue.cc b/quiche/quic/moqt/moqt_live_relay_queue.cc
deleted file mode 100644
index 0e145cd..0000000
--- a/quiche/quic/moqt/moqt_live_relay_queue.cc
+++ /dev/null
@@ -1,231 +0,0 @@
-// Copyright 2024 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "quiche/quic/moqt/moqt_live_relay_queue.h"
-
-#include <cstdint>
-#include <memory>
-#include <optional>
-
-#include "absl/base/attributes.h"
-#include "absl/strings/string_view.h"
-#include "quiche/quic/moqt/moqt_messages.h"
-#include "quiche/quic/moqt/moqt_object.h"
-#include "quiche/quic/moqt/moqt_publisher.h"
-#include "quiche/common/platform/api/quiche_logging.h"
-#include "quiche/common/quiche_buffer_allocator.h"
-#include "quiche/common/quiche_callbacks.h"
-#include "quiche/common/quiche_mem_slice.h"
-#include "quiche/common/simple_buffer_allocator.h"
-#include "quiche/web_transport/web_transport.h"
-
-namespace moqt {
-
-bool MoqtLiveRelayQueue::AddFin(Location sequence, uint64_t subgroup) {
- if (!forwarding_preference_.has_value() ||
- *forwarding_preference_ == MoqtForwardingPreference::kDatagram) {
- return false;
- }
- auto group_it = queue_.find(sequence.group);
- if (group_it == queue_.end()) {
- // Group does not exist.
- return false;
- }
- Group& group = group_it->second;
- auto subgroup_it = group.subgroups.find(subgroup);
- if (subgroup_it == group.subgroups.end()) {
- // Subgroup does not exist.
- return false;
- }
- if (subgroup_it->second.empty()) {
- // Cannot FIN an empty subgroup.
- return false;
- }
- if (subgroup_it->second.rbegin()->first != sequence.object) {
- // The queue does not yet have the last object.
- return false;
- }
- subgroup_it->second.rbegin()->second.fin_after_this = true;
- for (MoqtObjectListener* listener : listeners_) {
- listener->OnNewFinAvailable(sequence, subgroup);
- }
- return true;
-}
-
-bool MoqtLiveRelayQueue::OnStreamReset(
- uint64_t group_id, uint64_t subgroup_id,
- webtransport::StreamErrorCode error_code) {
- if (!forwarding_preference_.has_value() ||
- *forwarding_preference_ == MoqtForwardingPreference::kDatagram) {
- return false;
- }
- auto group_it = queue_.find(group_id);
- if (group_it == queue_.end()) {
- // Group does not exist.
- return false;
- }
- Group& group = group_it->second;
- auto subgroup_it = group.subgroups.find(subgroup_id);
- if (subgroup_it == group.subgroups.end()) {
- // Subgroup does not exist.
- return false;
- }
- for (MoqtObjectListener* listener : listeners_) {
- listener->OnSubgroupAbandoned(group_id, subgroup_id, error_code);
- }
- group.subgroups.erase(subgroup_id);
- return true;
-}
-
-bool MoqtLiveRelayQueue::AddObject(const PublishedObjectMetadata& metadata,
- absl::string_view payload, bool fin) {
- const Location& sequence = metadata.location;
- bool last_object_in_stream = fin;
- if (queue_.size() == kMaxQueuedGroups) {
- if (queue_.begin()->first > sequence.group) {
- QUICHE_DLOG(INFO) << "Skipping object from group " << sequence.group
- << " because it is too old.";
- return true;
- }
- if (queue_.find(sequence.group) == queue_.end()) {
- // Erase the oldest group.
- for (MoqtObjectListener* listener : listeners_) {
- listener->OnGroupAbandoned(queue_.begin()->first);
- }
- queue_.erase(queue_.begin());
- }
- }
- // Validate the input given previously received markers.
- if (end_of_track_.has_value() && sequence > *end_of_track_) {
- QUICHE_DLOG(INFO) << "Skipping object because it is after the end of the "
- << "track";
- return false;
- }
- if (metadata.status == MoqtObjectStatus::kEndOfTrack) {
- if (sequence < next_sequence_) {
- QUICHE_DLOG(INFO) << "EndOfTrack is too early.";
- return false;
- }
- // TODO(martinduke): Check that EndOfTrack has normal IDs.
- end_of_track_ = sequence;
- }
- auto group_it = queue_.try_emplace(sequence.group);
- Group& group = group_it.first->second;
- if (!group_it.second) { // Group already exists.
- if (group.complete && sequence.object >= group.next_object) {
- QUICHE_DLOG(INFO) << "Skipping object because it is after the end of the "
- << "group";
- return false;
- }
- if (metadata.status == MoqtObjectStatus::kEndOfGroup &&
- sequence.object < group.next_object) {
- QUICHE_DLOG(INFO) << "Skipping EndOfGroup because it is not the last "
- << "object in the group.";
- return false;
- }
- }
- auto subgroup_it = group.subgroups.try_emplace(metadata.subgroup);
- auto& subgroup = subgroup_it.first->second;
- if (!subgroup.empty()) { // Check if the new object is valid
- CachedObject& last_object = subgroup.rbegin()->second;
- if (last_object.metadata.publisher_priority !=
- metadata.publisher_priority) {
- QUICHE_DLOG(INFO) << "Publisher priority changing in a subgroup";
- return false;
- }
- if (last_object.fin_after_this) {
- QUICHE_DLOG(INFO) << "Skipping object because it is after the end of the "
- << "subgroup";
- return false;
- }
- // If last_object has stream-ending status, it should have been caught by
- // the fin_after_this check above.
- QUICHE_DCHECK(
- last_object.metadata.status != MoqtObjectStatus::kEndOfGroup &&
- last_object.metadata.status != MoqtObjectStatus::kEndOfTrack);
- if (last_object.metadata.location.object >= sequence.object) {
- QUICHE_DLOG(INFO) << "Skipping object because it does not increase the "
- << "object ID monotonically in the subgroup.";
- return false;
- }
- }
- // Object is valid. Update state.
- if (next_sequence_ <= sequence) {
- next_sequence_ = Location{sequence.group, sequence.object + 1};
- }
- if (sequence.object >= group.next_object) {
- group.next_object = sequence.object + 1;
- }
- // Anticipate stream FIN with most non-normal objects.
- switch (metadata.status) {
- case MoqtObjectStatus::kEndOfTrack:
- end_of_track_ = sequence;
- last_object_in_stream = true;
- ABSL_FALLTHROUGH_INTENDED;
- case MoqtObjectStatus::kEndOfGroup:
- group.complete = true;
- last_object_in_stream = true;
- break;
- default:
- break;
- }
- std::shared_ptr<quiche::QuicheMemSlice> slice =
- payload.empty()
- ? nullptr
- : std::make_shared<quiche::QuicheMemSlice>(quiche::QuicheBuffer::Copy(
- quiche::SimpleBufferAllocator::Get(), payload));
- subgroup.emplace(sequence.object,
- CachedObject{metadata, slice, last_object_in_stream});
- for (MoqtObjectListener* listener : listeners_) {
- listener->OnNewObjectAvailable(sequence, metadata.subgroup,
- metadata.publisher_priority);
- }
- return true;
-}
-
-std::optional<PublishedObject> MoqtLiveRelayQueue::GetCachedObject(
- uint64_t group_id, uint64_t subgroup_id, uint64_t min_object_id) const {
- auto group_it = queue_.find(group_id);
- if (group_it == queue_.end()) {
- // Group does not exist.
- return std::nullopt;
- }
- const Group& group = group_it->second;
- auto subgroup_it = group.subgroups.find(subgroup_id);
- if (subgroup_it == group.subgroups.end()) {
- // Subgroup does not exist.
- return std::nullopt;
- }
- const Subgroup& subgroup = subgroup_it->second;
- if (subgroup.empty()) {
- return std::nullopt; // There are no objects.
- }
- // Find an object with ID of at least min_object_id.
- auto object_it = subgroup.lower_bound(min_object_id);
- if (object_it == subgroup.end()) {
- // No object after the last one received.
- return std::nullopt;
- }
- return CachedObjectToPublishedObject(object_it->second);
-}
-
-void MoqtLiveRelayQueue::ForAllObjects(
- quiche::UnretainedCallback<void(const CachedObject&)> callback) {
- for (auto& group_it : queue_) {
- for (auto& subgroup_it : group_it.second.subgroups) {
- for (auto& object_it : subgroup_it.second) {
- callback(object_it.second);
- }
- }
- }
-}
-
-std::optional<Location> MoqtLiveRelayQueue::largest_location() const {
- if (queue_.empty()) {
- return std::nullopt;
- }
- return Location{next_sequence_.group, next_sequence_.object - 1};
-}
-
-} // namespace moqt
diff --git a/quiche/quic/moqt/moqt_live_relay_queue.h b/quiche/quic/moqt/moqt_live_relay_queue.h
deleted file mode 100644
index 1baf993..0000000
--- a/quiche/quic/moqt/moqt_live_relay_queue.h
+++ /dev/null
@@ -1,193 +0,0 @@
-// Copyright 2024 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef QUICHE_QUIC_MOQT_TOOLS_MOQT_RELAY_QUEUE_H_
-#define QUICHE_QUIC_MOQT_TOOLS_MOQT_RELAY_QUEUE_H_
-
-#include <cstddef>
-#include <cstdint>
-#include <memory>
-#include <optional>
-#include <utility>
-
-#include "absl/container/btree_map.h"
-#include "absl/container/flat_hash_set.h"
-#include "absl/status/status.h"
-#include "absl/strings/string_view.h"
-#include "quiche/quic/core/quic_clock.h"
-#include "quiche/quic/core/quic_default_clock.h"
-#include "quiche/quic/core/quic_time.h"
-#include "quiche/quic/moqt/moqt_fetch_task.h"
-#include "quiche/quic/moqt/moqt_messages.h"
-#include "quiche/quic/moqt/moqt_object.h"
-#include "quiche/quic/moqt/moqt_priority.h"
-#include "quiche/quic/moqt/moqt_publisher.h"
-#include "quiche/common/quiche_callbacks.h"
-#include "quiche/web_transport/web_transport.h"
-
-namespace moqt {
-
-// MoqtLiveRelayQueue lets the user send objects by providing the contents of
-// the object and the object metadata. It will store these by sequence number.
-// When called on to provide a range of objects, it will fill in any missing
-// objects and groups.
-//
-// The queue will maintain a buffer of three most recent groups that will be
-// provided to subscribers automatically.
-//
-// This class is primarily meant to be used by live relays to buffer the
-// frames that arrive for a short time.
-class MoqtLiveRelayQueue : public MoqtTrackPublisher {
- public:
- MoqtLiveRelayQueue(
- FullTrackName track,
- std::optional<MoqtForwardingPreference> forwarding_preference,
- std::optional<MoqtDeliveryOrder> delivery_order,
- std::optional<quic::QuicTime> expiration = quic::QuicTime::Infinite(),
- const quic::QuicClock* clock = quic::QuicDefaultClock::Get())
- : clock_(clock),
- track_(std::move(track)),
- forwarding_preference_(forwarding_preference),
- delivery_order_(delivery_order),
- expiration_(expiration),
- next_sequence_(0, 0) {}
-
- MoqtLiveRelayQueue(const MoqtLiveRelayQueue&) = delete;
- MoqtLiveRelayQueue(MoqtLiveRelayQueue&&) = default;
- MoqtLiveRelayQueue& operator=(const MoqtLiveRelayQueue&) = delete;
- MoqtLiveRelayQueue& operator=(MoqtLiveRelayQueue&&) = default;
-
- // Publish a received object. Returns false if the object is invalid, given
- // other non-normal objects indicate that the sequence number should not
- // occur. A false return value might result in a session error on the
- // inbound session, but this queue is the only place that retains enough state
- // to check.
- bool AddObject(const PublishedObjectMetadata& metadata,
- absl::string_view payload, bool fin);
-
- // Convenience methods primarily for use in tests. Prefer the
- // `PublishedObjectMetadata` version in real forwarding code to ensure all
- // metadata is copied correctly.
- bool AddObject(Location location, uint64_t subgroup, MoqtObjectStatus status,
- bool fin = false) {
- PublishedObjectMetadata metadata;
- metadata.location = location;
- metadata.subgroup = subgroup;
- metadata.status = status;
- metadata.publisher_priority = 0;
- return AddObject(metadata, "", fin);
- }
- bool AddObject(Location location, uint64_t subgroup, absl::string_view object,
- bool fin = false) {
- PublishedObjectMetadata metadata;
- metadata.location = location;
- metadata.subgroup = subgroup;
- metadata.status = MoqtObjectStatus::kNormal;
- metadata.publisher_priority = 0;
- return AddObject(metadata, object, fin);
- }
-
- // Record a received FIN from upstream that did not come with the last object.
- // If the forwarding preference is kDatagram or kTrack, |sequence| is ignored.
- // Otherwise, |sequence| is used to determine which stream is being FINed. If
- // the object ID does not match the last object ID in the stream, no action
- // is taken.
- bool AddFin(Location sequence, uint64_t subgroup_id);
- // Record a received RESET_STREAM from upstream. Returns false on datagram
- // tracks, or if the stream does not exist.
- bool OnStreamReset(uint64_t group_id, uint64_t subgroup_id,
- webtransport::StreamErrorCode error_code);
-
- // MoqtTrackPublisher implementation.
- const FullTrackName& GetTrackName() const override { return track_; }
- std::optional<PublishedObject> GetCachedObject(
- uint64_t group_id, uint64_t subgroup_id,
- uint64_t min_object) const override;
- void AddObjectListener(MoqtObjectListener* listener) override {
- listeners_.insert(listener);
- listener->OnSubscribeAccepted();
- }
- void RemoveObjectListener(MoqtObjectListener* listener) override {
- listeners_.erase(listener);
- }
- std::optional<Location> largest_location() const override;
- std::optional<MoqtForwardingPreference> forwarding_preference()
- const override {
- return forwarding_preference_;
- }
- std::optional<MoqtDeliveryOrder> delivery_order() const override {
- return delivery_order_;
- }
- std::optional<quic::QuicTimeDelta> expiration() const override {
- if (!expiration_.has_value()) {
- return std::nullopt;
- }
- if (expiration_ == quic::QuicTime::Infinite()) {
- return quic::QuicTimeDelta::Zero();
- }
- if (expiration_ < clock_->Now()) {
- // TODO(martinduke): Tear everything down; the track is expired.
- return quic::QuicTimeDelta::Zero();
- }
- return clock_->Now() - *expiration_;
- }
- std::unique_ptr<MoqtFetchTask> StandaloneFetch(
- Location /*start*/, Location /*end*/,
- std::optional<MoqtDeliveryOrder> /*order*/) override {
- return std::make_unique<MoqtFailedFetch>(
- absl::UnimplementedError("Fetch not implemented"));
- }
- std::unique_ptr<MoqtFetchTask> RelativeFetch(
- uint64_t /*group_diff*/,
- std::optional<MoqtDeliveryOrder> /*order*/) override {
- return std::make_unique<MoqtFailedFetch>(
- absl::UnimplementedError("Fetch not implemented"));
- }
- std::unique_ptr<MoqtFetchTask> AbsoluteFetch(
- uint64_t /*group*/, std::optional<MoqtDeliveryOrder> /*order*/) override {
- return std::make_unique<MoqtFailedFetch>(
- absl::UnimplementedError("Fetch not implemented"));
- }
-
- bool HasSubscribers() const { return !listeners_.empty(); }
-
- // Since MoqtTrackPublisher is generally held in a shared_ptr, an explicit
- // call allows all the listeners to delete their reference and actually
- // destroy the object.
- void RemoveAllSubscriptions() {
- for (MoqtObjectListener* listener : listeners_) {
- listener->OnTrackPublisherGone();
- }
- }
-
- void ForAllObjects(
- quiche::UnretainedCallback<void(const CachedObject&)> callback);
-
- private:
- // The number of recent groups to keep around for newly joined subscribers.
- static constexpr size_t kMaxQueuedGroups = 3;
-
- // Ordered by object id.
- using Subgroup = absl::btree_map<uint64_t, CachedObject>;
-
- struct Group {
- uint64_t next_object = 0;
- bool complete = false; // If true, kEndOfGroup has been received.
- absl::btree_map<uint64_t, Subgroup> subgroups; // Ordered by subgroup id.
- };
-
- const quic::QuicClock* clock_;
- FullTrackName track_;
- std::optional<MoqtForwardingPreference> forwarding_preference_;
- std::optional<MoqtDeliveryOrder> delivery_order_;
- std::optional<quic::QuicTime> expiration_;
- absl::btree_map<uint64_t, Group> queue_; // Ordered by group id.
- absl::flat_hash_set<MoqtObjectListener*> listeners_;
- std::optional<Location> end_of_track_;
- Location next_sequence_;
-};
-
-} // namespace moqt
-
-#endif // QUICHE_QUIC_MOQT_TOOLS_MOQT_RELAY_QUEUE_H_
diff --git a/quiche/quic/moqt/moqt_live_relay_queue_test.cc b/quiche/quic/moqt/moqt_live_relay_queue_test.cc
deleted file mode 100644
index 6980709..0000000
--- a/quiche/quic/moqt/moqt_live_relay_queue_test.cc
+++ /dev/null
@@ -1,476 +0,0 @@
-// Copyright 2024 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "quiche/quic/moqt/moqt_live_relay_queue.h"
-
-#include <cstdint>
-#include <optional>
-
-#include "absl/strings/string_view.h"
-#include "quiche/quic/core/quic_time.h"
-#include "quiche/quic/moqt/moqt_messages.h"
-#include "quiche/quic/moqt/moqt_object.h"
-#include "quiche/quic/moqt/moqt_priority.h"
-#include "quiche/quic/moqt/moqt_publisher.h"
-#include "quiche/quic/moqt/moqt_subscribe_windows.h"
-#include "quiche/common/platform/api/quiche_logging.h"
-#include "quiche/common/platform/api/quiche_test.h"
-#include "quiche/web_transport/web_transport.h"
-
-namespace moqt::test {
-
-namespace {
-
-class TestMoqtLiveRelayQueue : public MoqtLiveRelayQueue,
- public MoqtObjectListener {
- public:
- TestMoqtLiveRelayQueue()
- : MoqtLiveRelayQueue(
- FullTrackName{"test", "track"}, MoqtForwardingPreference::kSubgroup,
- MoqtDeliveryOrder::kAscending, quic::QuicTime::Infinite()) {
- AddObjectListener(this);
- }
-
- void OnNewObjectAvailable(Location sequence, uint64_t subgroup_id,
- MoqtPriority /*publisher_priority*/) {
- std::optional<PublishedObject> object =
- GetCachedObject(sequence.group, subgroup_id, sequence.object);
- QUICHE_CHECK(object.has_value());
- if (!object.has_value()) {
- return;
- }
- switch (object->metadata.status) {
- case MoqtObjectStatus::kNormal:
- PublishObject(object->metadata.location.group,
- object->metadata.location.object,
- object->payload.AsStringView());
- break;
- case MoqtObjectStatus::kObjectDoesNotExist:
- SkipObject(object->metadata.location.group,
- object->metadata.location.object);
- break;
- case MoqtObjectStatus::kEndOfGroup:
- CloseStreamForGroup(object->metadata.location.group);
- break;
- case MoqtObjectStatus::kEndOfTrack:
- CloseTrack();
- break;
- default:
- EXPECT_TRUE(false);
- }
- if (object->fin_after_this) {
- CloseStreamForSubgroup(object->metadata.location.group,
- object->metadata.subgroup);
- }
- }
-
- void GetObjectsFromPast(const SubscribeWindow& window) {
- ForAllObjects([&](const CachedObject& object) {
- if (window.InWindow(object.metadata.location)) {
- OnNewObjectAvailable(object.metadata.location, object.metadata.subgroup,
- object.metadata.publisher_priority);
- }
- });
- }
-
- MOCK_METHOD(void, OnNewFinAvailable, (Location sequence, uint64_t subgroup));
- MOCK_METHOD(void, OnSubgroupAbandoned,
- (uint64_t group, uint64_t subgroup,
- webtransport::StreamErrorCode error_code));
- MOCK_METHOD(void, OnGroupAbandoned, (uint64_t group_id));
- MOCK_METHOD(void, CloseStreamForGroup, (uint64_t group_id), ());
- MOCK_METHOD(void, CloseStreamForSubgroup,
- (uint64_t group_id, uint64_t subgroup_id), ());
- MOCK_METHOD(void, PublishObject,
- (uint64_t group_id, uint64_t object_id,
- absl::string_view payload),
- ());
- MOCK_METHOD(void, SkipObject, (uint64_t group_id, uint64_t object_id), ());
- MOCK_METHOD(void, CloseTrack, (), ());
- MOCK_METHOD(void, OnTrackPublisherGone, (), (override));
- MOCK_METHOD(void, OnSubscribeAccepted, (), (override));
- MOCK_METHOD(void, OnSubscribeRejected, (MoqtSubscribeErrorReason reason),
- (override));
-};
-
-// Duplicates of MoqtOutgoingQueue test cases.
-TEST(MoqtLiveRelayQueue, SingleGroup) {
- TestMoqtLiveRelayQueue queue;
- {
- testing::InSequence seq;
- EXPECT_CALL(queue, PublishObject(0, 0, "a"));
- EXPECT_CALL(queue, PublishObject(0, 1, "b"));
- EXPECT_CALL(queue, PublishObject(0, 2, "c"));
- EXPECT_CALL(queue, CloseStreamForGroup(0));
- }
- EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a"));
- EXPECT_TRUE(queue.AddObject(Location{0, 1}, 0, "b"));
- EXPECT_TRUE(queue.AddObject(Location{0, 2}, 0, "c"));
- EXPECT_TRUE(
- queue.AddObject(Location{0, 3}, 0, MoqtObjectStatus::kEndOfGroup));
-}
-
-TEST(MoqtLiveRelayQueue, SingleGroupPastSubscribeFromZero) {
- TestMoqtLiveRelayQueue queue;
- {
- testing::InSequence seq;
- EXPECT_CALL(queue, PublishObject(0, 0, "a"));
- EXPECT_CALL(queue, PublishObject(0, 1, "b"));
- EXPECT_CALL(queue, PublishObject(0, 2, "c"));
-
- EXPECT_CALL(queue, PublishObject(0, 0, "a"));
- EXPECT_CALL(queue, PublishObject(0, 1, "b"));
- EXPECT_CALL(queue, PublishObject(0, 2, "c"));
- }
- EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a"));
- EXPECT_TRUE(queue.AddObject(Location{0, 1}, 0, "b"));
- EXPECT_TRUE(queue.AddObject(Location{0, 2}, 0, "c"));
- queue.GetObjectsFromPast(SubscribeWindow());
-}
-
-TEST(MoqtLiveRelayQueue, SingleGroupPastSubscribeFromMidGroup) {
- TestMoqtLiveRelayQueue queue;
- {
- testing::InSequence seq;
- EXPECT_CALL(queue, PublishObject(0, 0, "a"));
- EXPECT_CALL(queue, PublishObject(0, 1, "b"));
- EXPECT_CALL(queue, PublishObject(0, 2, "c"));
-
- EXPECT_CALL(queue, PublishObject(0, 1, "b"));
- EXPECT_CALL(queue, PublishObject(0, 2, "c"));
- }
- EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a"));
- EXPECT_TRUE(queue.AddObject(Location{0, 1}, 0, "b"));
- EXPECT_TRUE(queue.AddObject(Location{0, 2}, 0, "c"));
- queue.GetObjectsFromPast(SubscribeWindow(Location(0, 1)));
-}
-
-TEST(MoqtLiveRelayQueue, TwoGroups) {
- TestMoqtLiveRelayQueue queue;
- {
- testing::InSequence seq;
- EXPECT_CALL(queue, PublishObject(0, 0, "a"));
- EXPECT_CALL(queue, PublishObject(0, 1, "b"));
- EXPECT_CALL(queue, PublishObject(0, 2, "c"));
- EXPECT_CALL(queue, CloseStreamForGroup(0));
- EXPECT_CALL(queue, PublishObject(1, 0, "d"));
- EXPECT_CALL(queue, PublishObject(1, 1, "e"));
- EXPECT_CALL(queue, PublishObject(1, 2, "f"));
- }
- EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a"));
- EXPECT_TRUE(queue.AddObject(Location{0, 1}, 0, "b"));
- EXPECT_TRUE(queue.AddObject(Location{0, 2}, 0, "c"));
- EXPECT_TRUE(
- queue.AddObject(Location{0, 3}, 0, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(Location{1, 0}, 0, "d"));
- EXPECT_TRUE(queue.AddObject(Location{1, 1}, 0, "e"));
- EXPECT_TRUE(queue.AddObject(Location{1, 2}, 0, "f"));
-}
-
-TEST(MoqtLiveRelayQueue, TwoGroupsPastSubscribe) {
- TestMoqtLiveRelayQueue queue;
- {
- testing::InSequence seq;
- EXPECT_CALL(queue, PublishObject(0, 0, "a"));
- EXPECT_CALL(queue, PublishObject(0, 1, "b"));
- EXPECT_CALL(queue, PublishObject(0, 2, "c"));
- EXPECT_CALL(queue, CloseStreamForGroup(0));
- EXPECT_CALL(queue, PublishObject(1, 0, "d"));
- EXPECT_CALL(queue, PublishObject(1, 1, "e"));
- EXPECT_CALL(queue, PublishObject(1, 2, "f"));
-
- EXPECT_CALL(queue, PublishObject(0, 1, "b"));
- EXPECT_CALL(queue, PublishObject(0, 2, "c"));
- EXPECT_CALL(queue, CloseStreamForGroup(0));
- EXPECT_CALL(queue, PublishObject(1, 0, "d"));
- EXPECT_CALL(queue, PublishObject(1, 1, "e"));
- EXPECT_CALL(queue, PublishObject(1, 2, "f"));
- }
- EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a"));
- EXPECT_TRUE(queue.AddObject(Location{0, 1}, 0, "b"));
- EXPECT_TRUE(queue.AddObject(Location{0, 2}, 0, "c"));
- EXPECT_TRUE(
- queue.AddObject(Location{0, 3}, 0, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(Location{1, 0}, 0, "d"));
- EXPECT_TRUE(queue.AddObject(Location{1, 1}, 0, "e"));
- EXPECT_TRUE(queue.AddObject(Location{1, 2}, 0, "f"));
- queue.GetObjectsFromPast(SubscribeWindow(Location(0, 1)));
-}
-
-TEST(MoqtLiveRelayQueue, FiveGroups) {
- TestMoqtLiveRelayQueue queue;
- ;
- {
- testing::InSequence seq;
- EXPECT_CALL(queue, PublishObject(0, 0, "a"));
- EXPECT_CALL(queue, PublishObject(0, 1, "b"));
- EXPECT_CALL(queue, CloseStreamForGroup(0));
- EXPECT_CALL(queue, PublishObject(1, 0, "c"));
- EXPECT_CALL(queue, PublishObject(1, 1, "d"));
- EXPECT_CALL(queue, CloseStreamForGroup(1));
- EXPECT_CALL(queue, PublishObject(2, 0, "e"));
- EXPECT_CALL(queue, PublishObject(2, 1, "f"));
- EXPECT_CALL(queue, CloseStreamForGroup(2));
- EXPECT_CALL(queue, OnGroupAbandoned(0));
- EXPECT_CALL(queue, PublishObject(3, 0, "g"));
- EXPECT_CALL(queue, PublishObject(3, 1, "h"));
- EXPECT_CALL(queue, CloseStreamForGroup(3));
- EXPECT_CALL(queue, OnGroupAbandoned(1));
- EXPECT_CALL(queue, PublishObject(4, 0, "i"));
- EXPECT_CALL(queue, PublishObject(4, 1, "j"));
- }
- EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a"));
- EXPECT_TRUE(queue.AddObject(Location{0, 1}, 0, "b"));
- EXPECT_TRUE(
- queue.AddObject(Location{0, 2}, 0, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(Location{1, 0}, 0, "c"));
- EXPECT_TRUE(queue.AddObject(Location{1, 1}, 0, "d"));
- EXPECT_TRUE(
- queue.AddObject(Location{1, 2}, 0, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(Location{2, 0}, 0, "e"));
- EXPECT_TRUE(queue.AddObject(Location{2, 1}, 0, "f"));
- EXPECT_TRUE(
- queue.AddObject(Location{2, 2}, 0, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(Location{3, 0}, 0, "g"));
- EXPECT_TRUE(queue.AddObject(Location{3, 1}, 0, "h"));
- EXPECT_TRUE(
- queue.AddObject(Location{3, 2}, 0, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(Location{4, 0}, 0, "i"));
- EXPECT_TRUE(queue.AddObject(Location{4, 1}, 0, "j"));
-}
-
-TEST(MoqtLiveRelayQueue, FiveGroupsPastSubscribe) {
- TestMoqtLiveRelayQueue queue;
- {
- testing::InSequence seq;
- EXPECT_CALL(queue, PublishObject(0, 0, "a"));
- EXPECT_CALL(queue, PublishObject(0, 1, "b"));
- EXPECT_CALL(queue, CloseStreamForGroup(0));
- EXPECT_CALL(queue, PublishObject(1, 0, "c"));
- EXPECT_CALL(queue, PublishObject(1, 1, "d"));
- EXPECT_CALL(queue, CloseStreamForGroup(1));
- EXPECT_CALL(queue, PublishObject(2, 0, "e"));
- EXPECT_CALL(queue, PublishObject(2, 1, "f"));
- EXPECT_CALL(queue, CloseStreamForGroup(2));
- EXPECT_CALL(queue, OnGroupAbandoned(0));
- EXPECT_CALL(queue, PublishObject(3, 0, "g"));
- EXPECT_CALL(queue, PublishObject(3, 1, "h"));
- EXPECT_CALL(queue, CloseStreamForGroup(3));
- EXPECT_CALL(queue, OnGroupAbandoned(1));
- EXPECT_CALL(queue, PublishObject(4, 0, "i"));
- EXPECT_CALL(queue, PublishObject(4, 1, "j"));
-
- // Past SUBSCRIBE would only get the three most recent groups.
- EXPECT_CALL(queue, PublishObject(2, 0, "e"));
- EXPECT_CALL(queue, PublishObject(2, 1, "f"));
- EXPECT_CALL(queue, CloseStreamForGroup(2));
- EXPECT_CALL(queue, PublishObject(3, 0, "g"));
- EXPECT_CALL(queue, PublishObject(3, 1, "h"));
- EXPECT_CALL(queue, CloseStreamForGroup(3));
- EXPECT_CALL(queue, PublishObject(4, 0, "i"));
- EXPECT_CALL(queue, PublishObject(4, 1, "j"));
- }
- EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a"));
- EXPECT_TRUE(queue.AddObject(Location{0, 1}, 0, "b"));
- EXPECT_TRUE(
- queue.AddObject(Location{0, 2}, 0, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(Location{1, 0}, 0, "c"));
- EXPECT_TRUE(queue.AddObject(Location{1, 1}, 0, "d"));
- EXPECT_TRUE(
- queue.AddObject(Location{1, 2}, 0, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(Location{2, 0}, 0, "e"));
- EXPECT_TRUE(queue.AddObject(Location{2, 1}, 0, "f"));
- EXPECT_TRUE(
- queue.AddObject(Location{2, 2}, 0, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(Location{3, 0}, 0, "g"));
- EXPECT_TRUE(queue.AddObject(Location{3, 1}, 0, "h"));
- EXPECT_TRUE(
- queue.AddObject(Location{3, 2}, 0, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(Location{4, 0}, 0, "i"));
- EXPECT_TRUE(queue.AddObject(Location{4, 1}, 0, "j"));
- queue.GetObjectsFromPast(SubscribeWindow());
-}
-
-TEST(MoqtLiveRelayQueue, FiveGroupsPastSubscribeFromMidGroup) {
- TestMoqtLiveRelayQueue queue;
- {
- testing::InSequence seq;
- EXPECT_CALL(queue, PublishObject(0, 0, "a"));
- EXPECT_CALL(queue, PublishObject(0, 1, "b"));
- EXPECT_CALL(queue, PublishObject(1, 0, "c"));
- EXPECT_CALL(queue, PublishObject(1, 1, "d"));
- EXPECT_CALL(queue, CloseStreamForGroup(1));
- EXPECT_CALL(queue, PublishObject(2, 0, "e"));
- EXPECT_CALL(queue, PublishObject(2, 1, "f"));
- EXPECT_CALL(queue, CloseStreamForGroup(2));
- EXPECT_CALL(queue, OnGroupAbandoned(0));
- EXPECT_CALL(queue, PublishObject(3, 0, "g"));
- EXPECT_CALL(queue, PublishObject(3, 1, "h"));
- EXPECT_CALL(queue, CloseStreamForGroup(3));
- EXPECT_CALL(queue, OnGroupAbandoned(1));
- EXPECT_CALL(queue, PublishObject(4, 0, "i"));
- EXPECT_CALL(queue, PublishObject(4, 1, "j"));
- }
- EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a"));
- EXPECT_TRUE(queue.AddObject(Location{0, 1}, 0, "b"));
- EXPECT_TRUE(queue.AddObject(Location{1, 0}, 0, "c"));
- EXPECT_TRUE(queue.AddObject(Location{1, 1}, 0, "d"));
- EXPECT_TRUE(
- queue.AddObject(Location{1, 2}, 0, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(Location{2, 0}, 0, "e"));
- EXPECT_TRUE(queue.AddObject(Location{2, 1}, 0, "f"));
- EXPECT_TRUE(
- queue.AddObject(Location{2, 2}, 0, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(Location{3, 0}, 0, "g"));
- EXPECT_TRUE(queue.AddObject(Location{3, 1}, 0, "h"));
- EXPECT_TRUE(
- queue.AddObject(Location{3, 2}, 0, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(Location{4, 0}, 0, "i"));
- EXPECT_TRUE(queue.AddObject(Location{4, 1}, 0, "j"));
- // This object will be ignored, but this is not an error.
- EXPECT_TRUE(
- queue.AddObject(Location{0, 2}, 0, MoqtObjectStatus::kEndOfGroup));
-}
-
-TEST(MoqtLiveRelayQueue, EndOfTrack) {
- TestMoqtLiveRelayQueue queue;
- {
- testing::InSequence seq;
- EXPECT_CALL(queue, PublishObject(0, 0, "a"));
- EXPECT_CALL(queue, PublishObject(0, 2, "c"));
- EXPECT_CALL(queue, CloseTrack());
- }
- EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a"));
- EXPECT_TRUE(queue.AddObject(Location{0, 2}, 0, "c"));
- EXPECT_FALSE(
- queue.AddObject(Location{0, 1}, 0, MoqtObjectStatus::kEndOfTrack));
- EXPECT_TRUE(
- queue.AddObject(Location{1, 0}, 0, MoqtObjectStatus::kEndOfTrack));
-}
-
-TEST(MoqtLiveRelayQueue, EndOfGroup) {
- TestMoqtLiveRelayQueue queue;
- {
- testing::InSequence seq;
- EXPECT_CALL(queue, PublishObject(0, 0, "a"));
- EXPECT_CALL(queue, PublishObject(0, 2, "c"));
- EXPECT_CALL(queue, CloseStreamForGroup(0));
- }
- EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a"));
- EXPECT_TRUE(queue.AddObject(Location{0, 2}, 0, "c"));
- EXPECT_FALSE(
- queue.AddObject(Location{0, 1}, 0, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(
- queue.AddObject(Location{0, 3}, 0, MoqtObjectStatus::kEndOfGroup));
- EXPECT_FALSE(queue.AddObject(Location{0, 4}, 0, "e"));
-}
-
-TEST(MoqtLiveRelayQueue, OverwriteObject) {
- TestMoqtLiveRelayQueue queue;
- {
- testing::InSequence seq;
- EXPECT_CALL(queue, PublishObject(0, 0, "a"));
- EXPECT_CALL(queue, PublishObject(0, 1, "b"));
- EXPECT_CALL(queue, PublishObject(0, 2, "c"));
- }
- EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a"));
- EXPECT_TRUE(queue.AddObject(Location{0, 1}, 0, "b"));
- EXPECT_TRUE(queue.AddObject(Location{0, 2}, 0, "c"));
- EXPECT_TRUE(
- queue.AddObject(Location{0, 3}, 0, MoqtObjectStatus::kEndOfGroup));
- EXPECT_FALSE(queue.AddObject(Location{0, 1}, 0, "invalid"));
-}
-
-TEST(MoqtLiveRelayQueue, DifferentSubgroups) {
- TestMoqtLiveRelayQueue queue;
- {
- testing::InSequence seq;
- EXPECT_CALL(queue, PublishObject(0, 0, "a"));
- EXPECT_CALL(queue, PublishObject(0, 1, "b"));
- EXPECT_CALL(queue, PublishObject(0, 3, "d"));
- EXPECT_CALL(queue, PublishObject(0, 2, "c"));
- EXPECT_CALL(queue, OnNewFinAvailable(Location{0, 3}, 0));
- EXPECT_CALL(queue, PublishObject(0, 5, "e"));
- EXPECT_CALL(queue, PublishObject(0, 7, "f"));
- EXPECT_CALL(queue, OnNewFinAvailable(Location{0, 5}, 1));
- EXPECT_CALL(queue, OnNewFinAvailable(Location{0, 7}, 2));
-
- // Serve them back in strict subgroup order.
- EXPECT_CALL(queue, PublishObject(0, 0, "a"));
- EXPECT_CALL(queue, PublishObject(0, 3, "d"));
- EXPECT_CALL(queue, CloseStreamForSubgroup(0, 0));
- EXPECT_CALL(queue, PublishObject(0, 1, "b"));
- EXPECT_CALL(queue, PublishObject(0, 5, "e"));
- EXPECT_CALL(queue, CloseStreamForSubgroup(0, 1));
- EXPECT_CALL(queue, PublishObject(0, 2, "c"));
- EXPECT_CALL(queue, PublishObject(0, 7, "f"));
- EXPECT_CALL(queue, CloseStreamForSubgroup(0, 2));
- }
- EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a"));
- EXPECT_TRUE(queue.AddObject(Location{0, 1}, 1, "b"));
- EXPECT_TRUE(queue.AddObject(Location{0, 3}, 0, "d"));
- EXPECT_TRUE(queue.AddObject(Location{0, 2}, 2, "c"));
- EXPECT_TRUE(queue.AddFin(Location{0, 3}, 0));
- EXPECT_TRUE(queue.AddObject(Location{0, 5}, 1, "e"));
- EXPECT_TRUE(queue.AddObject(Location{0, 7}, 2, "f"));
- EXPECT_TRUE(queue.AddFin(Location{0, 5}, 1));
- EXPECT_TRUE(queue.AddFin(Location{0, 7}, 2));
- queue.GetObjectsFromPast(SubscribeWindow());
-}
-
-TEST(MoqtLiveRelayQueue, EndOfSubgroup) {
- TestMoqtLiveRelayQueue queue;
- {
- testing::InSequence seq;
- EXPECT_CALL(queue, PublishObject(0, 0, "a"));
- EXPECT_CALL(queue, OnNewFinAvailable(Location{0, 0}, 0));
- EXPECT_CALL(queue, PublishObject(0, 2, "b")).Times(0);
- }
- EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a"));
- EXPECT_TRUE(queue.AddFin(Location{0, 0}, 0));
- EXPECT_FALSE(queue.AddObject(Location{0, 2}, 0, "b"));
-}
-
-TEST(MoqtLiveRelayQueue, AddObjectWithFin) {
- TestMoqtLiveRelayQueue queue;
- {
- testing::InSequence seq;
- EXPECT_CALL(queue, PublishObject(0, 0, "a"));
- }
- EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a", true));
- std::optional<PublishedObject> object = queue.GetCachedObject(0, 0, 0);
- ASSERT_TRUE(object.has_value());
- EXPECT_EQ(object->metadata.status, MoqtObjectStatus::kNormal);
- EXPECT_TRUE(object->fin_after_this);
-}
-
-TEST(MoqtLiveRelayQueue, LateFin) {
- TestMoqtLiveRelayQueue queue;
- {
- testing::InSequence seq;
- EXPECT_CALL(queue, PublishObject(0, 0, "a"));
- }
- EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a", false));
- EXPECT_CALL(queue, OnNewFinAvailable(Location{0, 0}, 0));
- EXPECT_TRUE(queue.AddFin(Location{0, 0}, 0));
- std::optional<PublishedObject> object = queue.GetCachedObject(0, 0, 0);
- ASSERT_TRUE(object.has_value());
- EXPECT_EQ(object->metadata.status, MoqtObjectStatus::kNormal);
- EXPECT_TRUE(object->fin_after_this);
-}
-
-TEST(MoqtLiveRelayQueue, StreamReset) {
- TestMoqtLiveRelayQueue queue;
- {
- testing::InSequence seq;
- EXPECT_CALL(queue, PublishObject(0, 0, "a"));
- EXPECT_CALL(queue, OnSubgroupAbandoned(0, 0, 0x1));
- }
- EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a"));
- EXPECT_TRUE(queue.OnStreamReset(0, 0, 0x1));
-}
-
-} // namespace
-
-} // namespace moqt::test
diff --git a/quiche/quic/moqt/test_tools/moqt_mock_visitor.h b/quiche/quic/moqt/test_tools/moqt_mock_visitor.h
index afddd23..97bb809 100644
--- a/quiche/quic/moqt/test_tools/moqt_mock_visitor.h
+++ b/quiche/quic/moqt/test_tools/moqt_mock_visitor.h
@@ -11,9 +11,12 @@
#include <utility>
#include <variant>
+#include "absl/container/flat_hash_map.h"
+#include "absl/container/flat_hash_set.h"
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "quiche/quic/core/quic_time.h"
+#include "quiche/quic/moqt/moqt_fetch_task.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_object.h"
#include "quiche/quic/moqt/moqt_priority.h"
@@ -22,6 +25,7 @@
#include "quiche/quic/moqt/moqt_session_callbacks.h"
#include "quiche/quic/moqt/moqt_session_interface.h"
#include "quiche/common/platform/api/quiche_test.h"
+#include "quiche/common/quiche_mem_slice.h"
#include "quiche/web_transport/web_transport.h"
namespace moqt::test {
@@ -92,6 +96,92 @@
FullTrackName track_name_;
};
+// A very simple MoqtTrackPublisher that allows tests to add arbitrary objects.
+class TestTrackPublisher : public MoqtTrackPublisher {
+ public:
+ explicit TestTrackPublisher(FullTrackName name)
+ : track_name_(std::move(name)) {}
+ const FullTrackName& GetTrackName() const override { return track_name_; }
+ std::optional<PublishedObject> GetCachedObject(
+ uint64_t group, uint64_t subgroup, uint64_t object) const override {
+ Location location(group, object);
+ auto it = objects_.find(location);
+ if (it == objects_.end()) {
+ return std::nullopt;
+ }
+ return CachedObjectToPublishedObject(it->second);
+ }
+ void AddObjectListener(MoqtObjectListener* listener) override {
+ listeners_.insert(listener);
+ listener->OnSubscribeAccepted();
+ }
+ void RemoveObjectListener(MoqtObjectListener* listener) override {
+ listeners_.erase(listener);
+ }
+ std::optional<Location> largest_location() const override {
+ return largest_location_;
+ }
+ std::optional<MoqtForwardingPreference> forwarding_preference()
+ const override {
+ return MoqtForwardingPreference::kSubgroup;
+ }
+ std::optional<MoqtDeliveryOrder> delivery_order() const override {
+ return MoqtDeliveryOrder::kAscending;
+ }
+ std::optional<quic::QuicTimeDelta> expiration() const override {
+ return quic::QuicTimeDelta::Infinite();
+ }
+ // TODO(martinduke): Support Fetch
+ std::unique_ptr<MoqtFetchTask> StandaloneFetch(
+ Location start, Location end,
+ std::optional<MoqtDeliveryOrder> delivery_order) override {
+ return std::make_unique<MoqtFailedFetch>(
+ absl::UnimplementedError("Fetch not implemented"));
+ }
+ std::unique_ptr<MoqtFetchTask> RelativeFetch(
+ uint64_t offset,
+ std::optional<MoqtDeliveryOrder> delivery_order) override {
+ return std::make_unique<MoqtFailedFetch>(
+ absl::UnimplementedError("Fetch not implemented"));
+ }
+ std::unique_ptr<MoqtFetchTask> AbsoluteFetch(
+ uint64_t offset,
+ std::optional<MoqtDeliveryOrder> delivery_order) override {
+ return std::make_unique<MoqtFailedFetch>(
+ absl::UnimplementedError("Fetch not implemented"));
+ }
+ void AddObject(Location location, uint64_t subgroup,
+ absl::string_view payload, bool fin) {
+ CachedObject object;
+ object.metadata.location = location;
+ object.metadata.subgroup = subgroup;
+ object.metadata.extensions = "";
+ object.metadata.status = MoqtObjectStatus::kNormal;
+ object.metadata.publisher_priority = 128;
+ object.payload = std::make_shared<quiche::QuicheMemSlice>(
+ quiche::QuicheMemSlice::Copy(payload));
+ object.fin_after_this = fin;
+ objects_[location] = std::move(object);
+ if (!largest_location_.has_value() || *largest_location_ < location) {
+ largest_location_ = location;
+ }
+ for (MoqtObjectListener* listener : listeners_) {
+ listener->OnNewObjectAvailable(location, subgroup, 128);
+ }
+ }
+ void RemoveAllSubscriptions() {
+ for (MoqtObjectListener* listener : listeners_) {
+ listener->OnTrackPublisherGone();
+ }
+ }
+
+ private:
+ FullTrackName track_name_;
+ absl::flat_hash_set<MoqtObjectListener*> listeners_;
+ absl::flat_hash_map<Location, CachedObject> objects_;
+ std::optional<Location> largest_location_;
+};
+
// TODO(martinduke): Rename to MockSubscribeVisitor.
class MockSubscribeRemoteTrackVisitor : public SubscribeVisitor {
public: