blob: 89045f377d8a32244dcb80d6e50ea99f8af7aeec [file]
// Copyright (c) 2026 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "quiche/quic/moqt/moqt_subscription.h"
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include "absl/base/casts.h"
#include "absl/base/nullability.h"
#include "absl/container/flat_hash_set.h"
#include "absl/status/status.h"
#include "absl/strings/match.h"
#include "absl/strings/string_view.h"
#include "absl/types/span.h"
#include "quiche/quic/core/quic_alarm_factory.h"
#include "quiche/quic/core/quic_time.h"
#include "quiche/quic/moqt/moqt_bidi_stream.h"
#include "quiche/quic/moqt/moqt_error.h"
#include "quiche/quic/moqt/moqt_framer.h"
#include "quiche/quic/moqt/moqt_key_value_pair.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_names.h"
#include "quiche/quic/moqt/moqt_object.h"
#include "quiche/quic/moqt/moqt_parser.h"
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/moqt_session_callbacks.h"
#include "quiche/quic/moqt/moqt_session_interface.h"
#include "quiche/quic/moqt/moqt_trace_recorder.h"
#include "quiche/quic/moqt/moqt_types.h"
#include "quiche/quic/moqt/moqt_uni_stream.h"
#include "quiche/quic/moqt/test_tools/moqt_framer_utils.h"
#include "quiche/quic/moqt/test_tools/moqt_mock_visitor.h"
#include "quiche/quic/moqt/test_tools/moqt_session_peer.h"
#include "quiche/quic/platform/api/quic_test.h"
#include "quiche/quic/test_tools/mock_clock.h"
#include "quiche/quic/test_tools/quic_test_utils.h"
#include "quiche/common/quiche_mem_slice.h"
#include "quiche/web_transport/test_tools/mock_web_transport.h"
#include "quiche/web_transport/web_transport.h"
namespace moqt::test {
// Test-only window into SubscriptionPublisher. Each accessor reads one
// non-public member of the publisher (stream_map_, largest_sent_,
// reset_subgroups_) on behalf of the tests below.
// NOTE(review): presumably SubscriptionPublisher befriends this class; the
// friend declaration is not visible in this file.
class SubscriptionPublisherPeer {
 public:
  // Returns how many streams the publisher's stream map currently reports.
  static size_t num_open_streams(SubscriptionPublisher* publisher) {
    const auto& all_streams = publisher->stream_map_.GetAllStreams();
    return all_streams.size();
  }

  // Returns a copy of the publisher's largest_sent_ value.
  static std::optional<Location> largest_sent(
      const SubscriptionPublisher* publisher) {
    const auto& largest = publisher->largest_sent_;
    return largest;
  }

  // Returns a reference to the publisher's own reset_subgroups_ set; the
  // reference is only valid while |publisher| is alive.
  static const absl::flat_hash_set<DataStreamIndex>& reset_subgroups(
      const SubscriptionPublisher* publisher) {
    const auto& subgroups = publisher->reset_subgroups_;
    return subgroups;
  }
};
namespace {
using ::testing::_;
using ::testing::AtLeast;
using ::testing::Return;
using ::testing::ReturnRef;
using ::testing::StrictMock;
using ::webtransport::DatagramStatus;
using ::webtransport::DatagramStatusCode;
// gMock implementation of SessionToPublisherInterface. The fixture hands an
// instance of this class to the SubscriptionPublisher under test so that the
// calls it makes back into the session can be stubbed and verified.
class MockSessionToPublisherInterface : public SessionToPublisherInterface {
 public:
  ~MockSessionToPublisherInterface() override = default;

  // Accessors the tests stub with fixed return values.
  MOCK_METHOD(webtransport::Session*, session, (), (override));
  MOCK_METHOD(quic::QuicAlarmFactory*, alarm_factory, (), (override));
  MOCK_METHOD(bool, alternate_delivery_timeout, (), (const, override));

  // Notifications the tests set expectations on.
  MOCK_METHOD(void, PublishIsDone, (uint64_t), (override));
  MOCK_METHOD(void, UpdateTrackPriority,
              (uint64_t, std::optional<MoqtTrackPriority>, MoqtTrackPriority),
              (override));
};
// Minimal concrete MoqtBidiStreamBase for these tests: it forwards its
// constructor arguments to the base class, does nothing when bound to a
// stream, and accepts every raw control message.
class TestMoqtBidiStream : public MoqtBidiStreamBase {
 public:
  TestMoqtBidiStream(MoqtFramer* absl_nonnull framer,
                     const MoqtControlMessageParser& message_parser,
                     BidiStreamDeletedCallback stream_deleted_callback,
                     SessionErrorCallback session_error_callback)
      : MoqtBidiStreamBase(framer, message_parser,
                           std::move(stream_deleted_callback),
                           std::move(session_error_callback)) {}
  ~TestMoqtBidiStream() override = default;

  // Nothing to set up once the stream is bound.
  void OnStreamBound() override {}

  // Incoming control messages are ignored; always reports success. The
  // parameter is left unnamed because it is intentionally unused.
  absl::Status OnRawControlMessage(
      const MoqtRawControlMessage& /*message*/) override {
    return absl::OkStatus();
  }
};
// Builds a normal-status object at |location| in |subgroup| with the given
// |publisher_priority|, the extension string "extensions" and the 8-byte
// payload "deadbeef". The result is wrapped in std::optional so it can be
// handed directly to Return() for the GetCachedObject mock.
std::optional<PublishedObject> DefaultPublishedObject(
    Location location, std::optional<uint64_t> subgroup,
    MoqtPriority publisher_priority) {
  constexpr absl::string_view kPayload = "deadbeef";

  PublishedObject result;
  auto& metadata = result.metadata;
  metadata.location = location;
  metadata.subgroup = subgroup;
  metadata.status = MoqtObjectStatus::kNormal;
  metadata.publisher_priority = publisher_priority;
  metadata.extensions = "extensions";
  // Keep the advertised length in lockstep with the payload actually attached.
  metadata.payload_length = kPayload.size();
  result.payload.push_back(quiche::QuicheMemSlice::Copy(kPayload));
  return result;
}
// Fixture that constructs a SubscriptionPublisher wired to mocks: a mock
// WebTransport session and streams, a mock track publisher, a mock
// session-to-publisher interface and a strict mock monitoring interface.
// Helpers create an active outgoing stream (CreateStream) or queue a stream
// while stream creation is blocked (CreatePendingStream).
class SubscriptionPublisherTest : public quic::test::QuicTest {
 public:
  SubscriptionPublisherTest()
      : track_publisher_(
            std::make_shared<MockTrackPublisher>(FullTrackName("foo", "bar"))),
        bidi_stream_(
            &framer_, message_parser_, [] {},
            [](MoqtError, absl::string_view) {}),
        trace_recorder_(nullptr) {
    bidi_stream_.BindStream(&mock_bidi_stream_);
    // Default subscription parameters used by most tests.
    parameters_.set_forward(true);
    parameters_.delivery_timeout = quic::QuicTimeDelta::FromSeconds(1);
    parameters_.group_order = MoqtDeliveryOrder::kAscending;
    // monitoring_interface_ is a StrictMock; explicitly allow any number of
    // OnObjectAckSupportKnown calls.
    EXPECT_CALL(monitoring_interface_, OnObjectAckSupportKnown)
        .Times(AtLeast(0));
    EXPECT_CALL(visitor_, session).WillRepeatedly(Return(&webtrans_));
    publisher_ = std::make_unique<SubscriptionPublisher>(
        framer_, track_publisher_, &bidi_stream_, kRequestId, kTrackAlias,
        parameters_, &visitor_, &monitoring_interface_, &mock_clock_,
        trace_recorder_);
    ON_CALL(visitor_, alternate_delivery_timeout).WillByDefault(Return(false));
    ON_CALL(webtrans_, GetStreamById(kStreamId))
        .WillByDefault(Return(&mock_uni_stream_));
    ON_CALL(visitor_, alarm_factory).WillByDefault(Return(&alarm_factory_));
  }

  ~SubscriptionPublisherTest() override {
    // Teardown expectations: the publisher unregisters from the track
    // publisher and each stream still in its stream map is reset.
    EXPECT_CALL(*track_publisher_, RemoveObjectListener(publisher_.get()));
    size_t num_open_streams =
        SubscriptionPublisherPeer::num_open_streams(publisher_.get());
    EXPECT_CALL(mock_uni_stream_, ResetWithUserCode).Times(num_open_streams);
    // publisher_ is declared before extensions_, mock_clock_, callbacks_ and
    // alarm_factory_, so implicit member destruction would destroy those
    // first even though the publisher was handed &mock_clock_ at construction.
    // Destroy it here, with the expectations armed, while every collaborator
    // is still alive.
    publisher_.reset();
  }

  // Subscriber priority currently stored in parameters_, or the default when
  // none is set.
  MoqtPriority subscriber_priority() const {
    return parameters_.subscriber_priority.value_or(kDefaultSubscriberPriority);
  }

  // Create a stream with the given parameters and send the first object. Will
  // check that the first bytes written to the stream are equal to
  // |opening_bytes|.
  void CreateStream(Location location, uint64_t subgroup,
                    MoqtPriority publisher_priority,
                    std::string opening_bytes = "") {
    EXPECT_CALL(
        *track_publisher_,
        GetCachedObject(location.group, std::make_optional<uint64_t>(subgroup),
                        location.object, 0))
        .WillOnce(Return(  // Once for monitoring interface.
            DefaultPublishedObject(location, subgroup, publisher_priority)))
        .WillOnce(Return(  // To actually deliver the object.
            DefaultPublishedObject(location, subgroup, publisher_priority)));
    // The retrieval of the next object in the subgroup returns nullopt.
    EXPECT_CALL(
        *track_publisher_,
        GetCachedObject(location.group, std::make_optional<uint64_t>(subgroup),
                        location.object + 1, 0))
        .WillOnce(Return(std::nullopt));
    EXPECT_CALL(monitoring_interface_, OnNewObjectEnqueued(location));
    EXPECT_CALL(mock_uni_stream_, GetStreamId())
        .WillRepeatedly(Return(kStreamId));
    EXPECT_CALL(webtrans_, CanOpenNextOutgoingUnidirectionalStream())
        .WillOnce(Return(true));
    EXPECT_CALL(webtrans_, OpenOutgoingUnidirectionalStream)
        .WillOnce(Return(&mock_uni_stream_));
    // Capture the visitor installed on the mock stream so that visitor() can
    // hand it back and tests can inspect it.
    EXPECT_CALL(mock_uni_stream_, SetVisitor)
        .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
          uni_stream_ = std::move(visitor);
        });
    EXPECT_CALL(mock_uni_stream_, SetPriority);
    ON_CALL(mock_uni_stream_, visitor()).WillByDefault([&]() {
      return uni_stream_.get();
    });
    EXPECT_CALL(mock_uni_stream_, CanWrite()).WillRepeatedly(Return(true));
    EXPECT_CALL(mock_uni_stream_, Writev)
        .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
                      const webtransport::StreamWriteOptions& options) {
          EXPECT_TRUE(absl::StartsWith(data[0].AsStringView(), opening_bytes));
          EXPECT_FALSE(options.send_fin());
          return absl::OkStatus();
        });
    publisher_->OnNewObjectAvailable(location, subgroup, publisher_priority);
  }

  // Announce a new object while the session refuses to open another
  // unidirectional stream. Expects the object to be reported to the
  // monitoring interface and the track priority to be updated for request 1.
  void CreatePendingStream(Location location, uint64_t subgroup,
                           MoqtPriority publisher_priority) {
    EXPECT_CALL(
        *track_publisher_,
        GetCachedObject(location.group, std::make_optional<uint64_t>(subgroup),
                        location.object, 0))
        .WillOnce(Return(  // Once for monitoring interface.
            DefaultPublishedObject(location, subgroup, publisher_priority)));
    EXPECT_CALL(monitoring_interface_, OnNewObjectEnqueued(location));
    EXPECT_CALL(webtrans_, CanOpenNextOutgoingUnidirectionalStream())
        .WillOnce(Return(false));
    EXPECT_CALL(visitor_,
                UpdateTrackPriority(1, _,
                                    MoqtTrackPriority{subscriber_priority(),
                                                      publisher_priority}));
    publisher_->OnNewObjectAvailable(location, subgroup, publisher_priority);
  }

  static constexpr webtransport::StreamId kStreamId = 100;
  static constexpr uint64_t kTrackAlias = 10;
  static constexpr uint64_t kRequestId = 1;

  MoqtFramer framer_{true};
  MoqtControlMessageParser message_parser_{kDefaultMoqtVersion, true};
  webtransport::test::MockSession webtrans_;
  StrictMock<webtransport::test::MockStream> mock_bidi_stream_;
  webtransport::test::MockStream mock_uni_stream_;
  std::shared_ptr<MockTrackPublisher> track_publisher_;
  TestMoqtBidiStream bidi_stream_;
  // Visitor most recently installed on mock_uni_stream_ via SetVisitor.
  std::unique_ptr<webtransport::StreamVisitor> uni_stream_;
  MessageParameters parameters_;
  MockSessionToPublisherInterface visitor_;
  StrictMock<MockPublishingMonitorInterface> monitoring_interface_;
  MoqtTraceRecorder trace_recorder_;
  // Object under test; destroyed explicitly in the fixture destructor.
  std::unique_ptr<SubscriptionPublisher> publisher_;
  const TrackExtensions extensions_;
  quic::MockClock mock_clock_;
  // Not referenced by any test in this file.
  MoqtSessionCallbacks callbacks_;
  quic::test::TestAlarmFactory alarm_factory_;
};
// Accepting a subscription that has no filter writes a SUBSCRIBE_OK on the
// control stream, marks the publisher established and records the track's
// largest location in the parameters; the filter stays unset.
TEST_F(SubscriptionPublisherTest, OnSubscribeAcceptedNoFilter) {
  const Location track_largest(1, 2);
  const quic::QuicTimeDelta track_expiry =
      quic::QuicTimeDelta::FromSeconds(10);

  EXPECT_CALL(mock_bidi_stream_, CanWrite()).WillRepeatedly(Return(true));
  EXPECT_CALL(*track_publisher_, largest_location())
      .WillOnce(Return(track_largest));
  EXPECT_CALL(*track_publisher_, expiration).WillOnce(Return(track_expiry));
  EXPECT_CALL(*track_publisher_, extensions)
      .WillRepeatedly(ReturnRef(extensions_));
  EXPECT_CALL(mock_bidi_stream_,
              Writev(ControlMessageOfType(MoqtMessageType::kSubscribeOk), _))
      .WillOnce(Return(absl::OkStatus()));

  publisher_->OnSubscribeAccepted();

  EXPECT_TRUE(publisher_->established());
  EXPECT_EQ(publisher_->parameters().largest_object, track_largest);
  EXPECT_FALSE(publisher_->parameters().subscription_filter.has_value());
}
// With a kLargestObject filter, accepting the subscription resolves the
// filter start to the location after the track's largest one, here (1, 3).
// The test then sends a datagram object at the track's default publisher
// priority and checks the datagram type reports has_default_priority().
TEST_F(SubscriptionPublisherTest, OnSubscribeAcceptedWithFilter) {
  publisher_->parameters().subscription_filter =
      SubscriptionFilter(MoqtFilterType::kLargestObject);
  const TrackExtensions extensions(std::nullopt, std::nullopt,
                                   /*default_publisher_priority=*/64,
                                   std::nullopt, std::nullopt, std::nullopt);
  const Location track_largest(1, 2);
  const Location first_in_window(1, 3);

  EXPECT_CALL(mock_bidi_stream_, CanWrite()).WillRepeatedly(Return(true));
  EXPECT_CALL(*track_publisher_, largest_location())
      .WillOnce(Return(track_largest));
  EXPECT_CALL(*track_publisher_, expiration)
      .WillOnce(Return(quic::QuicTimeDelta::FromSeconds(10)));
  EXPECT_CALL(*track_publisher_, extensions)
      .WillRepeatedly(ReturnRef(extensions));
  EXPECT_CALL(mock_bidi_stream_,
              Writev(ControlMessageOfType(MoqtMessageType::kSubscribeOk), _))
      .WillOnce(Return(absl::OkStatus()));
  publisher_->OnSubscribeAccepted();
  ASSERT_TRUE(publisher_->parameters().subscription_filter.has_value());
  EXPECT_EQ(publisher_->parameters().subscription_filter->start(),
            first_in_window);

  // Check that default_publisher_priority is set. A datagram sent at priority
  // 64 should not explicitly encode that.
  auto expect_default_priority_type = [](absl::string_view datagram) {
    EXPECT_FALSE(datagram.empty());
    std::optional<MoqtDatagramType> type =
        MoqtDatagramType::FromValue(static_cast<uint64_t>(datagram[0]));
    EXPECT_TRUE(type.has_value() && type->has_default_priority());
    return DatagramStatus(DatagramStatusCode::kSuccess, "");
  };
  // The cached object is fetched twice; each fetch gets a fresh copy.
  EXPECT_CALL(*track_publisher_,
              GetCachedObject(1, std::optional<uint64_t>(), 3, 0))
      .WillOnce(
          Return(DefaultPublishedObject(Location(1, 3), std::nullopt, 64)))
      .WillOnce(
          Return(DefaultPublishedObject(Location(1, 3), std::nullopt, 64)));
  EXPECT_CALL(monitoring_interface_, OnNewObjectEnqueued(first_in_window));
  EXPECT_CALL(webtrans_, SendOrQueueDatagram)
      .WillOnce(expect_default_priority_type);
  publisher_->OnNewObjectAvailable(first_in_window, std::nullopt, 64);
}
// Rejecting the subscription writes a REQUEST_ERROR on the control stream and
// reports the request as done to the session interface.
TEST_F(SubscriptionPublisherTest, OnSubscribeRejected) {
  EXPECT_CALL(mock_bidi_stream_, CanWrite()).WillRepeatedly(Return(true));
  EXPECT_CALL(mock_bidi_stream_,
              Writev(ControlMessageOfType(MoqtMessageType::kRequestError), _))
      .WillOnce(Return(absl::OkStatus()));
  EXPECT_CALL(visitor_, PublishIsDone(kRequestId));

  publisher_->OnSubscribeRejected(MoqtRequestErrorInfo(
      RequestErrorCode::kDoesNotExist, std::nullopt, "reason"));
}
// Applies two parameter updates: first a new delivery timeout, then a change
// of the forward flag to false, after which the publisher reports forward()
// as false and can_have_joining_fetch() as false.
TEST_F(SubscriptionPublisherTest, Update) {
  MessageParameters updated;
  updated.delivery_timeout = quic::QuicTimeDelta::FromSeconds(5);
  publisher_->Update(updated);

  // Changing forward preference updates can_have_joining_fetch_
  updated.set_forward(false);
  publisher_->Update(updated);

  EXPECT_FALSE(publisher_->parameters().forward());
  EXPECT_FALSE(publisher_->can_have_joining_fetch());
}
// With no streams open or pending, updating the subscriber priority simply
// stores the new value in the publisher's parameters.
TEST_F(SubscriptionPublisherTest, UpdatePriorityNoStreams) {
  MessageParameters priority_update;
  priority_update.subscriber_priority = 20;

  publisher_->Update(priority_update);

  EXPECT_EQ(publisher_->parameters().subscriber_priority, 20);
}
// With one stream queued, changing the subscriber priority notifies the
// session interface of the move from the old track priority to the new one.
TEST_F(SubscriptionPublisherTest, UpdatePriorityWithPendingStreams) {
  CreatePendingStream(Location(1, 0), 0, 64);

  MessageParameters priority_update;
  priority_update.subscriber_priority = 20;
  EXPECT_CALL(*track_publisher_, extensions())
      .WillRepeatedly(ReturnRef(extensions_));
  // Old priority pairs the fixture's current subscriber priority with the
  // pending object's publisher priority; the new one carries the update.
  EXPECT_CALL(visitor_,
              UpdateTrackPriority(
                  1,
                  std::optional<MoqtTrackPriority>(
                      MoqtTrackPriority{subscriber_priority(), 64}),
                  MoqtTrackPriority{20, 64}));

  publisher_->Update(priority_update);
}
// With an open data stream, changing the subscriber priority re-applies a
// priority to that stream. The stream is created with an explicit check of
// the first bytes written to it.
TEST_F(SubscriptionPublisherTest, UpdatePriorityWithActiveStreams) {
  // Bytes the first write on the stream must start with (includes a 0x00).
  const std::string opening_bytes = {
      0x11, static_cast<uint8_t>(kTrackAlias), 0x01, 0x7f, 0x00, 0x0a};
  CreateStream(Location(1, 0), 0, 127, opening_bytes);

  MessageParameters priority_update;
  priority_update.subscriber_priority = 20;
  EXPECT_CALL(mock_uni_stream_, SetPriority);

  publisher_->Update(priority_update);
}
// An object outside the subscription filter's window is dropped without the
// publisher ever consulting the object cache.
TEST_F(SubscriptionPublisherTest, OnNewObjectAvailableNotInWindow) {
  MessageParameters windowed;
  windowed.subscription_filter = SubscriptionFilter(Location(10, 0), 10);
  publisher_->Update(windowed);

  EXPECT_CALL(*track_publisher_, GetCachedObject).Times(0);

  const Location before_window(5, 0);
  publisher_->OnNewObjectAvailable(before_window, 0, 128);
}
// An object announced without a subgroup is sent as a datagram: the cache is
// read twice, the monitoring interface is told about the object, and the
// session is asked to send or queue one datagram.
TEST_F(SubscriptionPublisherTest, OnNewObjectAvailableDatagram) {
  const Location location(1, 0);

  EXPECT_CALL(*track_publisher_,
              GetCachedObject(1, std::optional<uint64_t>(), 0, 0))
      .WillOnce(
          Return(DefaultPublishedObject(Location(1, 0), std::nullopt, 128)))
      .WillOnce(
          Return(DefaultPublishedObject(Location(1, 0), std::nullopt, 128)));
  EXPECT_CALL(monitoring_interface_, OnNewObjectEnqueued(location));
  EXPECT_CALL(webtrans_, SendOrQueueDatagram)
      .WillOnce(Return(DatagramStatus(DatagramStatusCode::kSuccess, "")));
  EXPECT_CALL(*track_publisher_, extensions())
      .WillRepeatedly(ReturnRef(extensions_));

  publisher_->OnNewObjectAvailable(location, std::nullopt, 128);
}
// When the session cannot open another unidirectional stream, a new object is
// only queued; all expectations for that live in CreatePendingStream().
TEST_F(SubscriptionPublisherTest, OnNewObjectAvailableStreamCreationBlocked) {
  const Location location(1, 0);
  CreatePendingStream(location, /*subgroup=*/0, /*publisher_priority=*/128);
}
// Three situations in which a FIN notification must not write to a stream:
// the location is outside the window, no stream exists for the subgroup, or
// the existing stream has not yet sent the object the FIN refers to.
TEST_F(SubscriptionPublisherTest, OnNewFinAvailableNoops) {
  // Case 1: not in window.
  MessageParameters windowed;
  windowed.subscription_filter = SubscriptionFilter(Location(10, 0), 10);
  publisher_->Update(windowed);
  EXPECT_CALL(webtrans_, GetStreamById).Times(0);
  EXPECT_CALL(mock_uni_stream_, Writev).Times(0);
  publisher_->OnNewFinAvailable(Location(0, 0), 0);

  // Case 2: in window, but there is no stream for the subgroup.
  publisher_->Update(parameters_);
  EXPECT_CALL(webtrans_, GetStreamById).Times(0);
  EXPECT_CALL(mock_uni_stream_, Writev).Times(0);
  publisher_->OnNewFinAvailable(Location(10, 10), 0);

  // Case 3: a stream exists but hasn't gotten there yet. The cache will tell
  // us when to send FIN.
  EXPECT_CALL(webtrans_, GetStreamById)
      .WillRepeatedly(Return(&mock_uni_stream_));
  CreateStream(Location(10, 0), 0, 128);
  EXPECT_CALL(mock_uni_stream_, Writev).Times(0);
  publisher_->OnNewFinAvailable(Location(10, 1), 0);
}
// A FIN for the object the open stream has already sent results in an empty
// write with the FIN flag set.
TEST_F(SubscriptionPublisherTest, OnNewFinAvailableWithStream) {
  const Location location(1, 0);
  CreateStream(location, 0, 128);

  // The only acceptable write: no data, FIN set.
  auto expect_bare_fin = [](absl::Span<quiche::QuicheMemSlice> data,
                            const webtransport::StreamWriteOptions& options) {
    EXPECT_TRUE(data.empty());
    EXPECT_TRUE(options.send_fin());
    return absl::OkStatus();
  };
  EXPECT_CALL(mock_uni_stream_, Writev).WillOnce(expect_bare_fin);

  // A dedicated alarm factory is handed out for the single expected request.
  quic::test::MockAlarmFactory fin_alarm_factory;
  EXPECT_CALL(visitor_, alarm_factory).WillOnce(Return(&fin_alarm_factory));

  publisher_->OnNewFinAvailable(location, 0);
}
// Abandoning a subgroup never looks up a stream when the group is outside the
// window, nor when it is inside the window but has no stream.
TEST_F(SubscriptionPublisherTest, OnSubgroupAbandonedNoEffect) {
  // Not in window.
  MessageParameters windowed;
  windowed.subscription_filter = SubscriptionFilter(Location(10, 0), 10);
  publisher_->Update(windowed);
  EXPECT_CALL(webtrans_, GetStreamById).Times(0);
  publisher_->OnSubgroupAbandoned(/*group=*/1, /*subgroup=*/0, 17);

  // In window but no stream.
  publisher_->Update(parameters_);
  EXPECT_CALL(webtrans_, GetStreamById).Times(0);
  publisher_->OnSubgroupAbandoned(/*group=*/1, /*subgroup=*/0, 17);
}
// Abandoning a group with no streams never looks up a stream, whether or not
// the group is in the window. Once the group has been abandoned in-window, a
// later object for it does not touch the object cache.
TEST_F(SubscriptionPublisherTest, OnGroupAbandoned) {
  constexpr uint64_t kAbandonedGroup = 1;

  // Not in window.
  MessageParameters windowed;
  windowed.subscription_filter = SubscriptionFilter(Location(10, 0), 10);
  publisher_->Update(windowed);
  EXPECT_CALL(webtrans_, GetStreamById).Times(0);
  publisher_->OnGroupAbandoned(kAbandonedGroup);

  // In window.
  publisher_->Update(parameters_);
  EXPECT_CALL(webtrans_, GetStreamById).Times(0);
  publisher_->OnGroupAbandoned(kAbandonedGroup);

  // Objects in the abandoned group are ignored from now on.
  EXPECT_CALL(*track_publisher_, GetCachedObject).Times(0);
  publisher_->OnNewObjectAvailable(Location(1, 0), 0, 128);
}
// Abandoning a group that has an open stream resets that stream. The delivery
// timeout is not infinite, so it will not send a PUBLISH_DONE with
// kTooFarBehind.
TEST_F(SubscriptionPublisherTest, OnGroupAbandonedWithStreams) {
  const Location location(1, 0);
  CreateStream(location, 0, 128);

  EXPECT_CALL(mock_uni_stream_, ResetWithUserCode);
  EXPECT_CALL(mock_bidi_stream_, Writev).Times(0);  // No PUBLISH_DONE.

  publisher_->OnGroupAbandoned(location.group);
}
// With an infinite delivery timeout, abandoning a group that still has an
// open stream ends the subscription: a PUBLISH_DONE with kTooFarBehind and a
// stream count of 1 is written and the session interface is told the request
// is done.
TEST_F(SubscriptionPublisherTest, OnGroupAbandonedTooFarBehind) {
  // Set the delivery timeout to infinite so that TooFarBehind is possible.
  parameters_.delivery_timeout = quic::QuicTimeDelta::Infinite();
  publisher_->Update(parameters_);
  CreateStream(Location(5, 0), 0, 128);

  MoqtPublishDone expected_publish_done = {
      /*request_id=*/kRequestId,
      PublishDoneCode::kTooFarBehind,
      /*stream_count=*/1,
      /*error_reason=*/"",
  };
  EXPECT_CALL(mock_bidi_stream_, CanWrite()).WillRepeatedly(Return(true));
  EXPECT_CALL(mock_bidi_stream_,
              Writev(SerializedControlMessage(expected_publish_done), _));
  EXPECT_CALL(visitor_, PublishIsDone(kRequestId));

  publisher_->OnGroupAbandoned(5);
}
// A queued stream whose group is abandoned before stream credit arrives is
// discarded: once credit is available, no stream is opened for it.
TEST_F(SubscriptionPublisherTest, OnCanCreateNewUniStreamPendingCleanup) {
  const Location location(1, 0);
  CreatePendingStream(location, 0, 128);

  // Abandon the group.
  publisher_->OnGroupAbandoned(location.group);

  // OnCanCreateNewUniStream should clean it up; no attempt to create a stream.
  EXPECT_CALL(webtrans_, CanOpenNextOutgoingUnidirectionalStream)
      .WillOnce(Return(true));
  EXPECT_CALL(webtrans_, OpenOutgoingUnidirectionalStream).Times(0);

  publisher_->OnCanCreateNewUniStream();
}
// With the alternate delivery timeout enabled, queuing a stream for a second
// group while a group-1 stream is open makes the publisher request the alarm
// factory exactly once.
TEST_F(SubscriptionPublisherTest, AlternateDeliveryTimeoutSetAlarm) {
  ON_CALL(visitor_, alternate_delivery_timeout).WillByDefault(Return(true));

  // Create a stream for group 1.
  const Location first_group(1, 0);
  CreateStream(first_group, 0, 128);

  // Create a pending stream for group 2, which should start the timer but does
  // less work than an active stream.
  EXPECT_CALL(visitor_, alarm_factory).WillOnce(Return(&alarm_factory_));
  const Location second_group(2, 0);
  CreatePendingStream(second_group, 0, 128);
}
// When the track publisher goes away, the subscription is terminated with a
// PUBLISH_DONE on the control stream and the request is reported as done.
TEST_F(SubscriptionPublisherTest, OnTrackPublisherGone) {
  EXPECT_CALL(mock_bidi_stream_, CanWrite()).WillRepeatedly(Return(true));
  EXPECT_CALL(mock_bidi_stream_,
              Writev(ControlMessageOfType(MoqtMessageType::kPublishDone), _))
      .WillOnce(Return(absl::OkStatus()));
  EXPECT_CALL(visitor_, PublishIsDone(kRequestId));

  publisher_->OnTrackPublisherGone();
}
// An OBJECT_ACK is forwarded to the monitoring interface as the acknowledged
// location plus the reported delta from the deadline.
TEST_F(SubscriptionPublisherTest, ProcessObjectAck) {
  const quic::QuicTimeDelta delta = quic::QuicTimeDelta::FromMilliseconds(100);
  MoqtObjectAck ack;
  ack.group_id = 1;
  ack.object_id = 2;
  ack.delta_from_deadline = delta;

  EXPECT_CALL(monitoring_interface_,
              OnObjectAckReceived(Location(1, 2), ack.delta_from_deadline));

  publisher_->ProcessObjectAck(ack);
}
// Abandoning a subgroup that has an open stream resets that stream with the
// error code supplied by the caller.
TEST_F(SubscriptionPublisherTest, OnSubgroupAbandonedWithStream) {
  const Location location(1, 0);
  CreateStream(location, /*subgroup=*/0, /*publisher_priority=*/128);

  EXPECT_CALL(mock_uni_stream_, ResetWithUserCode(17));

  publisher_->OnSubgroupAbandoned(/*group=*/1, /*subgroup=*/0, 17);
}
// A stream queued while creation was blocked is opened once credit becomes
// available: the stream is created, gets a visitor and a priority, and the
// cached object is written to it.
TEST_F(SubscriptionPublisherTest, OnCanCreateNewUniStreamSuccess) {
  CreatePendingStream(Location(1, 0), 0, 128);

  // Call OnCanCreateNewUniStream and succeed.
  EXPECT_CALL(mock_uni_stream_, GetStreamId())
      .WillRepeatedly(Return(kStreamId));
  EXPECT_CALL(webtrans_, CanOpenNextOutgoingUnidirectionalStream())
      .WillRepeatedly(Return(true));
  EXPECT_CALL(webtrans_, OpenOutgoingUnidirectionalStream)
      .WillOnce(Return(&mock_uni_stream_));
  // Keep the installed visitor alive in the fixture and serve it back from
  // visitor().
  EXPECT_CALL(mock_uni_stream_, SetVisitor)
      .WillOnce(
          [this](std::unique_ptr<webtransport::StreamVisitor> installed) {
            uni_stream_ = std::move(installed);
          });
  EXPECT_CALL(mock_uni_stream_, SetPriority);
  EXPECT_CALL(mock_uni_stream_, visitor()).WillRepeatedly([this]() {
    return uni_stream_.get();
  });
  EXPECT_CALL(mock_uni_stream_, CanWrite()).WillRepeatedly(Return(true));
  // Object 0 is available; the lookup for object 1 comes back empty.
  EXPECT_CALL(*track_publisher_,
              GetCachedObject(1, std::optional<uint64_t>(0), 0, 0))
      .WillOnce(Return(DefaultPublishedObject(Location(1, 0), 0, 128)));
  EXPECT_CALL(*track_publisher_,
              GetCachedObject(1, std::optional<uint64_t>(0), 1, 0))
      .WillOnce(Return(std::nullopt));
  EXPECT_CALL(mock_uni_stream_, Writev).WillOnce(Return(absl::OkStatus()));

  publisher_->OnCanCreateNewUniStream();
}
// Queues three streams while stream creation is blocked, for (1, 0), (0, 0)
// and (2, 0), then calls OnCanCreateNewUniStream() three times. Each call is
// expected to open exactly one stream and fetch the cached object for groups
// 2, 0 and 1 respectively. The same mock stream object is reused for all three
// opened streams; only the stream ID it reports and the visitor it hands back
// change between phases.
// NOTE(review): (2, 0) is queued with publisher priority 127 and the other two
// with 128; presumably the numerically lower priority is why it goes first -
// confirm against the publisher's ordering rules.
TEST_F(SubscriptionPublisherTest, PendingStreamsInOrder) {
CreatePendingStream(Location(1, 0), 0, 128);
CreatePendingStream(Location(0, 0), 0, 128);
CreatePendingStream(Location(2, 0), 0, 127);
// Should be opened in the order (2, 0), (0, 0), (1, 0),
// Open stream and send (2, 0).
// Phase 1: stream credit is reported twice as available, then as exhausted.
EXPECT_CALL(webtrans_, CanOpenNextOutgoingUnidirectionalStream())
.WillOnce(Return(true))
.WillOnce(Return(true))
.WillOnce(Return(false));
EXPECT_CALL(webtrans_, OpenOutgoingUnidirectionalStream)
.WillOnce(Return(&mock_uni_stream_));
EXPECT_CALL(mock_uni_stream_, GetStreamId).WillRepeatedly(Return(kStreamId));
// Holds the visitor installed for the (2, 0) stream for the rest of the test.
std::unique_ptr<webtransport::StreamVisitor> stream_visitor2;
EXPECT_CALL(mock_uni_stream_, SetVisitor)
.WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
stream_visitor2 = std::move(visitor);
});
EXPECT_CALL(mock_uni_stream_, visitor()).WillRepeatedly([&]() {
return stream_visitor2.get();
});
EXPECT_CALL(mock_uni_stream_, SetPriority);
EXPECT_CALL(*track_publisher_,
GetCachedObject(2, std::optional<uint64_t>(0), 0, 0))
.WillOnce(Return(DefaultPublishedObject(Location(2, 0), 0, 127)));
EXPECT_CALL(*track_publisher_,
GetCachedObject(2, std::optional<uint64_t>(0), 1, 0))
.WillOnce(Return(std::nullopt));
EXPECT_CALL(mock_uni_stream_, CanWrite).WillRepeatedly(Return(true));
EXPECT_CALL(mock_uni_stream_, Writev).WillOnce(Return(absl::OkStatus()));
// Streams are still pending after this phase, so a priority update is
// expected.
EXPECT_CALL(visitor_, UpdateTrackPriority);
publisher_->OnCanCreateNewUniStream();
// Open (0, 0)
// Phase 2: same credit pattern; the mock stream now reports a new stream ID.
EXPECT_CALL(webtrans_, CanOpenNextOutgoingUnidirectionalStream())
.WillOnce(Return(true))
.WillOnce(Return(true))
.WillOnce(Return(false));
EXPECT_CALL(webtrans_, OpenOutgoingUnidirectionalStream)
.WillOnce(Return(&mock_uni_stream_));
EXPECT_CALL(mock_uni_stream_, GetStreamId)
.WillRepeatedly(Return(kStreamId + 4));
// Holds the visitor installed for the (0, 0) stream.
std::unique_ptr<webtransport::StreamVisitor> stream_visitor0;
EXPECT_CALL(mock_uni_stream_, SetVisitor)
.WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
stream_visitor0 = std::move(visitor);
});
EXPECT_CALL(mock_uni_stream_, visitor()).WillRepeatedly([&]() {
return stream_visitor0.get();
});
EXPECT_CALL(mock_uni_stream_, SetPriority);
EXPECT_CALL(*track_publisher_,
GetCachedObject(0, std::optional<uint64_t>(0), 0, 0))
.WillOnce(Return(DefaultPublishedObject(Location(0, 0), 0, 128)));
EXPECT_CALL(*track_publisher_,
GetCachedObject(0, std::optional<uint64_t>(0), 1, 0))
.WillOnce(Return(std::nullopt));
EXPECT_CALL(mock_uni_stream_, CanWrite).WillRepeatedly(Return(true));
EXPECT_CALL(mock_uni_stream_, Writev).WillOnce(Return(absl::OkStatus()));
EXPECT_CALL(visitor_, UpdateTrackPriority);
publisher_->OnCanCreateNewUniStream();
// Open (1, 0)
// Phase 3: credit is unlimited, yet only the one remaining stream is opened.
EXPECT_CALL(webtrans_, CanOpenNextOutgoingUnidirectionalStream())
.WillRepeatedly(Return(true)); // Unlimited credit but only one stream.
EXPECT_CALL(webtrans_, OpenOutgoingUnidirectionalStream)
.WillOnce(Return(&mock_uni_stream_));
EXPECT_CALL(mock_uni_stream_, GetStreamId)
.WillRepeatedly(Return(kStreamId + 8));
// Holds the visitor installed for the (1, 0) stream.
std::unique_ptr<webtransport::StreamVisitor> stream_visitor1;
EXPECT_CALL(mock_uni_stream_, SetVisitor)
.WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
stream_visitor1 = std::move(visitor);
});
EXPECT_CALL(mock_uni_stream_, visitor()).WillRepeatedly([&]() {
return stream_visitor1.get();
});
EXPECT_CALL(mock_uni_stream_, SetPriority);
EXPECT_CALL(*track_publisher_,
GetCachedObject(1, std::optional<uint64_t>(0), 0, 0))
.WillOnce(Return(DefaultPublishedObject(Location(1, 0), 0, 128)));
EXPECT_CALL(*track_publisher_,
GetCachedObject(1, std::optional<uint64_t>(0), 1, 0))
.WillOnce(Return(std::nullopt));
EXPECT_CALL(mock_uni_stream_, CanWrite).WillRepeatedly(Return(true));
EXPECT_CALL(mock_uni_stream_, Writev).WillOnce(Return(absl::OkStatus()));
// Nothing is left pending, so no further track priority update is expected.
EXPECT_CALL(visitor_, UpdateTrackPriority).Times(0);
publisher_->OnCanCreateNewUniStream();
}
// After the publisher is told a data stream was destroyed, a later priority
// update has no stream left to look up.
TEST_F(SubscriptionPublisherTest, OnDataStreamDestroyed) {
  CreateStream(Location(1, 0), 0, 128);

  DataStreamIndex destroyed(1, 0);
  publisher_->OnDataStreamDestroyed(destroyed);

  // No entries in the stream map.
  EXPECT_CALL(webtrans_, GetStreamById).Times(0);
  parameters_.subscriber_priority = 20;
  publisher_->Update(parameters_);
}
// Reporting an object as sent records its location as the publisher's
// largest_sent value.
// NOTE(review): the test name says "Twice" but OnObjectSent is only called
// once here - confirm whether a second call was intended.
TEST_F(SubscriptionPublisherTest, OnObjectSentTwice) {
  const Location sent(1, 0);
  publisher_->OnObjectSent(sent);

  const std::optional<Location> largest =
      SubscriptionPublisherPeer::largest_sent(publisher_.get());
  ASSERT_TRUE(largest.has_value());
  EXPECT_TRUE(*largest == sent);
}
// With the alternate delivery timeout enabled, opens two streams in group 0
// and checks that neither has an alarm. Opening a stream in group 1 is then
// expected to leave both group-0 streams with an alarm while the new group-1
// stream has none.
// Each CreateStream() call overwrites the fixture's uni_stream_, so the test
// moves the earlier visitors into locals to keep them alive and inspectable.
TEST_F(SubscriptionPublisherTest, AlternateDeliveryTimeout) {
EXPECT_CALL(visitor_, alternate_delivery_timeout)
.WillRepeatedly(Return(true));
CreateStream(Location(0, 0), 0, 128);
// Save the visitor before it's overwritten.
std::unique_ptr<webtransport::StreamVisitor> uni_stream =
std::move(uni_stream_);
CreateStream(Location(0, 1), 1, 200);
std::unique_ptr<webtransport::StreamVisitor> uni_stream1 =
std::move(uni_stream_);
// Timers aren't running.
EXPECT_EQ(OutgoingSubgroupStreamPeer::GetAlarm(
absl::down_cast<OutgoingSubgroupStream*>(uni_stream.get())),
nullptr);
EXPECT_EQ(OutgoingSubgroupStreamPeer::GetAlarm(
absl::down_cast<OutgoingSubgroupStream*>(uni_stream1.get())),
nullptr);
// Second group starts the timer.
// The first two visitor() lookups are answered with the saved group-0
// visitors, in creation order; every later lookup returns the fixture's
// current uni_stream_.
EXPECT_CALL(mock_uni_stream_, visitor)
.WillOnce(Return(uni_stream.get()))
.WillOnce(Return(uni_stream1.get()))
.WillRepeatedly([&]() { return uni_stream_.get(); });
CreateStream(Location(1, 0), 0, 128);
// Group 0 streams now have a timer running.
EXPECT_NE(OutgoingSubgroupStreamPeer::GetAlarm(
absl::down_cast<OutgoingSubgroupStream*>(uni_stream.get())),
nullptr);
EXPECT_NE(OutgoingSubgroupStreamPeer::GetAlarm(
absl::down_cast<OutgoingSubgroupStream*>(uni_stream1.get())),
nullptr);
// No timer on group 1.
EXPECT_EQ(OutgoingSubgroupStreamPeer::GetAlarm(
absl::down_cast<OutgoingSubgroupStream*>(uni_stream_.get())),
nullptr);
}
// An update whose filter is built with an end value of 4 arrives after the
// track has already delivered an object in group 5. A further group-5 object
// must then be ignored without touching the object cache.
TEST_F(SubscriptionPublisherTest, IncomingUpdateTruncatesSubscription) {
  // Track gets to Group 5.
  CreateStream(Location(5, 0), 0, 128);

  parameters_.subscription_filter = SubscriptionFilter(Location(0, 0), 4);
  publisher_->Update(parameters_);

  EXPECT_CALL(*track_publisher_, GetCachedObject).Times(0);
  const Location past_end(5, 1);
  publisher_->OnNewObjectAvailable(past_end, 0, 128);
}
} // namespace
} // namespace moqt::test